[XFS] Initial pass at going directly-to-bio on the buffered IO path.  This
allows us to submit much larger I/Os instead of sending down lots of small
buffer_heads.  To do this we need to have a rather complicated I/O
submission and completion tracking infrastructure.  Part of the latter was
already merged a long time ago for direct I/O support.  Part of the
problem is that we need to track sub-pagesize regions and for that we
still need buffer_heads for the time being.  Long-term I hope we can move
to better data structures and/or maybe move this to fs/mpage.c instead of
having it in XFS.  Original patch from Nathan Scott with various updates
from David Chinner and Christoph Hellwig.

SGI-PV: 947118
SGI-Modid: xfs-linux-melb:xfs-kern:203822a

Signed-off-by: Christoph Hellwig <hch@sgi.com>
Signed-off-by: Nathan Scott <nathans@sgi.com>
diff --git a/fs/xfs/linux-2.6/xfs_aops.c b/fs/xfs/linux-2.6/xfs_aops.c
index 3f6b9e2..e99d04d 100644
--- a/fs/xfs/linux-2.6/xfs_aops.c
+++ b/fs/xfs/linux-2.6/xfs_aops.c
@@ -43,8 +43,6 @@
 #include <linux/writeback.h>
 
 STATIC void xfs_count_page_state(struct page *, int *, int *, int *);
-STATIC void xfs_convert_page(struct inode *, struct page *, xfs_iomap_t *,
-		struct writeback_control *wbc, void *, int, int);
 
 #if defined(XFS_RW_TRACE)
 void
@@ -58,7 +56,7 @@
 	bhv_desc_t	*bdp;
 	vnode_t		*vp = LINVFS_GET_VP(inode);
 	loff_t		isize = i_size_read(inode);
-	loff_t		offset = (loff_t)page->index << PAGE_CACHE_SHIFT;
+	loff_t		offset = page_offset(page);
 	int		delalloc = -1, unmapped = -1, unwritten = -1;
 
 	if (page_has_buffers(page))
@@ -103,15 +101,56 @@
 		queue_work(xfsdatad_workqueue, &ioend->io_work);
 }
 
+/*
+ * We're now finished for good with this ioend structure.
+ * Update the page state via the associated buffer_heads,
+ * release holds on the inode and bio, and finally free
+ * up memory.  Do not use the ioend after this.
+ */
 STATIC void
 xfs_destroy_ioend(
 	xfs_ioend_t		*ioend)
 {
+	struct buffer_head	*bh, *next;
+
+	for (bh = ioend->io_buffer_head; bh; bh = next) {
+		next = bh->b_private;
+		bh->b_end_io(bh, ioend->io_uptodate);
+	}
+
 	vn_iowake(ioend->io_vnode);
 	mempool_free(ioend, xfs_ioend_pool);
 }
 
 /*
+ * Buffered IO write completion for delayed allocate extents.
+ * TODO: Update ondisk isize now that we know the file data
+ * has been flushed (i.e. the notorious "NULL file" problem).
+ */
+STATIC void
+xfs_end_bio_delalloc(
+	void			*data)
+{
+	xfs_ioend_t		*ioend = data;
+
+	xfs_destroy_ioend(ioend);
+}
+
+/*
+ * Buffered IO write completion for regular, written extents.
+ */
+STATIC void
+xfs_end_bio_written(
+	void			*data)
+{
+	xfs_ioend_t		*ioend = data;
+
+	xfs_destroy_ioend(ioend);
+}
+
+/*
+ * IO write completion for unwritten extents.
+ *
  * Issue transactions to convert a buffer range from unwritten
  * to written extents.
  */
@@ -123,21 +162,10 @@
 	vnode_t			*vp = ioend->io_vnode;
 	xfs_off_t		offset = ioend->io_offset;
 	size_t			size = ioend->io_size;
-	struct buffer_head	*bh, *next;
 	int			error;
 
 	if (ioend->io_uptodate)
 		VOP_BMAP(vp, offset, size, BMAPI_UNWRITTEN, NULL, NULL, error);
-
-	/* ioend->io_buffer_head is only non-NULL for buffered I/O */
-	for (bh = ioend->io_buffer_head; bh; bh = next) {
-		next = bh->b_private;
-
-		bh->b_end_io = NULL;
-		clear_buffer_unwritten(bh);
-		end_buffer_async_write(bh, ioend->io_uptodate);
-	}
-
 	xfs_destroy_ioend(ioend);
 }
 
