xfs: simplify and speed up direct I/O completions

Our current handling of direct I/O completions is rather suboptimal,
because we defer it to a workqueue more often than needed, and we
perform a much too aggressive flush of the workqueue when unwritten
extent conversions happen.

This patch changes direct I/O reads to not use a completion handler at
all, as there is no work to do for them at completion time, and performs
the unwritten extent conversions in the caller's context for synchronous
direct I/O.
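
As an aside, and not part of this patch: a minimal userspace sketch of
the synchronous case that benefits here.  A plain O_DIRECT write into
preallocated (unwritten) space now has its extent conversion performed
in the caller's context before pwrite() returns, instead of being
bounced through a workqueue and a flush.  The path and sizes below are
made up for the example:

#define _GNU_SOURCE		/* for O_DIRECT */
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	void	*buf;
	/* hypothetical test file on an XFS mount */
	int	fd = open("/mnt/xfs/testfile",
			  O_WRONLY | O_CREAT | O_DIRECT, 0644);

	if (fd < 0)
		return 1;
	/* O_DIRECT requires aligned buffers; page alignment is safe */
	if (posix_memalign(&buf, 4096, 4096))
		return 1;
	memset(buf, 0, 4096);
	/* synchronous direct write: any unwritten extent conversion now
	   completes in this process's context before pwrite() returns */
	if (pwrite(fd, buf, 4096, 0) != 4096)
		return 1;
	free(buf);
	return close(fd);
}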

For a small-I/O-size direct I/O workload on a consumer-grade SSD, such
as the untar of a kernel tree inside qemu, this patch gives speedups of
about 5%, getting us much closer to the speed of a native block device
or a fully allocated XFS file.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Reviewed-by: Dave Chinner <dchinner@redhat.com>
Signed-off-by: Alex Elder <aelder@sgi.com>
diff --git a/fs/xfs/linux-2.6/xfs_aops.c b/fs/xfs/linux-2.6/xfs_aops.c
index 13622d5..d24e78f 100644
--- a/fs/xfs/linux-2.6/xfs_aops.c
+++ b/fs/xfs/linux-2.6/xfs_aops.c
@@ -202,23 +202,17 @@
 }
 
 /*
- * Schedule IO completion handling on a xfsdatad if this was
- * the final hold on this ioend. If we are asked to wait,
- * flush the workqueue.
+ * Schedule IO completion handling on the final put of an ioend.
  */
 STATIC void
 xfs_finish_ioend(
-	xfs_ioend_t	*ioend,
-	int		wait)
+	struct xfs_ioend	*ioend)
 {
 	if (atomic_dec_and_test(&ioend->io_remaining)) {
-		struct workqueue_struct *wq;
-
-		wq = (ioend->io_type == IO_UNWRITTEN) ?
-			xfsconvertd_workqueue : xfsdatad_workqueue;
-		queue_work(wq, &ioend->io_work);
-		if (wait)
-			flush_workqueue(wq);
+		if (ioend->io_type == IO_UNWRITTEN)
+			queue_work(xfsconvertd_workqueue, &ioend->io_work);
+		else
+			queue_work(xfsdatad_workqueue, &ioend->io_work);
 	}
 }
 
@@ -262,7 +256,7 @@
 	 */
 	if (error == EAGAIN) {
 		atomic_inc(&ioend->io_remaining);
-		xfs_finish_ioend(ioend, 0);
+		xfs_finish_ioend(ioend);
 		/* ensure we don't spin on blocked ioends */
 		delay(1);
 	} else {
@@ -273,6 +267,17 @@
 }
 
 /*
+ * Call IO completion handling in caller context on the final put of an ioend.
+ */
+STATIC void
+xfs_finish_ioend_sync(
+	struct xfs_ioend	*ioend)
+{
+	if (atomic_dec_and_test(&ioend->io_remaining))
+		xfs_end_io(&ioend->io_work);
+}
+
+/*
  * Allocate and initialise an IO completion structure.
  * We need to track unwritten extent write completion here initially.
  * We'll need to extend this for updating the ondisk inode size later
@@ -353,7 +358,7 @@
 	bio->bi_end_io = NULL;
 	bio_put(bio);
 
-	xfs_finish_ioend(ioend, 0);
+	xfs_finish_ioend(ioend);
 }
 
 STATIC void
@@ -495,7 +500,7 @@
 		}
 		if (bio)
 			xfs_submit_ioend_bio(wbc, ioend, bio);
-		xfs_finish_ioend(ioend, 0);
+		xfs_finish_ioend(ioend);
 	} while ((ioend = next) != NULL);
 }
 
@@ -1406,60 +1411,27 @@
 	return __xfs_get_blocks(inode, iblock, bh_result, create, 1);
 }
 
+/*
+ * Complete a direct I/O write request.
+ *
+ * If the private argument is non-NULL __xfs_get_blocks signals us that we
+ * need to issue a transaction to convert the range from unwritten to written
+ * extents.  In case this is regular synchronous I/O we just call xfs_end_io
+ * to do this and we are done.  But in case this was a successful AIO
+ * request this handler is called from interrupt context, from which we
+ * can't start transactions.  In that case offload the I/O completion to
+ * the workqueues we also use for buffered I/O completion.
+ */
 STATIC void
-xfs_end_io_direct(
-	struct kiocb	*iocb,
-	loff_t		offset,
-	ssize_t		size,
-	void		*private,
-	int		ret,
-	bool		is_async)
+xfs_end_io_direct_write(
+	struct kiocb		*iocb,
+	loff_t			offset,
+	ssize_t			size,
+	void			*private,
+	int			ret,
+	bool			is_async)
 {
-	xfs_ioend_t	*ioend = iocb->private;
-	bool		complete_aio = is_async;
-
-	/*
-	 * Non-NULL private data means we need to issue a transaction to
-	 * convert a range from unwritten to written extents.  This needs
-	 * to happen from process context but aio+dio I/O completion
-	 * happens from irq context so we need to defer it to a workqueue.
-	 * This is not necessary for synchronous direct I/O, but we do
-	 * it anyway to keep the code uniform and simpler.
-	 *
-	 * Well, if only it were that simple. Because synchronous direct I/O
-	 * requires extent conversion to occur *before* we return to userspace,
-	 * we have to wait for extent conversion to complete. Look at the
-	 * iocb that has been passed to us to determine if this is AIO or
-	 * not. If it is synchronous, tell xfs_finish_ioend() to kick the
-	 * workqueue and wait for it to complete.
-	 *
-	 * The core direct I/O code might be changed to always call the
-	 * completion handler in the future, in which case all this can
-	 * go away.
-	 */
-	ioend->io_offset = offset;
-	ioend->io_size = size;
-	if (ioend->io_type == IO_READ) {
-		xfs_finish_ioend(ioend, 0);
-	} else if (private && size > 0) {
-		if (is_async) {
-			ioend->io_iocb = iocb;
-			ioend->io_result = ret;
-			complete_aio = false;
-			xfs_finish_ioend(ioend, 0);
-		} else {
-			xfs_finish_ioend(ioend, 1);
-		}
-	} else {
-		/*
-		 * A direct I/O write ioend starts it's life in unwritten
-		 * state in case they map an unwritten extent.  This write
-		 * didn't map an unwritten extent so switch it's completion
-		 * handler.
-		 */
-		ioend->io_type = IO_NEW;
-		xfs_finish_ioend(ioend, 0);
-	}
+	struct xfs_ioend	*ioend = iocb->private;
 
 	/*
 	 * blockdev_direct_IO can return an error even after the I/O
@@ -1468,8 +1440,27 @@
 	 */
 	iocb->private = NULL;
 
-	if (complete_aio)
-		aio_complete(iocb, ret, 0);
+	ioend->io_offset = offset;
+	ioend->io_size = size;
+	if (private && size > 0)
+		ioend->io_type = IO_UNWRITTEN;
+
+	if (is_async) {
+		/*
+		 * If we are converting an unwritten extent we need to delay
+		 * the AIO completion until after the unwritten extent
+		 * conversion has completed, otherwise do it ASAP.
+		 */
+		if (ioend->io_type == IO_UNWRITTEN) {
+			ioend->io_iocb = iocb;
+			ioend->io_result = ret;
+		} else {
+			aio_complete(iocb, ret, 0);
+		}
+		xfs_finish_ioend(ioend);
+	} else {
+		xfs_finish_ioend_sync(ioend);
+	}
 }
 
 STATIC ssize_t
@@ -1480,23 +1471,26 @@
 	loff_t			offset,
 	unsigned long		nr_segs)
 {
-	struct file	*file = iocb->ki_filp;
-	struct inode	*inode = file->f_mapping->host;
-	struct block_device *bdev;
-	ssize_t		ret;
+	struct inode		*inode = iocb->ki_filp->f_mapping->host;
+	struct block_device	*bdev = xfs_find_bdev_for_inode(inode);
+	ssize_t			ret;
 
-	bdev = xfs_find_bdev_for_inode(inode);
+	if (rw & WRITE) {
+		iocb->private = xfs_alloc_ioend(inode, IO_NEW);
 
-	iocb->private = xfs_alloc_ioend(inode, rw == WRITE ?
-					IO_UNWRITTEN : IO_READ);
+		ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
+						    offset, nr_segs,
+						    xfs_get_blocks_direct,
+						    xfs_end_io_direct_write);
+		if (ret != -EIOCBQUEUED && iocb->private)
+			xfs_destroy_ioend(iocb->private);
+	} else {
+		ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
+						    offset, nr_segs,
+						    xfs_get_blocks_direct,
+						    NULL);
+	}
 
-	ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
-					    offset, nr_segs,
-					    xfs_get_blocks_direct,
-					    xfs_end_io_direct);
-
-	if (unlikely(ret != -EIOCBQUEUED && iocb->private))
-		xfs_destroy_ioend(iocb->private);
 	return ret;
 }