@@ -149,7 +177,8 @@
  */
 STATIC xfs_ioend_t *
 xfs_alloc_ioend(
-	struct inode		*inode)
+	struct inode		*inode,
+	unsigned int		type)
 {
 	xfs_ioend_t		*ioend;
 
@@ -162,45 +191,25 @@
 	 */
 	atomic_set(&ioend->io_remaining, 1);
 	ioend->io_uptodate = 1; /* cleared if any I/O fails */
+	ioend->io_list = NULL;
+	ioend->io_type = type;
 	ioend->io_vnode = LINVFS_GET_VP(inode);
 	ioend->io_buffer_head = NULL;
+	ioend->io_buffer_tail = NULL;
 	atomic_inc(&ioend->io_vnode->v_iocount);
 	ioend->io_offset = 0;
 	ioend->io_size = 0;
 
-	INIT_WORK(&ioend->io_work, xfs_end_bio_unwritten, ioend);
+	if (type == IOMAP_UNWRITTEN)
+		INIT_WORK(&ioend->io_work, xfs_end_bio_unwritten, ioend);
+	else if (type == IOMAP_DELAY)
+		INIT_WORK(&ioend->io_work, xfs_end_bio_delalloc, ioend);
+	else
+		INIT_WORK(&ioend->io_work, xfs_end_bio_written, ioend);
 
 	return ioend;
 }
 
-void
-linvfs_unwritten_done(
-	struct buffer_head	*bh,
-	int			uptodate)
-{
-	xfs_ioend_t		*ioend = bh->b_private;
-	static spinlock_t	unwritten_done_lock = SPIN_LOCK_UNLOCKED;
-	unsigned long		flags;
-
-	ASSERT(buffer_unwritten(bh));
-	bh->b_end_io = NULL;
-
-	if (!uptodate)
-		ioend->io_uptodate = 0;
-
-	/*
-	 * Deep magic here.  We reuse b_private in the buffer_heads to build
-	 * a chain for completing the I/O from user context after we've issued
-	 * a transaction to convert the unwritten extent.
-	 */
-	spin_lock_irqsave(&unwritten_done_lock, flags);
-	bh->b_private = ioend->io_buffer_head;
-	ioend->io_buffer_head = bh;
-	spin_unlock_irqrestore(&unwritten_done_lock, flags);
-
-	xfs_finish_ioend(ioend);
-}
-
 STATIC int
 xfs_map_blocks(
 	struct inode		*inode,
@@ -228,7 +237,7 @@
 	xfs_iomap_t		*iomapp,
 	unsigned long		offset)
 {
-	loff_t			full_offset;	/* offset from start of file */
+	xfs_off_t		full_offset;	/* offset from start of file */
 
 	ASSERT(offset < PAGE_CACHE_SIZE);
 
@@ -243,16 +252,223 @@
 	return NULL;
 }
 
+/*
+ * BIO completion handler for buffered IO.
+ */
+STATIC int
+xfs_end_bio(
+	struct bio		*bio,
+	unsigned int		bytes_done,
+	int			error)
+{
+	xfs_ioend_t		*ioend = bio->bi_private;
+
+	if (bio->bi_size)
+		return 1;
+
+	ASSERT(ioend);
+	ASSERT(atomic_read(&bio->bi_cnt) >= 1);
+
+	/* Toss bio and pass work off to an xfsdatad thread */
+	if (!test_bit(BIO_UPTODATE, &bio->bi_flags))
+		ioend->io_uptodate = 0;
+	bio->bi_private = NULL;
+	bio->bi_end_io = NULL;
+
+	bio_put(bio);
+	xfs_finish_ioend(ioend);
+	return 0;
+}
+
+STATIC void
+xfs_submit_ioend_bio(
+	xfs_ioend_t	*ioend,
+	struct bio	*bio)
+{
+	atomic_inc(&ioend->io_remaining);
+
+	bio->bi_private = ioend;
+	bio->bi_end_io = xfs_end_bio;
+
+	submit_bio(WRITE, bio);
+	ASSERT(!bio_flagged(bio, BIO_EOPNOTSUPP));
+	bio_put(bio);
+}
+
+STATIC struct bio *
+xfs_alloc_ioend_bio(
+	struct buffer_head	*bh)
+{
+	struct bio		*bio;
+	int			nvecs = bio_get_nr_vecs(bh->b_bdev);
+
+	do {
+		bio = bio_alloc(GFP_NOIO, nvecs);
+		nvecs >>= 1;
+	} while (!bio);
+
+	ASSERT(bio->bi_private == NULL);
+	bio->bi_sector = bh->b_blocknr * (bh->b_size >> 9);
+	bio->bi_bdev = bh->b_bdev;
+	bio_get(bio);
+	return bio;
+}
+
+STATIC void
+xfs_start_buffer_writeback(
+	struct buffer_head	*bh)
+{
+	ASSERT(buffer_mapped(bh));
+	ASSERT(buffer_locked(bh));
+	ASSERT(!buffer_delay(bh));
+	ASSERT(!buffer_unwritten(bh));
+
+	mark_buffer_async_write(bh);
+	set_buffer_uptodate(bh);
+	clear_buffer_dirty(bh);
+}
+
+STATIC void
+xfs_start_page_writeback(
+	struct page		*page,
+	struct writeback_control *wbc,
+	int			clear_dirty,
+	int			buffers)
+{
+	ASSERT(PageLocked(page));
+	ASSERT(!PageWriteback(page));
+	set_page_writeback(page);
+	if (clear_dirty)
+		clear_page_dirty(page);
+	unlock_page(page);
+	if (!buffers) {
+		end_page_writeback(page);
+		wbc->pages_skipped++;	/* We didn't write this page */
+	}
+}
+
+static inline int bio_add_buffer(struct bio *bio, struct buffer_head *bh)
+{
+	return bio_add_page(bio, bh->b_page, bh->b_size, bh_offset(bh));
+}
+
+/*
+ * Submit all of the bios for all of the ioends we have saved up,
+ * covering the initial writepage page and also any probed pages.
+ */
+STATIC void
+xfs_submit_ioend(
+	xfs_ioend_t		*ioend)
+{
+	xfs_ioend_t		*next;
+	struct buffer_head	*bh;
+	struct bio		*bio;
+	sector_t		lastblock = 0;
+
+	do {
+		next = ioend->io_list;
+		bio = NULL;
+
+		for (bh = ioend->io_buffer_head; bh; bh = bh->b_private) {
+			xfs_start_buffer_writeback(bh);
+
+			if (!bio) {
+ retry:
+				bio = xfs_alloc_ioend_bio(bh);
+			} else if (bh->b_blocknr != lastblock + 1) {
+				xfs_submit_ioend_bio(ioend, bio);
+				goto retry;
+			}
+
+			if (bio_add_buffer(bio, bh) != bh->b_size) {
+				xfs_submit_ioend_bio(ioend, bio);
+				goto retry;
+			}
+
+			lastblock = bh->b_blocknr;
+		}
+		if (bio)
+			xfs_submit_ioend_bio(ioend, bio);
+		xfs_finish_ioend(ioend);
+	} while ((ioend = next) != NULL);
+}
+
+/*
+ * Cancel submission of all buffer_heads so far in this ioend.
+ * Toss the ioend too.  Only ever called for the initial page
+ * in a writepage request, so only ever one page.
+ */
+STATIC void
+xfs_cancel_ioend(
+	xfs_ioend_t		*ioend)
+{
+	xfs_ioend_t		*next;
+	struct buffer_head	*bh, *next_bh;
+
+	do {
+		next = ioend->io_list;
+		bh = ioend->io_buffer_head;
+		do {
+			next_bh = bh->b_private;
+			clear_buffer_async_write(bh);
+			unlock_buffer(bh);
+		} while ((bh = next_bh) != NULL);
+
+		vn_iowake(ioend->io_vnode);
+		mempool_free(ioend, xfs_ioend_pool);
+	} while ((ioend = next) != NULL);
+}
+
+/*
+ * Test to see if we've been building up a completion structure for
+ * earlier buffers -- if so, we try to append to this ioend if we
+ * can, otherwise we finish off any current ioend and start another.
+ * The ioend currently being built is handed back through *result.
+ */
+STATIC void
+xfs_add_to_ioend(
+	struct inode		*inode,
+	struct buffer_head	*bh,
+	unsigned int		p_offset,
+	unsigned int		type,
+	xfs_ioend_t		**result,
+	int			need_ioend)
+{
+	xfs_ioend_t		*ioend = *result;
+
+	if (!ioend || need_ioend || type != ioend->io_type) {
+		xfs_ioend_t	*previous = *result;
+		xfs_off_t	offset;
+
+		offset = (xfs_off_t)bh->b_page->index << PAGE_CACHE_SHIFT;
+		offset += p_offset;
+		ioend = xfs_alloc_ioend(inode, type);
+		ioend->io_offset = offset;
+		ioend->io_buffer_head = bh;
+		ioend->io_buffer_tail = bh;
+		if (previous)
+			previous->io_list = ioend;
+		*result = ioend;
+	} else {
+		ioend->io_buffer_tail->b_private = bh;
+		ioend->io_buffer_tail = bh;
+	}
+
+	bh->b_private = NULL;
+	ioend->io_size += bh->b_size;
+}
+
 STATIC void
 xfs_map_at_offset(
 	struct page		*page,
 	struct buffer_head	*bh,
 	unsigned long		offset,
 	int			block_bits,
-	xfs_iomap_t		*iomapp)
+	xfs_iomap_t		*iomapp,
+	xfs_ioend_t		*ioend)
 {
 	xfs_daddr_t		bn;
-	loff_t			delta;
+	xfs_off_t		delta;
 	int			sector_shift;
 
 	ASSERT(!(iomapp->iomap_flags & IOMAP_HOLE));
@@ -276,60 +492,7 @@
 	bh->b_bdev = iomapp->iomap_target->bt_bdev;
 	set_buffer_mapped(bh);
 	clear_buffer_delay(bh);
-}
-
-/*
- * Look for a page at index which is unlocked and contains our
- * unwritten extent flagged buffers at its head.  Returns page
- * locked and with an extra reference count, and length of the
- * unwritten extent component on this page that we can write,
- * in units of filesystem blocks.
- */
-STATIC struct page *
-xfs_probe_unwritten_page(
-	struct address_space	*mapping,
-	pgoff_t			index,
-	xfs_iomap_t		*iomapp,
-	xfs_ioend_t		*ioend,
-	unsigned long		max_offset,
-	unsigned long		*fsbs,
-	unsigned int            bbits)
-{
-	struct page		*page;
-
-	page = find_trylock_page(mapping, index);
-	if (!page)
-		return NULL;
-	if (PageWriteback(page))
-		goto out;
-
-	if (page->mapping && page_has_buffers(page)) {
-		struct buffer_head	*bh, *head;
-		unsigned long		p_offset = 0;
-
-		*fsbs = 0;
-		bh = head = page_buffers(page);
-		do {
-			if (!buffer_unwritten(bh) || !buffer_uptodate(bh))
-				break;
-			if (!xfs_offset_to_map(page, iomapp, p_offset))
-				break;
-			if (p_offset >= max_offset)
-				break;
-			xfs_map_at_offset(page, bh, p_offset, bbits, iomapp);
-			set_buffer_unwritten_io(bh);
-			bh->b_private = ioend;
-			p_offset += bh->b_size;
-			(*fsbs)++;
-		} while ((bh = bh->b_this_page) != head);
-
-		if (p_offset)
-			return page;
-	}
-
-out:
-	unlock_page(page);
-	return NULL;
+	clear_buffer_unwritten(bh);
 }
 
 /*
@@ -372,15 +535,16 @@
 	return ret;
 }
 
-STATIC unsigned int
+STATIC size_t
 xfs_probe_unmapped_cluster(
 	struct inode		*inode,
 	struct page		*startpage,
 	struct buffer_head	*bh,
 	struct buffer_head	*head)
 {
+	size_t			len, total = 0;
 	pgoff_t			tindex, tlast, tloff;
-	unsigned int		pg_offset, len, total = 0;
+	unsigned int		pg_offset;
 	struct address_space	*mapping = inode->i_mapping;
 
 	/* First sum forwards in this page */
@@ -414,14 +578,15 @@
 }
 
 /*
- * Probe for a given page (index) in the inode and test if it is delayed
- * and without unwritten buffers.  Returns page locked and with an extra
- * reference count.
+ * Probe for a given page (index) in the inode and test if it is suitable
+ * for writing as part of an unwritten or delayed allocate extent.
+ * Returns page locked and with an extra reference count if so, else NULL.
  */
 STATIC struct page *
-xfs_probe_delalloc_page(
+xfs_probe_delayed_page(
 	struct inode		*inode,
-	pgoff_t			index)
+	pgoff_t			index,
+	unsigned int		type)
 {
 	struct page		*page;
 
@@ -437,12 +602,12 @@
 
 		bh = head = page_buffers(page);
 		do {
-			if (buffer_unwritten(bh)) {
-				acceptable = 0;
+			if (buffer_unwritten(bh))
+				acceptable = (type == IOMAP_UNWRITTEN);
+			else if (buffer_delay(bh))
+				acceptable = (type == IOMAP_DELAY);
+			else
 				break;
-			} else if (buffer_delay(bh)) {
-				acceptable = 1;
-			}
 		} while ((bh = bh->b_this_page) != head);
 
 		if (acceptable)
@@ -454,161 +619,30 @@
 	return NULL;
 }
 
-STATIC int
-xfs_map_unwritten(
-	struct inode		*inode,
-	struct page		*start_page,
-	struct buffer_head	*head,
-	struct buffer_head	*curr,
-	unsigned long		p_offset,
-	int			block_bits,
-	xfs_iomap_t		*iomapp,
-	struct writeback_control *wbc,
-	int			startio,
-	int			all_bh)
-{
-	struct buffer_head	*bh = curr;
-	xfs_iomap_t		*tmp;
-	xfs_ioend_t		*ioend;
-	loff_t			offset;
-	unsigned long		nblocks = 0;
-
-	offset = start_page->index;
-	offset <<= PAGE_CACHE_SHIFT;
-	offset += p_offset;
-
-	ioend = xfs_alloc_ioend(inode);
-
-	/* First map forwards in the page consecutive buffers
-	 * covering this unwritten extent
-	 */
-	do {
-		if (!buffer_unwritten(bh))
-			break;
-		tmp = xfs_offset_to_map(start_page, iomapp, p_offset);
-		if (!tmp)
-			break;
-		xfs_map_at_offset(start_page, bh, p_offset, block_bits, iomapp);
-		set_buffer_unwritten_io(bh);
-		bh->b_private = ioend;
-		p_offset += bh->b_size;
-		nblocks++;
-	} while ((bh = bh->b_this_page) != head);
-
-	atomic_add(nblocks, &ioend->io_remaining);
-
-	/* If we reached the end of the page, map forwards in any
-	 * following pages which are also covered by this extent.
-	 */
-	if (bh == head) {
-		struct address_space	*mapping = inode->i_mapping;
-		pgoff_t			tindex, tloff, tlast;
-		unsigned long		bs;
-		unsigned int		pg_offset, bbits = inode->i_blkbits;
-		struct page		*page;
-
-		tlast = i_size_read(inode) >> PAGE_CACHE_SHIFT;
-		tloff = (iomapp->iomap_offset + iomapp->iomap_bsize) >> PAGE_CACHE_SHIFT;
-		tloff = min(tlast, tloff);
-		for (tindex = start_page->index + 1; tindex < tloff; tindex++) {
-			page = xfs_probe_unwritten_page(mapping,
-						tindex, iomapp, ioend,
-						PAGE_CACHE_SIZE, &bs, bbits);
-			if (!page)
-				break;
-			nblocks += bs;
-			atomic_add(bs, &ioend->io_remaining);
-			xfs_convert_page(inode, page, iomapp, wbc, ioend,
-							startio, all_bh);
-			/* stop if converting the next page might add
-			 * enough blocks that the corresponding byte
-			 * count won't fit in our ulong page buf length */
-			if (nblocks >= ((ULONG_MAX - PAGE_SIZE) >> block_bits))
-				goto enough;
-		}
-
-		if (tindex == tlast &&
-		    (pg_offset = (i_size_read(inode) & (PAGE_CACHE_SIZE - 1)))) {
-			page = xfs_probe_unwritten_page(mapping,
-							tindex, iomapp, ioend,
-							pg_offset, &bs, bbits);
-			if (page) {
-				nblocks += bs;
-				atomic_add(bs, &ioend->io_remaining);
-				xfs_convert_page(inode, page, iomapp, wbc, ioend,
-							startio, all_bh);
-				if (nblocks >= ((ULONG_MAX - PAGE_SIZE) >> block_bits))
-					goto enough;
-			}
-		}
-	}
-
-enough:
-	ioend->io_size = (xfs_off_t)nblocks << block_bits;
-	ioend->io_offset = offset;
-	xfs_finish_ioend(ioend);
-	return 0;
-}
-
-STATIC void
-xfs_submit_page(
-	struct page		*page,
-	struct writeback_control *wbc,
-	struct buffer_head	*bh_arr[],
-	int			bh_count,
-	int			probed_page,
-	int			clear_dirty)
-{
-	struct buffer_head	*bh;
-	int			i;
-
-	BUG_ON(PageWriteback(page));
-	if (bh_count)
-		set_page_writeback(page);
-	if (clear_dirty)
-		clear_page_dirty(page);
-	unlock_page(page);
-
-	if (bh_count) {
-		for (i = 0; i < bh_count; i++) {
-			bh = bh_arr[i];
-			mark_buffer_async_write(bh);
-			if (buffer_unwritten(bh))
-				set_buffer_unwritten_io(bh);
-			set_buffer_uptodate(bh);
-			clear_buffer_dirty(bh);
-		}
-
-		for (i = 0; i < bh_count; i++)
-			submit_bh(WRITE, bh_arr[i]);
-
-		if (probed_page && clear_dirty)
-			wbc->nr_to_write--;	/* Wrote an "extra" page */
-	}
-}
-
 /*
  * Allocate & map buffers for page given the extent map. Write it out.
  * except for the original page of a writepage, this is called on
  * delalloc/unwritten pages only, for the original page it is possible
  * that the page has no mapping at all.
  */
-STATIC void
+STATIC int
 xfs_convert_page(
 	struct inode		*inode,
 	struct page		*page,
 	xfs_iomap_t		*iomapp,
+	xfs_ioend_t		**ioendp,
 	struct writeback_control *wbc,
 	void			*private,
 	int			startio,
 	int			all_bh)
 {
-	struct buffer_head	*bh_arr[MAX_BUF_PER_PAGE], *bh, *head;
+	struct buffer_head	*bh, *head;
 	xfs_iomap_t		*mp = iomapp, *tmp;
-	unsigned long		offset, end_offset;
-	int			index = 0;
+	unsigned long		p_offset, end_offset;
+	unsigned int		type;
 	int			bbits = inode->i_blkbits;
 	int			len, page_dirty;
+	int			count = 0, done = 0, uptodate = 1;
 
 	end_offset = (i_size_read(inode) & (PAGE_CACHE_SIZE - 1));
 
@@ -621,59 +655,66 @@
 	end_offset = roundup(end_offset, len);
 	page_dirty = end_offset / len;
 
-	offset = 0;
+	p_offset = 0;
 	bh = head = page_buffers(page);
 	do {
-		if (offset >= end_offset)
+		if (p_offset >= end_offset)
 			break;
-		if (!(PageUptodate(page) || buffer_uptodate(bh)))
+		if (!buffer_uptodate(bh))
+			uptodate = 0;
+		if (!(PageUptodate(page) || buffer_uptodate(bh))) {
+			done = 1;
 			continue;
-		if (buffer_mapped(bh) && all_bh &&
-		    !(buffer_unwritten(bh) || buffer_delay(bh))) {
-			if (startio) {
+		}
+
+		if (buffer_unwritten(bh))
+			type = IOMAP_UNWRITTEN;
+		else if (buffer_delay(bh))
+			type = IOMAP_DELAY;
+		else {
+			type = 0;
+			if (!(buffer_mapped(bh) && all_bh && startio)) {
+				done = 1;
+			} else if (startio) {
 				lock_buffer(bh);
-				bh_arr[index++] = bh;
+				xfs_add_to_ioend(inode, bh, p_offset,
+						type, ioendp, done);
+				count++;
 				page_dirty--;
 			}
 			continue;
 		}
-		tmp = xfs_offset_to_map(page, mp, offset);
-		if (!tmp)
+		tmp = xfs_offset_to_map(page, mp, p_offset);
+		if (!tmp) {
+			done = 1;
 			continue;
+		}
 		ASSERT(!(tmp->iomap_flags & IOMAP_HOLE));
 		ASSERT(!(tmp->iomap_flags & IOMAP_DELAY));
 
-		/* If this is a new unwritten extent buffer (i.e. one
-		 * that we haven't passed in private data for, we must
-		 * now map this buffer too.
-		 */
-		if (buffer_unwritten(bh) && !bh->b_end_io) {
-			ASSERT(tmp->iomap_flags & IOMAP_UNWRITTEN);
-			xfs_map_unwritten(inode, page, head, bh, offset,
-					bbits, tmp, wbc, startio, all_bh);
-		} else if (! (buffer_unwritten(bh) && buffer_locked(bh))) {
-			xfs_map_at_offset(page, bh, offset, bbits, tmp);
-			if (buffer_unwritten(bh)) {
-				set_buffer_unwritten_io(bh);
-				bh->b_private = private;
-				ASSERT(private);
-			}
-		}
+		xfs_map_at_offset(page, bh, p_offset, bbits, tmp, *ioendp);
 		if (startio) {
-			bh_arr[index++] = bh;
+			xfs_add_to_ioend(inode, bh, p_offset,
+					type, ioendp, done);
+			count++;
 		} else {
 			set_buffer_dirty(bh);
 			unlock_buffer(bh);
 			mark_buffer_dirty(bh);
 		}
 		page_dirty--;
-	} while (offset += len, (bh = bh->b_this_page) != head);
+	} while (p_offset += len, (bh = bh->b_this_page) != head);
 
-	if (startio && index) {
-		xfs_submit_page(page, wbc, bh_arr, index, 1, !page_dirty);
-	} else {
-		unlock_page(page);
+	if (uptodate && bh == head)
+		SetPageUptodate(page);
+
+	if (startio) {
+		if (count)
+			wbc->nr_to_write--;
+		xfs_start_page_writeback(page, wbc, !page_dirty, count);
 	}
+
+	return done;
 }
 
 /*
@@ -685,19 +726,22 @@
 	struct inode		*inode,
 	pgoff_t			tindex,
 	xfs_iomap_t		*iomapp,
+	xfs_ioend_t		**ioendp,
 	struct writeback_control *wbc,
 	int			startio,
 	int			all_bh,
 	pgoff_t			tlast)
 {
 	struct page		*page;
+	unsigned int		type = (*ioendp)->io_type;
+	int			done;
 
-	for (; tindex <= tlast; tindex++) {
-		page = xfs_probe_delalloc_page(inode, tindex);
+	for (done = 0; tindex <= tlast && !done; tindex++) {
+		page = xfs_probe_delayed_page(inode, tindex, type);
 		if (!page)
 			break;
-		xfs_convert_page(inode, page, iomapp, wbc, NULL,
-				startio, all_bh);
+		done = xfs_convert_page(inode, page, iomapp, ioendp,
+						wbc, NULL, startio, all_bh);
 	}
 }
 
@@ -728,18 +772,21 @@
 	int		startio,
 	int		unmapped) /* also implies page uptodate */
 {
-	struct buffer_head	*bh_arr[MAX_BUF_PER_PAGE], *bh, *head;
+	struct buffer_head	*bh, *head;
 	xfs_iomap_t		*iomp, iomap;
+	xfs_ioend_t		*ioend = NULL, *iohead = NULL;
 	loff_t			offset;
 	unsigned long           p_offset = 0;
+	unsigned int		type;
 	__uint64_t              end_offset;
 	pgoff_t                 end_index, last_index, tlast;
-	int			len, err, i, cnt = 0, uptodate = 1;
-	int			flags;
-	int			page_dirty;
+	int			flags, len, err, done = 1;
+	int			uptodate = 1;
+	int			page_dirty, count = 0, trylock_flag = 0;
 
 	/* wait for other IO threads? */
-	flags = (startio && wbc->sync_mode != WB_SYNC_NONE) ? 0 : BMAPI_TRYLOCK;
+	if (startio && wbc->sync_mode != WB_SYNC_NONE)
+		trylock_flag |= BMAPI_TRYLOCK;
 
 	/* Is this page beyond the end of the file? */
 	offset = i_size_read(inode);
@@ -754,98 +801,98 @@
 		}
 	}
 
-	end_offset = min_t(unsigned long long,
-			(loff_t)(page->index + 1) << PAGE_CACHE_SHIFT, offset);
-	offset = (loff_t)page->index << PAGE_CACHE_SHIFT;
-
 	/*
 	 * page_dirty is initially a count of buffers on the page before
 	 * EOF and is decrememted as we move each into a cleanable state.
-	 */
+	 *
+	 * Derivation:
+	 *
+	 * End offset is the highest offset that this page should represent.
+	 * If we are on the last page, (end_offset & (PAGE_CACHE_SIZE - 1))
+	 * will evaluate non-zero and be less than PAGE_CACHE_SIZE and
+	 * hence give us the correct page_dirty count. On any other page,
+	 * it will be zero and in that case we need page_dirty to be the
+	 * count of buffers on the page.
+ 	 */
+	end_offset = min_t(unsigned long long,
+			(xfs_off_t)(page->index + 1) << PAGE_CACHE_SHIFT, offset);
 	len = 1 << inode->i_blkbits;
-	p_offset = max(p_offset, PAGE_CACHE_SIZE);
-	p_offset = roundup(p_offset, len);
+	p_offset = min_t(unsigned long, end_offset & (PAGE_CACHE_SIZE - 1),
+					PAGE_CACHE_SIZE);
+	p_offset = p_offset ? roundup(p_offset, len) : PAGE_CACHE_SIZE;
 	page_dirty = p_offset / len;
 
 	iomp = NULL;
-	p_offset = 0;
 	bh = head = page_buffers(page);
+	offset = page_offset(page);
+
+	/* TODO: fix up "done" variable and iomap pointer (boolean) */
+	/* TODO: cleanup count and page_dirty */
 
 	do {
 		if (offset >= end_offset)
 			break;
 		if (!buffer_uptodate(bh))
 			uptodate = 0;
-		if (!(PageUptodate(page) || buffer_uptodate(bh)) && !startio)
+		if (!(PageUptodate(page) || buffer_uptodate(bh)) && !startio) {
+			done = 1;
 			continue;
+		}
 
 		if (iomp) {
 			iomp = xfs_offset_to_map(page, &iomap, p_offset);
+			done = (iomp == NULL);
 		}
 
 		/*
 		 * First case, map an unwritten extent and prepare for
 		 * extent state conversion transaction on completion.
-		 */
-		if (buffer_unwritten(bh)) {
-			if (!startio)
-				continue;
-			if (!iomp) {
-				err = xfs_map_blocks(inode, offset, len, &iomap,
-						BMAPI_WRITE|BMAPI_IGNSTATE);
-				if (err) {
-					goto error;
-				}
-				iomp = xfs_offset_to_map(page, &iomap,
-								p_offset);
-			}
-			if (iomp) {
-				if (!bh->b_end_io) {
-					err = xfs_map_unwritten(inode, page,
-							head, bh, p_offset,
-							inode->i_blkbits, iomp,
-							wbc, startio, unmapped);
-					if (err) {
-						goto error;
-					}
-				} else {
-					set_bit(BH_Lock, &bh->b_state);
-				}
-				BUG_ON(!buffer_locked(bh));
-				bh_arr[cnt++] = bh;
-				page_dirty--;
-			}
-		/*
+		 *
 		 * Second case, allocate space for a delalloc buffer.
 		 * We can return EAGAIN here in the release page case.
 		 */
-		} else if (buffer_delay(bh)) {
+		if (buffer_unwritten(bh) || buffer_delay(bh)) {
+			if (buffer_unwritten(bh)) {
+				type = IOMAP_UNWRITTEN;
+				flags = BMAPI_WRITE|BMAPI_IGNSTATE;
+			} else {
+				type = IOMAP_DELAY;
+				flags = BMAPI_ALLOCATE;
+				if (!startio)
+					flags |= trylock_flag;
+			}
+
 			if (!iomp) {
+				done = 1;
 				err = xfs_map_blocks(inode, offset, len, &iomap,
-						BMAPI_ALLOCATE | flags);
-				if (err) {
+						flags);
+				if (err)
 					goto error;
-				}
 				iomp = xfs_offset_to_map(page, &iomap,
 								p_offset);
+				done = (iomp == NULL);
 			}
 			if (iomp) {
 				xfs_map_at_offset(page, bh, p_offset,
-						inode->i_blkbits, iomp);
+						inode->i_blkbits, iomp, ioend);
 				if (startio) {
-					bh_arr[cnt++] = bh;
+					xfs_add_to_ioend(inode, bh, p_offset,
+						type, &ioend, done);
 				} else {
 					set_buffer_dirty(bh);
 					unlock_buffer(bh);
 					mark_buffer_dirty(bh);
 				}
 				page_dirty--;
+				count++;
+			} else {
+				done = 1;
 			}
 		} else if ((buffer_uptodate(bh) || PageUptodate(page)) &&
 			   (unmapped || startio)) {
 
+			type = 0;
 			if (!buffer_mapped(bh)) {
-				int	size;
 
 				/*
 				 * Getting here implies an unmapped buffer
@@ -853,6 +900,8 @@
 				 * need to write the whole page out.
 				 */
 				if (!iomp) {
+					int	size;
+
 					size = xfs_probe_unmapped_cluster(
 							inode, page, bh, head);
 					err = xfs_map_blocks(inode, offset,
@@ -863,52 +912,70 @@
 					}
 					iomp = xfs_offset_to_map(page, &iomap,
 								     p_offset);
+					done = (iomp == NULL);
 				}
 				if (iomp) {
-					xfs_map_at_offset(page,
-							bh, p_offset,
-							inode->i_blkbits, iomp);
+					xfs_map_at_offset(page, bh, p_offset,
+							inode->i_blkbits, iomp,
+							ioend);
 					if (startio) {
-						bh_arr[cnt++] = bh;
+						xfs_add_to_ioend(inode,
+							bh, p_offset, type,
+							&ioend, done);
 					} else {
 						set_buffer_dirty(bh);
 						unlock_buffer(bh);
 						mark_buffer_dirty(bh);
 					}
 					page_dirty--;
+					count++;
+				} else {
+					done = 1;
 				}
 			} else if (startio) {
 				if (buffer_uptodate(bh) &&
 				    !test_and_set_bit(BH_Lock, &bh->b_state)) {
-					bh_arr[cnt++] = bh;
+					ASSERT(buffer_mapped(bh));
+					xfs_add_to_ioend(inode,
+							bh, p_offset, type,
+							&ioend, done);
 					page_dirty--;
+					count++;
+				} else {
+					done = 1;
 				}
+			} else {
+				done = 1;
 			}
 		}
-	} while (offset += len, p_offset += len,
-		((bh = bh->b_this_page) != head));
+
+		if (!iohead)
+			iohead = ioend;
+
+	} while (offset += len, ((bh = bh->b_this_page) != head));
 
 	if (uptodate && bh == head)
 		SetPageUptodate(page);
 
-	if (startio) {
-		xfs_submit_page(page, wbc, bh_arr, cnt, 0, !page_dirty);
-	}
+	if (startio)
+		xfs_start_page_writeback(page, wbc, 1, count);
 
-	if (iomp) {
+	if (ioend && iomp && !done) {
 		offset = (iomp->iomap_offset + iomp->iomap_bsize - 1) >>
 					PAGE_CACHE_SHIFT;
 		tlast = min_t(pgoff_t, offset, last_index);
-		xfs_cluster_write(inode, page->index + 1, iomp, wbc,
-					startio, unmapped, tlast);
+		xfs_cluster_write(inode, page->index + 1, iomp, &ioend,
+					wbc, startio, unmapped, tlast);
 	}
 
+	if (iohead)
+		xfs_submit_ioend(iohead);
+
 	return page_dirty;
 
 error:
-	for (i = 0; i < cnt; i++) {
-		unlock_buffer(bh_arr[i]);
-	}
+	if (iohead)
+		xfs_cancel_ioend(iohead);
 
 	/*
 	 * If it's delalloc and we have nowhere to put it,
@@ -916,9 +983,8 @@
 	 * us to try again.
 	 */
 	if (err != -EAGAIN) {
-		if (!unmapped) {
+		if (!unmapped)
 			block_invalidatepage(page, 0);
-		}
 		ClearPageUptodate(page);
 	}
 	return err;
@@ -1094,7 +1160,7 @@
 	if (error)
 		return -error;
 
-	iocb->private = xfs_alloc_ioend(inode);
+	iocb->private = xfs_alloc_ioend(inode, IOMAP_UNWRITTEN);
 
 	ret = blockdev_direct_IO_own_locking(rw, iocb, inode,
 		iomap.iomap_target->bt_bdev,