Merge tag 'nfs-for-4.19-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

Pull NFS client updates from Anna Schumaker:
 "These patches include adding async support for the v4.2 COPY
  operation. I think Bruce is planning to send the server patches for
  the next release, but I figured we could get the client side out of
  the way now since it's been in my tree for a while. This shouldn't
  cause any problems, since the server will still respond with
  synchronous copies even if the client requests async.

  Features:
   - Add support for asynchronous server-side COPY operations

  Stable bufixes:
   - Fix an off-by-one in bl_map_stripe() (v3.17+)
   - NFSv4 client live hangs after live data migration recovery (v4.9+)
   - xprtrdma: Fix disconnect regression (v4.18+)
   - Fix locking in pnfs_generic_recover_commit_reqs (v4.14+)
   - Fix a sleep in atomic context in nfs4_callback_sequence() (v4.9+)

  Other bugfixes and cleanups:
   - Optimizations and fixes involving NFS v4.1 / pNFS layout handling
   - Optimize lseek(fd, SEEK_CUR, 0) on directories to avoid locking
   - Immediately reschedule writeback when the server replies with an
     error
   - Fix excessive attribute revalidation in nfs_execute_ok()
   - Add error checking to nfs_idmap_prepare_message()
   - Use new vm_fault_t return type
   - Return a delegation when reclaiming one that the server has
     recalled
   - Referrals should inherit proto setting from parents
   - Make rpc_auth_create_args a const
   - Improvements to rpc_iostats tracking
   - Fix a potential reference leak when there is an error processing a
     callback
   - Fix rmdir / mkdir / rename nlink accounting
   - Fix updating inode change attribute
   - Fix error handling in nfsn4_sp4_select_mode()
   - Use an appropriate work queue for direct-write completion
   - Don't busy wait if NFSv4 session draining is interrupted"

* tag 'nfs-for-4.19-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (54 commits)
  pNFS: Remove unwanted optimisation of layoutget
  pNFS/flexfiles: ff_layout_pg_init_read should exit on error
  pNFS: Treat RECALLCONFLICT like DELAY...
  pNFS: When updating the stateid in layoutreturn, also update the recall range
  NFSv4: Fix a sleep in atomic context in nfs4_callback_sequence()
  NFSv4: Fix locking in pnfs_generic_recover_commit_reqs
  NFSv4: Fix a typo in nfs4_init_channel_attrs()
  NFSv4: Don't busy wait if NFSv4 session draining is interrupted
  NFS recover from destination server reboot for copies
  NFS add a simple sync nfs4_proc_commit after async COPY
  NFS handle COPY ERR_OFFLOAD_NO_REQS
  NFS send OFFLOAD_CANCEL when COPY killed
  NFS export nfs4_async_handle_error
  NFS handle COPY reply CB_OFFLOAD call race
  NFS add support for asynchronous COPY
  NFS COPY xdr handle async reply
  NFS OFFLOAD_CANCEL xdr
  NFS CB_OFFLOAD xdr
  NFS: Use an appropriate work queue for direct-write completion
  NFSv4: Fix error handling in nfs4_sp4_select_mode()
  ...
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index 7cb5c38..06cb0c1 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -753,6 +753,7 @@ bl_alloc_lseg(struct pnfs_layout_hdr *lo, struct nfs4_layoutget_res *lgr,
 	case -ENODEV:
 		/* Our extent block devices are unavailable */
 		set_bit(NFS_LSEG_UNAVAILABLE, &lseg->pls_flags);
+		/* Fall through */
 	case 0:
 		return lseg;
 	default:
diff --git a/fs/nfs/blocklayout/dev.c b/fs/nfs/blocklayout/dev.c
index a7efd83..dec5880 100644
--- a/fs/nfs/blocklayout/dev.c
+++ b/fs/nfs/blocklayout/dev.c
@@ -204,7 +204,7 @@ static bool bl_map_stripe(struct pnfs_block_dev *dev, u64 offset,
 	chunk = div_u64(offset, dev->chunk_size);
 	div_u64_rem(chunk, dev->nr_children, &chunk_idx);
 
-	if (chunk_idx > dev->nr_children) {
+	if (chunk_idx >= dev->nr_children) {
 		dprintk("%s: invalid chunk idx %d (%lld/%lld)\n",
 			__func__, chunk_idx, offset, dev->chunk_size);
 		/* error, should not happen */
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index a20a0bc..8f34daf 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -184,6 +184,18 @@ struct cb_notify_lock_args {
 extern __be32 nfs4_callback_notify_lock(void *argp, void *resp,
 					 struct cb_process_state *cps);
 #endif /* CONFIG_NFS_V4_1 */
+#ifdef CONFIG_NFS_V4_2
+struct cb_offloadargs {
+	struct nfs_fh		coa_fh;
+	nfs4_stateid		coa_stateid;
+	uint32_t		error;
+	uint64_t		wr_count;
+	struct nfs_writeverf	wr_writeverf;
+};
+
+extern __be32 nfs4_callback_offload(void *args, void *dummy,
+				    struct cb_process_state *cps);
+#endif /* CONFIG_NFS_V4_2 */
 extern int check_gss_callback_principal(struct nfs_client *, struct svc_rqst *);
 extern __be32 nfs4_callback_getattr(void *argp, void *resp,
 				    struct cb_process_state *cps);
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 64c214f..fa515d5 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -215,9 +215,9 @@ static u32 pnfs_check_callback_stateid(struct pnfs_layout_hdr *lo,
 {
 	u32 oldseq, newseq;
 
-	/* Is the stateid still not initialised? */
+	/* Is the stateid not initialised? */
 	if (!pnfs_layout_is_valid(lo))
-		return NFS4ERR_DELAY;
+		return NFS4ERR_NOMATCHING_LAYOUT;
 
 	/* Mismatched stateid? */
 	if (!nfs4_stateid_match_other(&lo->plh_stateid, new))
@@ -273,7 +273,6 @@ static u32 initiate_file_draining(struct nfs_client *clp,
 	rv = pnfs_check_callback_stateid(lo, &args->cbl_stateid);
 	if (rv != NFS_OK)
 		goto unlock;
-	pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
 
 	/*
 	 * Enforce RFC5661 Section 12.5.5.2.1.5 (Bulk Recall and Return)
@@ -283,19 +282,23 @@ static u32 initiate_file_draining(struct nfs_client *clp,
 		goto unlock;
 	}
 
-	if (pnfs_mark_matching_lsegs_return(lo, &free_me_list,
+	pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
+	switch (pnfs_mark_matching_lsegs_return(lo, &free_me_list,
 				&args->cbl_range,
 				be32_to_cpu(args->cbl_stateid.seqid))) {
+	case 0:
+	case -EBUSY:
+		/* There are layout segments that need to be returned */
 		rv = NFS4_OK;
-		goto unlock;
-	}
+		break;
+	case -ENOENT:
+		/* Embrace your forgetfulness! */
+		rv = NFS4ERR_NOMATCHING_LAYOUT;
 
-	/* Embrace your forgetfulness! */
-	rv = NFS4ERR_NOMATCHING_LAYOUT;
-
-	if (NFS_SERVER(ino)->pnfs_curr_ld->return_range) {
-		NFS_SERVER(ino)->pnfs_curr_ld->return_range(lo,
-			&args->cbl_range);
+		if (NFS_SERVER(ino)->pnfs_curr_ld->return_range) {
+			NFS_SERVER(ino)->pnfs_curr_ld->return_range(lo,
+				&args->cbl_range);
+		}
 	}
 unlock:
 	spin_unlock(&ino->i_lock);
@@ -328,8 +331,6 @@ static u32 initiate_bulk_draining(struct nfs_client *clp,
 static u32 do_callback_layoutrecall(struct nfs_client *clp,
 				    struct cb_layoutrecallargs *args)
 {
-	write_seqcount_begin(&clp->cl_callback_count);
-	write_seqcount_end(&clp->cl_callback_count);
 	if (args->cbl_recall_type == RETURN_FILE)
 		return initiate_file_draining(clp, args);
 	return initiate_bulk_draining(clp, args);
@@ -441,11 +442,14 @@ validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
  * a match.  If the slot is in use and the sequence numbers match, the
  * client is still waiting for a response to the original request.
  */
-static bool referring_call_exists(struct nfs_client *clp,
+static int referring_call_exists(struct nfs_client *clp,
 				  uint32_t nrclists,
-				  struct referring_call_list *rclists)
+				  struct referring_call_list *rclists,
+				  spinlock_t *lock)
+	__releases(lock)
+	__acquires(lock)
 {
-	bool status = false;
+	int status = 0;
 	int i, j;
 	struct nfs4_session *session;
 	struct nfs4_slot_table *tbl;
@@ -468,8 +472,10 @@ static bool referring_call_exists(struct nfs_client *clp,
 
 		for (j = 0; j < rclist->rcl_nrefcalls; j++) {
 			ref = &rclist->rcl_refcalls[j];
+			spin_unlock(lock);
 			status = nfs4_slot_wait_on_seqid(tbl, ref->rc_slotid,
 					ref->rc_sequenceid, HZ >> 1) < 0;
+			spin_lock(lock);
 			if (status)
 				goto out;
 		}
@@ -546,7 +552,8 @@ __be32 nfs4_callback_sequence(void *argp, void *resp,
 	 * related callback was received before the response to the original
 	 * call.
 	 */
-	if (referring_call_exists(clp, args->csa_nrclists, args->csa_rclists)) {
+	if (referring_call_exists(clp, args->csa_nrclists, args->csa_rclists,
+				&tbl->slot_tbl_lock) < 0) {
 		status = htonl(NFS4ERR_DELAY);
 		goto out_unlock;
 	}
@@ -660,3 +667,57 @@ __be32 nfs4_callback_notify_lock(void *argp, void *resp,
 	return htonl(NFS4_OK);
 }
 #endif /* CONFIG_NFS_V4_1 */
+#ifdef CONFIG_NFS_V4_2
+static void nfs4_copy_cb_args(struct nfs4_copy_state *cp_state,
+				struct cb_offloadargs *args)
+{
+	cp_state->count = args->wr_count;
+	cp_state->error = args->error;
+	if (!args->error) {
+		cp_state->verf.committed = args->wr_writeverf.committed;
+		memcpy(&cp_state->verf.verifier.data[0],
+			&args->wr_writeverf.verifier.data[0],
+			NFS4_VERIFIER_SIZE);
+	}
+}
+
+__be32 nfs4_callback_offload(void *data, void *dummy,
+			     struct cb_process_state *cps)
+{
+	struct cb_offloadargs *args = data;
+	struct nfs_server *server;
+	struct nfs4_copy_state *copy;
+	bool found = false;
+
+	spin_lock(&cps->clp->cl_lock);
+	rcu_read_lock();
+	list_for_each_entry_rcu(server, &cps->clp->cl_superblocks,
+				client_link) {
+		list_for_each_entry(copy, &server->ss_copies, copies) {
+			if (memcmp(args->coa_stateid.other,
+					copy->stateid.other,
+					sizeof(args->coa_stateid.other)))
+				continue;
+			nfs4_copy_cb_args(copy, args);
+			complete(&copy->completion);
+			found = true;
+			goto out;
+		}
+	}
+out:
+	rcu_read_unlock();
+	if (!found) {
+		copy = kzalloc(sizeof(struct nfs4_copy_state), GFP_NOFS);
+		if (!copy) {
+			spin_unlock(&cps->clp->cl_lock);
+			return htonl(NFS4ERR_SERVERFAULT);
+		}
+		memcpy(&copy->stateid, &args->coa_stateid, NFS4_STATEID_SIZE);
+		nfs4_copy_cb_args(copy, args);
+		list_add_tail(&copy->copies, &cps->clp->pending_cb_stateids);
+	}
+	spin_unlock(&cps->clp->cl_lock);
+
+	return 0;
+}
+#endif /* CONFIG_NFS_V4_2 */
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index a813979..a87a562 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -38,6 +38,9 @@
 #define CB_OP_RECALLSLOT_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #define CB_OP_NOTIFY_LOCK_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #endif /* CONFIG_NFS_V4_1 */
+#ifdef CONFIG_NFS_V4_2
+#define CB_OP_OFFLOAD_RES_MAXSZ		(CB_OP_HDR_RES_MAXSZ)
+#endif /* CONFIG_NFS_V4_2 */
 
 #define NFSDBG_FACILITY NFSDBG_CALLBACK
 
@@ -527,7 +530,72 @@ static __be32 decode_notify_lock_args(struct svc_rqst *rqstp,
 }
 
 #endif /* CONFIG_NFS_V4_1 */
+#ifdef CONFIG_NFS_V4_2
+static __be32 decode_write_response(struct xdr_stream *xdr,
+					struct cb_offloadargs *args)
+{
+	__be32 *p;
 
+	/* skip the always zero field */
+	p = read_buf(xdr, 4);
+	if (unlikely(!p))
+		goto out;
+	p++;
+
+	/* decode count, stable_how, verifier */
+	p = xdr_inline_decode(xdr, 8 + 4);
+	if (unlikely(!p))
+		goto out;
+	p = xdr_decode_hyper(p, &args->wr_count);
+	args->wr_writeverf.committed = be32_to_cpup(p);
+	p = xdr_inline_decode(xdr, NFS4_VERIFIER_SIZE);
+	if (likely(p)) {
+		memcpy(&args->wr_writeverf.verifier.data[0], p,
+			NFS4_VERIFIER_SIZE);
+		return 0;
+	}
+out:
+	return htonl(NFS4ERR_RESOURCE);
+}
+
+static __be32 decode_offload_args(struct svc_rqst *rqstp,
+					struct xdr_stream *xdr,
+					void *data)
+{
+	struct cb_offloadargs *args = data;
+	__be32 *p;
+	__be32 status;
+
+	/* decode fh */
+	status = decode_fh(xdr, &args->coa_fh);
+	if (unlikely(status != 0))
+		return status;
+
+	/* decode stateid */
+	status = decode_stateid(xdr, &args->coa_stateid);
+	if (unlikely(status != 0))
+		return status;
+
+	/* decode status */
+	p = read_buf(xdr, 4);
+	if (unlikely(!p))
+		goto out;
+	args->error = ntohl(*p++);
+	if (!args->error) {
+		status = decode_write_response(xdr, args);
+		if (unlikely(status != 0))
+			return status;
+	} else {
+		p = xdr_inline_decode(xdr, 8);
+		if (unlikely(!p))
+			goto out;
+		p = xdr_decode_hyper(p, &args->wr_count);
+	}
+	return 0;
+out:
+	return htonl(NFS4ERR_RESOURCE);
+}
+#endif /* CONFIG_NFS_V4_2 */
 static __be32 encode_string(struct xdr_stream *xdr, unsigned int len, const char *str)
 {
 	if (unlikely(xdr_stream_encode_opaque(xdr, str, len) < 0))
@@ -773,7 +841,10 @@ preprocess_nfs42_op(int nop, unsigned int op_nr, struct callback_op **op)
 	if (status != htonl(NFS4ERR_OP_ILLEGAL))
 		return status;
 
-	if (op_nr == OP_CB_OFFLOAD)
+	if (op_nr == OP_CB_OFFLOAD) {
+		*op = &callback_ops[op_nr];
+		return htonl(NFS_OK);
+	} else
 		return htonl(NFS4ERR_NOTSUPP);
 	return htonl(NFS4ERR_OP_ILLEGAL);
 }
@@ -883,16 +954,21 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp)
 
 	if (hdr_arg.minorversion == 0) {
 		cps.clp = nfs4_find_client_ident(SVC_NET(rqstp), hdr_arg.cb_ident);
-		if (!cps.clp || !check_gss_callback_principal(cps.clp, rqstp))
+		if (!cps.clp || !check_gss_callback_principal(cps.clp, rqstp)) {
+			if (cps.clp)
+				nfs_put_client(cps.clp);
 			goto out_invalidcred;
+		}
 	}
 
 	cps.minorversion = hdr_arg.minorversion;
 	hdr_res.taglen = hdr_arg.taglen;
 	hdr_res.tag = hdr_arg.tag;
-	if (encode_compound_hdr_res(&xdr_out, &hdr_res) != 0)
+	if (encode_compound_hdr_res(&xdr_out, &hdr_res) != 0) {
+		if (cps.clp)
+			nfs_put_client(cps.clp);
 		return rpc_system_err;
-
+	}
 	while (status == 0 && nops != hdr_arg.nops) {
 		status = process_op(nops, rqstp, &xdr_in,
 				    rqstp->rq_argp, &xdr_out, rqstp->rq_resp,
@@ -969,6 +1045,13 @@ static struct callback_op callback_ops[] = {
 		.res_maxsize = CB_OP_NOTIFY_LOCK_RES_MAXSZ,
 	},
 #endif /* CONFIG_NFS_V4_1 */
+#ifdef CONFIG_NFS_V4_2
+	[OP_CB_OFFLOAD] = {
+		.process_op = nfs4_callback_offload,
+		.decode_args = decode_offload_args,
+		.res_maxsize = CB_OP_OFFLOAD_RES_MAXSZ,
+	},
+#endif /* CONFIG_NFS_V4_2 */
 };
 
 /*
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 377a616..96d5f81 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -886,6 +886,7 @@ struct nfs_server *nfs_alloc_server(void)
 	INIT_LIST_HEAD(&server->delegations);
 	INIT_LIST_HEAD(&server->layouts);
 	INIT_LIST_HEAD(&server->state_owners_lru);
+	INIT_LIST_HEAD(&server->ss_copies);
 
 	atomic_set(&server->active, 0);
 
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index d7f158c..8bfaa65 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -904,23 +904,29 @@ static loff_t nfs_llseek_dir(struct file *filp, loff_t offset, int whence)
 	dfprintk(FILE, "NFS: llseek dir(%pD2, %lld, %d)\n",
 			filp, offset, whence);
 
-	inode_lock(inode);
 	switch (whence) {
-		case 1:
-			offset += filp->f_pos;
-		case 0:
-			if (offset >= 0)
-				break;
-		default:
-			offset = -EINVAL;
-			goto out;
+	default:
+		return -EINVAL;
+	case SEEK_SET:
+		if (offset < 0)
+			return -EINVAL;
+		inode_lock(inode);
+		break;
+	case SEEK_CUR:
+		if (offset == 0)
+			return filp->f_pos;
+		inode_lock(inode);
+		offset += filp->f_pos;
+		if (offset < 0) {
+			inode_unlock(inode);
+			return -EINVAL;
+		}
 	}
 	if (offset != filp->f_pos) {
 		filp->f_pos = offset;
 		dir_ctx->dir_cookie = 0;
 		dir_ctx->duped = 0;
 	}
-out:
 	inode_unlock(inode);
 	return offset;
 }
@@ -1032,7 +1038,7 @@ int nfs_lookup_verify_inode(struct inode *inode, unsigned int flags)
 	if (flags & LOOKUP_REVAL)
 		goto out_force;
 out:
-	return (inode->i_nlink == 0) ? -ENOENT : 0;
+	return (inode->i_nlink == 0) ? -ESTALE : 0;
 out_force:
 	if (flags & LOOKUP_RCU)
 		return -ECHILD;
@@ -2499,7 +2505,9 @@ static int nfs_execute_ok(struct inode *inode, int mask)
 	struct nfs_server *server = NFS_SERVER(inode);
 	int ret = 0;
 
-	if (nfs_check_cache_invalid(inode, NFS_INO_INVALID_ACCESS)) {
+	if (S_ISDIR(inode->i_mode))
+		return 0;
+	if (nfs_check_cache_invalid(inode, NFS_INO_INVALID_OTHER)) {
 		if (mask & MAY_NOT_BLOCK)
 			return -ECHILD;
 		ret = __nfs_revalidate_inode(server, inode);
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 621c517..aa12c30 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -758,7 +758,7 @@ static void nfs_direct_write_schedule_work(struct work_struct *work)
 
 static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
 {
-	schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
+	queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
 }
 
 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 81cca49..29553fd 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -532,13 +532,13 @@ const struct address_space_operations nfs_file_aops = {
  * writable, implying that someone is about to modify the page through a
  * shared-writable mapping
  */
-static int nfs_vm_page_mkwrite(struct vm_fault *vmf)
+static vm_fault_t nfs_vm_page_mkwrite(struct vm_fault *vmf)
 {
 	struct page *page = vmf->page;
 	struct file *filp = vmf->vma->vm_file;
 	struct inode *inode = file_inode(filp);
 	unsigned pagelen;
-	int ret = VM_FAULT_NOPAGE;
+	vm_fault_t ret = VM_FAULT_NOPAGE;
 	struct address_space *mapping;
 
 	dfprintk(PAGECACHE, "NFS: vm_page_mkwrite(%pD2(%lu), offset %lld)\n",
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index 8f00379..cae4333 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -812,7 +812,6 @@ ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
 		      struct nfs_page *req,
 		      bool strict_iomode)
 {
-retry_strict:
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 					   req->wb_context,
@@ -825,16 +824,6 @@ ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
 		pgio->pg_error = PTR_ERR(pgio->pg_lseg);
 		pgio->pg_lseg = NULL;
 	}
-
-	/* If we don't have checking, do get a IOMODE_RW
-	 * segment, and the server wants to avoid READs
-	 * there, then retry!
-	 */
-	if (pgio->pg_lseg && !strict_iomode &&
-	    ff_layout_avoid_read_on_rw(pgio->pg_lseg)) {
-		strict_iomode = true;
-		goto retry_strict;
-	}
 }
 
 static void
@@ -849,14 +838,16 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 retry:
 	pnfs_generic_pg_check_layout(pgio);
 	/* Use full layout for now */
-	if (!pgio->pg_lseg)
+	if (!pgio->pg_lseg) {
 		ff_layout_pg_get_read(pgio, req, false);
-	else if (ff_layout_avoid_read_on_rw(pgio->pg_lseg))
+		if (!pgio->pg_lseg)
+			goto out_nolseg;
+	}
+	if (ff_layout_avoid_read_on_rw(pgio->pg_lseg)) {
 		ff_layout_pg_get_read(pgio, req, true);
-
-	/* If no lseg, fall back to read through mds */
-	if (pgio->pg_lseg == NULL)
-		goto out_mds;
+		if (!pgio->pg_lseg)
+			goto out_nolseg;
+	}
 
 	ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
 	if (!ds) {
@@ -878,6 +869,9 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 	pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].rsize;
 
 	return;
+out_nolseg:
+	if (pgio->pg_error < 0)
+		return;
 out_mds:
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = NULL;
@@ -1323,6 +1317,7 @@ static void ff_layout_read_record_layoutstats_done(struct rpc_task *task,
 			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
 			hdr->args.count,
 			hdr->res.count);
+	set_bit(NFS_LSEG_LAYOUTRETURN, &hdr->lseg->pls_flags);
 }
 
 static int ff_layout_read_prepare_common(struct rpc_task *task,
@@ -1507,6 +1502,7 @@ static void ff_layout_write_record_layoutstats_done(struct rpc_task *task,
 			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
 			hdr->args.count, hdr->res.count,
 			hdr->res.verf->committed);
+	set_bit(NFS_LSEG_LAYOUTRETURN, &hdr->lseg->pls_flags);
 }
 
 static int ff_layout_write_prepare_common(struct rpc_task *task,
@@ -1615,6 +1611,7 @@ static void ff_layout_commit_record_layoutstats_done(struct rpc_task *task,
 	nfs4_ff_layout_stat_io_end_write(task,
 			FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
 			count, count, NFS_FILE_SYNC);
+	set_bit(NFS_LSEG_LAYOUTRETURN, &cdata->lseg->pls_flags);
 }
 
 static void ff_layout_commit_prepare_common(struct rpc_task *task,
diff --git a/fs/nfs/nfs3acl.c b/fs/nfs/nfs3acl.c
index 7173a4e..9fce185 100644
--- a/fs/nfs/nfs3acl.c
+++ b/fs/nfs/nfs3acl.c
@@ -108,6 +108,7 @@ struct posix_acl *nfs3_get_acl(struct inode *inode, int type)
 		case -EPROTONOSUPPORT:
 			dprintk("NFS_V3_ACL extension not supported; disabling\n");
 			server->caps &= ~NFS_CAP_ACLS;
+			/* fall through */
 		case -ENOTSUPP:
 			status = -EOPNOTSUPP;
 		default:
@@ -229,6 +230,7 @@ static int __nfs3_proc_setacls(struct inode *inode, struct posix_acl *acl,
 			dprintk("NFS_V3_ACL SETACL RPC not supported"
 					"(will not retry)\n");
 			server->caps &= ~NFS_CAP_ACLS;
+			/* fall through */
 		case -ENOTSUPP:
 			status = -EOPNOTSUPP;
 	}
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index 5f59b6f..ac5b784 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -17,6 +17,7 @@
 #include "internal.h"
 
 #define NFSDBG_FACILITY NFSDBG_PROC
+static int nfs42_do_offload_cancel_async(struct file *dst, nfs4_stateid *std);
 
 static int _nfs42_proc_fallocate(struct rpc_message *msg, struct file *filep,
 		struct nfs_lock_context *lock, loff_t offset, loff_t len)
@@ -130,6 +131,91 @@ int nfs42_proc_deallocate(struct file *filep, loff_t offset, loff_t len)
 	return err;
 }
 
+static int handle_async_copy(struct nfs42_copy_res *res,
+			     struct nfs_server *server,
+			     struct file *src,
+			     struct file *dst,
+			     nfs4_stateid *src_stateid)
+{
+	struct nfs4_copy_state *copy;
+	int status = NFS4_OK;
+	bool found_pending = false;
+	struct nfs_open_context *ctx = nfs_file_open_context(dst);
+
+	spin_lock(&server->nfs_client->cl_lock);
+	list_for_each_entry(copy, &server->nfs_client->pending_cb_stateids,
+				copies) {
+		if (memcmp(&res->write_res.stateid, &copy->stateid,
+				NFS4_STATEID_SIZE))
+			continue;
+		found_pending = true;
+		list_del(&copy->copies);
+		break;
+	}
+	if (found_pending) {
+		spin_unlock(&server->nfs_client->cl_lock);
+		goto out;
+	}
+
+	copy = kzalloc(sizeof(struct nfs4_copy_state), GFP_NOFS);
+	if (!copy) {
+		spin_unlock(&server->nfs_client->cl_lock);
+		return -ENOMEM;
+	}
+	memcpy(&copy->stateid, &res->write_res.stateid, NFS4_STATEID_SIZE);
+	init_completion(&copy->completion);
+	copy->parent_state = ctx->state;
+
+	list_add_tail(&copy->copies, &server->ss_copies);
+	spin_unlock(&server->nfs_client->cl_lock);
+
+	status = wait_for_completion_interruptible(&copy->completion);
+	spin_lock(&server->nfs_client->cl_lock);
+	list_del_init(&copy->copies);
+	spin_unlock(&server->nfs_client->cl_lock);
+	if (status == -ERESTARTSYS) {
+		goto out_cancel;
+	} else if (copy->flags) {
+		status = -EAGAIN;
+		goto out_cancel;
+	}
+out:
+	res->write_res.count = copy->count;
+	memcpy(&res->write_res.verifier, &copy->verf, sizeof(copy->verf));
+	status = -copy->error;
+
+	kfree(copy);
+	return status;
+out_cancel:
+	nfs42_do_offload_cancel_async(dst, &copy->stateid);
+	kfree(copy);
+	return status;
+}
+
+static int process_copy_commit(struct file *dst, loff_t pos_dst,
+			       struct nfs42_copy_res *res)
+{
+	struct nfs_commitres cres;
+	int status = -ENOMEM;
+
+	cres.verf = kzalloc(sizeof(struct nfs_writeverf), GFP_NOFS);
+	if (!cres.verf)
+		goto out;
+
+	status = nfs4_proc_commit(dst, pos_dst, res->write_res.count, &cres);
+	if (status)
+		goto out_free;
+	if (nfs_write_verifier_cmp(&res->write_res.verifier.verifier,
+				    &cres.verf->verifier)) {
+		dprintk("commit verf differs from copy verf\n");
+		status = -EAGAIN;
+	}
+out_free:
+	kfree(cres.verf);
+out:
+	return status;
+}
+
 static ssize_t _nfs42_proc_copy(struct file *src,
 				struct nfs_lock_context *src_lock,
 				struct file *dst,
@@ -168,9 +254,16 @@ static ssize_t _nfs42_proc_copy(struct file *src,
 	if (status)
 		return status;
 
-	res->commit_res.verf = kzalloc(sizeof(struct nfs_writeverf), GFP_NOFS);
-	if (!res->commit_res.verf)
-		return -ENOMEM;
+	res->commit_res.verf = NULL;
+	if (args->sync) {
+		res->commit_res.verf =
+			kzalloc(sizeof(struct nfs_writeverf), GFP_NOFS);
+		if (!res->commit_res.verf)
+			return -ENOMEM;
+	}
+	set_bit(NFS_CLNT_DST_SSC_COPY_STATE,
+		&dst_lock->open_context->state->flags);
+
 	status = nfs4_call_sync(server->client, server, &msg,
 				&args->seq_args, &res->seq_res, 0);
 	if (status == -ENOTSUPP)
@@ -178,18 +271,34 @@ static ssize_t _nfs42_proc_copy(struct file *src,
 	if (status)
 		goto out;
 
-	if (nfs_write_verifier_cmp(&res->write_res.verifier.verifier,
+	if (args->sync &&
+		nfs_write_verifier_cmp(&res->write_res.verifier.verifier,
 				    &res->commit_res.verf->verifier)) {
 		status = -EAGAIN;
 		goto out;
 	}
 
+	if (!res->synchronous) {
+		status = handle_async_copy(res, server, src, dst,
+				&args->src_stateid);
+		if (status)
+			return status;
+	}
+
+	if ((!res->synchronous || !args->sync) &&
+			res->write_res.verifier.committed != NFS_FILE_SYNC) {
+		status = process_copy_commit(dst, pos_dst, res);
+		if (status)
+			return status;
+	}
+
 	truncate_pagecache_range(dst_inode, pos_dst,
 				 pos_dst + res->write_res.count);
 
 	status = res->write_res.count;
 out:
-	kfree(res->commit_res.verf);
+	if (args->sync)
+		kfree(res->commit_res.verf);
 	return status;
 }
 
@@ -206,6 +315,7 @@ ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
 		.dst_fh		= NFS_FH(file_inode(dst)),
 		.dst_pos	= pos_dst,
 		.count		= count,
+		.sync		= false,
 	};
 	struct nfs42_copy_res res;
 	struct nfs4_exception src_exception = {
@@ -247,7 +357,11 @@ ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
 		if (err == -ENOTSUPP) {
 			err = -EOPNOTSUPP;
 			break;
-		} if (err == -EAGAIN) {
+		} else if (err == -EAGAIN) {
+			dst_exception.retry = 1;
+			continue;
+		} else if (err == -NFS4ERR_OFFLOAD_NO_REQS && !args.sync) {
+			args.sync = true;
 			dst_exception.retry = 1;
 			continue;
 		}
@@ -264,6 +378,89 @@ ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
 	return err;
 }
 
+struct nfs42_offloadcancel_data {
+	struct nfs_server *seq_server;
+	struct nfs42_offload_status_args args;
+	struct nfs42_offload_status_res res;
+};
+
+static void nfs42_offload_cancel_prepare(struct rpc_task *task, void *calldata)
+{
+	struct nfs42_offloadcancel_data *data = calldata;
+
+	nfs4_setup_sequence(data->seq_server->nfs_client,
+				&data->args.osa_seq_args,
+				&data->res.osr_seq_res, task);
+}
+
+static void nfs42_offload_cancel_done(struct rpc_task *task, void *calldata)
+{
+	struct nfs42_offloadcancel_data *data = calldata;
+
+	nfs41_sequence_done(task, &data->res.osr_seq_res);
+	if (task->tk_status &&
+		nfs4_async_handle_error(task, data->seq_server, NULL,
+			NULL) == -EAGAIN)
+		rpc_restart_call_prepare(task);
+}
+
+static void nfs42_free_offloadcancel_data(void *data)
+{
+	kfree(data);
+}
+
+static const struct rpc_call_ops nfs42_offload_cancel_ops = {
+	.rpc_call_prepare = nfs42_offload_cancel_prepare,
+	.rpc_call_done = nfs42_offload_cancel_done,
+	.rpc_release = nfs42_free_offloadcancel_data,
+};
+
+static int nfs42_do_offload_cancel_async(struct file *dst,
+					 nfs4_stateid *stateid)
+{
+	struct nfs_server *dst_server = NFS_SERVER(file_inode(dst));
+	struct nfs42_offloadcancel_data *data = NULL;
+	struct nfs_open_context *ctx = nfs_file_open_context(dst);
+	struct rpc_task *task;
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_OFFLOAD_CANCEL],
+		.rpc_cred = ctx->cred,
+	};
+	struct rpc_task_setup task_setup_data = {
+		.rpc_client = dst_server->client,
+		.rpc_message = &msg,
+		.callback_ops = &nfs42_offload_cancel_ops,
+		.workqueue = nfsiod_workqueue,
+		.flags = RPC_TASK_ASYNC,
+	};
+	int status;
+
+	if (!(dst_server->caps & NFS_CAP_OFFLOAD_CANCEL))
+		return -EOPNOTSUPP;
+
+	data = kzalloc(sizeof(struct nfs42_offloadcancel_data), GFP_NOFS);
+	if (data == NULL)
+		return -ENOMEM;
+
+	data->seq_server = dst_server;
+	data->args.osa_src_fh = NFS_FH(file_inode(dst));
+	memcpy(&data->args.osa_stateid, stateid,
+		sizeof(data->args.osa_stateid));
+	msg.rpc_argp = &data->args;
+	msg.rpc_resp = &data->res;
+	task_setup_data.callback_data = data;
+	nfs4_init_sequence(&data->args.osa_seq_args, &data->res.osr_seq_res,
+			   1, 0);
+	task = rpc_run_task(&task_setup_data);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+	status = rpc_wait_for_completion_task(task);
+	if (status == -ENOTSUPP)
+		dst_server->caps &= ~NFS_CAP_OFFLOAD_CANCEL;
+	rpc_put_task(task);
+	return status;
+}
+
 static loff_t _nfs42_proc_llseek(struct file *filep,
 		struct nfs_lock_context *lock, loff_t offset, int whence)
 {
diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c
index 5966e1e..69f72ed 100644
--- a/fs/nfs/nfs42xdr.c
+++ b/fs/nfs/nfs42xdr.c
@@ -26,6 +26,9 @@
 					 NFS42_WRITE_RES_SIZE + \
 					 1 /* cr_consecutive */ + \
 					 1 /* cr_synchronous */)
+#define encode_offload_cancel_maxsz	(op_encode_hdr_maxsz + \
+					 XDR_QUADLEN(NFS4_STATEID_SIZE))
+#define decode_offload_cancel_maxsz	(op_decode_hdr_maxsz)
 #define encode_deallocate_maxsz		(op_encode_hdr_maxsz + \
 					 encode_fallocate_maxsz)
 #define decode_deallocate_maxsz		(op_decode_hdr_maxsz)
@@ -75,6 +78,12 @@
 					 decode_putfh_maxsz + \
 					 decode_copy_maxsz + \
 					 decode_commit_maxsz)
+#define NFS4_enc_offload_cancel_sz	(compound_encode_hdr_maxsz + \
+					 encode_putfh_maxsz + \
+					 encode_offload_cancel_maxsz)
+#define NFS4_dec_offload_cancel_sz	(compound_decode_hdr_maxsz + \
+					 decode_putfh_maxsz + \
+					 decode_offload_cancel_maxsz)
 #define NFS4_enc_deallocate_sz		(compound_encode_hdr_maxsz + \
 					 encode_putfh_maxsz + \
 					 encode_deallocate_maxsz + \
@@ -141,10 +150,18 @@ static void encode_copy(struct xdr_stream *xdr,
 	encode_uint64(xdr, args->count);
 
 	encode_uint32(xdr, 1); /* consecutive = true */
-	encode_uint32(xdr, 1); /* synchronous = true */
+	encode_uint32(xdr, args->sync);
 	encode_uint32(xdr, 0); /* src server list */
 }
 
+static void encode_offload_cancel(struct xdr_stream *xdr,
+				  const struct nfs42_offload_status_args *args,
+				  struct compound_hdr *hdr)
+{
+	encode_op_hdr(xdr, OP_OFFLOAD_CANCEL, decode_offload_cancel_maxsz, hdr);
+	encode_nfs4_stateid(xdr, &args->osa_stateid);
+}
+
 static void encode_deallocate(struct xdr_stream *xdr,
 			      const struct nfs42_falloc_args *args,
 			      struct compound_hdr *hdr)
@@ -256,7 +273,27 @@ static void nfs4_xdr_enc_copy(struct rpc_rqst *req,
 	encode_savefh(xdr, &hdr);
 	encode_putfh(xdr, args->dst_fh, &hdr);
 	encode_copy(xdr, args, &hdr);
-	encode_copy_commit(xdr, args, &hdr);
+	if (args->sync)
+		encode_copy_commit(xdr, args, &hdr);
+	encode_nops(&hdr);
+}
+
+/*
+ * Encode OFFLOAD_CANEL request
+ */
+static void nfs4_xdr_enc_offload_cancel(struct rpc_rqst *req,
+					struct xdr_stream *xdr,
+					const void *data)
+{
+	const struct nfs42_offload_status_args *args = data;
+	struct compound_hdr hdr = {
+		.minorversion = nfs4_xdr_minorversion(&args->osa_seq_args),
+	};
+
+	encode_compound_hdr(xdr, req, &hdr);
+	encode_sequence(xdr, &args->osa_seq_args, &hdr);
+	encode_putfh(xdr, args->osa_src_fh, &hdr);
+	encode_offload_cancel(xdr, args, &hdr);
 	encode_nops(&hdr);
 }
 
@@ -353,21 +390,23 @@ static int decode_write_response(struct xdr_stream *xdr,
 				 struct nfs42_write_res *res)
 {
 	__be32 *p;
+	int status, count;
 
-	p = xdr_inline_decode(xdr, 4 + 8 + 4);
+	p = xdr_inline_decode(xdr, 4);
 	if (unlikely(!p))
 		goto out_overflow;
-
-	/*
-	 * We never use asynchronous mode, so warn if a server returns
-	 * a stateid.
-	 */
-	if (unlikely(*p != 0)) {
-		pr_err_once("%s: server has set unrequested "
-				"asynchronous mode\n", __func__);
+	count = be32_to_cpup(p);
+	if (count > 1)
 		return -EREMOTEIO;
+	else if (count == 1) {
+		status = decode_opaque_fixed(xdr, &res->stateid,
+				NFS4_STATEID_SIZE);
+		if (unlikely(status))
+			goto out_overflow;
 	}
-	p++;
+	p = xdr_inline_decode(xdr, 8 + 4);
+	if (unlikely(!p))
+		goto out_overflow;
 	p = xdr_decode_hyper(p, &res->count);
 	res->verifier.committed = be32_to_cpup(p);
 	return decode_verifier(xdr, &res->verifier.verifier);
@@ -413,6 +452,12 @@ static int decode_copy(struct xdr_stream *xdr, struct nfs42_copy_res *res)
 	return decode_copy_requirements(xdr, res);
 }
 
+static int decode_offload_cancel(struct xdr_stream *xdr,
+				 struct nfs42_offload_status_res *res)
+{
+	return decode_op_hdr(xdr, OP_OFFLOAD_CANCEL);
+}
+
 static int decode_deallocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
 {
 	return decode_op_hdr(xdr, OP_DEALLOCATE);
@@ -507,7 +552,34 @@ static int nfs4_xdr_dec_copy(struct rpc_rqst *rqstp,
 	status = decode_copy(xdr, res);
 	if (status)
 		goto out;
-	status = decode_commit(xdr, &res->commit_res);
+	if (res->commit_res.verf)
+		status = decode_commit(xdr, &res->commit_res);
+out:
+	return status;
+}
+
+/*
+ * Decode OFFLOAD_CANCEL response
+ */
+static int nfs4_xdr_dec_offload_cancel(struct rpc_rqst *rqstp,
+				       struct xdr_stream *xdr,
+				       void *data)
+{
+	struct nfs42_offload_status_res *res = data;
+	struct compound_hdr hdr;
+	int status;
+
+	status = decode_compound_hdr(xdr, &hdr);
+	if (status)
+		goto out;
+	status = decode_sequence(xdr, &res->osr_seq_res, rqstp);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_offload_cancel(xdr, res);
+
 out:
 	return status;
 }
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 51beb6e..3a69041 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -163,6 +163,9 @@ enum {
 	NFS_STATE_RECOVERY_FAILED,	/* OPEN stateid state recovery failed */
 	NFS_STATE_MAY_NOTIFY_LOCK,	/* server may CB_NOTIFY_LOCK */
 	NFS_STATE_CHANGE_WAIT,		/* A state changing operation is outstanding */
+#ifdef CONFIG_NFS_V4_2
+	NFS_CLNT_DST_SSC_COPY_STATE,    /* dst server open state on client*/
+#endif /* CONFIG_NFS_V4_2 */
 };
 
 struct nfs4_state {
@@ -273,6 +276,9 @@ int nfs4_replace_transport(struct nfs_server *server,
 
 /* nfs4proc.c */
 extern int nfs4_handle_exception(struct nfs_server *, int, struct nfs4_exception *);
+extern int nfs4_async_handle_error(struct rpc_task *task,
+				   struct nfs_server *server,
+				   struct nfs4_state *state, long *timeout);
 extern int nfs4_call_sync(struct rpc_clnt *, struct nfs_server *,
 			  struct rpc_message *, struct nfs4_sequence_args *,
 			  struct nfs4_sequence_res *, int);
@@ -505,7 +511,7 @@ extern int nfs4_sequence_done(struct rpc_task *task,
 			      struct nfs4_sequence_res *res);
 
 extern void nfs4_free_lock_state(struct nfs_server *server, struct nfs4_lock_state *lsp);
-
+extern int nfs4_proc_commit(struct file *dst, __u64 offset, __u32 count, struct nfs_commitres *res);
 extern const nfs4_stateid zero_stateid;
 extern const nfs4_stateid invalid_stateid;
 
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
index 9796314..146e308 100644
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -156,9 +156,23 @@ nfs4_shutdown_ds_clients(struct nfs_client *clp)
 	}
 }
 
+static void
+nfs4_cleanup_callback(struct nfs_client *clp)
+{
+	struct nfs4_copy_state *cp_state;
+
+	while (!list_empty(&clp->pending_cb_stateids)) {
+		cp_state = list_entry(clp->pending_cb_stateids.next,
+					struct nfs4_copy_state, copies);
+		list_del(&cp_state->copies);
+		kfree(cp_state);
+	}
+}
+
 void nfs41_shutdown_client(struct nfs_client *clp)
 {
 	if (nfs4_has_session(clp)) {
+		nfs4_cleanup_callback(clp);
 		nfs4_shutdown_ds_clients(clp);
 		nfs4_destroy_session(clp->cl_session);
 		nfs4_destroy_clientid(clp);
@@ -202,6 +216,7 @@ struct nfs_client *nfs4_alloc_client(const struct nfs_client_initdata *cl_init)
 #if IS_ENABLED(CONFIG_NFS_V4_1)
 	init_waitqueue_head(&clp->cl_lock_waitq);
 #endif
+	INIT_LIST_HEAD(&clp->pending_cb_stateids);
 	return clp;
 
 error:
@@ -1127,7 +1142,7 @@ struct nfs_server *nfs4_create_referral_server(struct nfs_clone_mount *data,
 	nfs_server_copy_userdata(server, parent_server);
 
 	/* Get a client representation */
-#ifdef CONFIG_SUNRPC_XPRT_RDMA
+#if IS_ENABLED(CONFIG_SUNRPC_XPRT_RDMA)
 	rpc_set_port(data->addr, NFS_RDMA_PORT);
 	error = nfs4_set_client(server, data->hostname,
 				data->addr,
@@ -1139,7 +1154,7 @@ struct nfs_server *nfs4_create_referral_server(struct nfs_clone_mount *data,
 				parent_client->cl_net);
 	if (!error)
 		goto init_server;
-#endif	/* CONFIG_SUNRPC_XPRT_RDMA */
+#endif	/* IS_ENABLED(CONFIG_SUNRPC_XPRT_RDMA) */
 
 	rpc_set_port(data->addr, NFS_PORT);
 	error = nfs4_set_client(server, data->hostname,
@@ -1153,7 +1168,7 @@ struct nfs_server *nfs4_create_referral_server(struct nfs_clone_mount *data,
 	if (error < 0)
 		goto error;
 
-#ifdef CONFIG_SUNRPC_XPRT_RDMA
+#if IS_ENABLED(CONFIG_SUNRPC_XPRT_RDMA)
 init_server:
 #endif
 	error = nfs_init_server_rpcclient(server, parent_server->client->cl_timeout, data->authflavor);
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 6b3b372..4288a6e 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -133,10 +133,15 @@ static ssize_t nfs4_copy_file_range(struct file *file_in, loff_t pos_in,
 				    struct file *file_out, loff_t pos_out,
 				    size_t count, unsigned int flags)
 {
+	ssize_t ret;
+
 	if (file_inode(file_in) == file_inode(file_out))
 		return -EINVAL;
-
-	return nfs42_proc_copy(file_in, pos_in, file_out, pos_out, count);
+retry:
+	ret = nfs42_proc_copy(file_in, pos_in, file_out, pos_out, count);
+	if (ret == -EAGAIN)
+		goto retry;
+	return ret;
 }
 
 static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
@@ -149,6 +154,7 @@ static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
 		ret = nfs42_proc_llseek(filep, offset, whence);
 		if (ret != -ENOTSUPP)
 			return ret;
+		/* Fall through */
 	default:
 		return nfs_file_llseek(filep, offset, whence);
 	}
diff --git a/fs/nfs/nfs4idmap.c b/fs/nfs/nfs4idmap.c
index b6f9d84b..3f23b68 100644
--- a/fs/nfs/nfs4idmap.c
+++ b/fs/nfs/nfs4idmap.c
@@ -506,6 +506,7 @@ static int nfs_idmap_prepare_message(char *desc, struct idmap *idmap,
 	switch (token) {
 	case Opt_find_uid:
 		im->im_type = IDMAP_TYPE_USER;
+		/* Fall through */
 	case Opt_find_gid:
 		im->im_conv = IDMAP_CONV_NAMETOID;
 		ret = match_strlcpy(im->im_name, &substr, IDMAP_NAMESZ);
@@ -513,9 +514,12 @@ static int nfs_idmap_prepare_message(char *desc, struct idmap *idmap,
 
 	case Opt_find_user:
 		im->im_type = IDMAP_TYPE_USER;
+		/* Fall through */
 	case Opt_find_group:
 		im->im_conv = IDMAP_CONV_IDTONAME;
 		ret = match_int(&substr, &im->im_id);
+		if (ret)
+			goto out;
 		break;
 
 	default:
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index b790976..34830f6 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -449,6 +449,7 @@ static int nfs4_do_handle_exception(struct nfs_server *server,
 						stateid);
 				goto wait_on_recovery;
 			}
+			/* Fall through */
 		case -NFS4ERR_OPENMODE:
 			if (inode) {
 				int err;
@@ -501,8 +502,10 @@ static int nfs4_do_handle_exception(struct nfs_server *server,
 				ret = -EBUSY;
 				break;
 			}
+			/* Fall through */
 		case -NFS4ERR_DELAY:
 			nfs_inc_server_stats(server, NFSIOS_DELAY);
+			/* Fall through */
 		case -NFS4ERR_GRACE:
 		case -NFS4ERR_LAYOUTTRYLATER:
 		case -NFS4ERR_RECALLCONFLICT:
@@ -581,12 +584,19 @@ nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server,
 		ret = -EIO;
 	return ret;
 out_retry:
-	if (ret == 0)
+	if (ret == 0) {
 		exception->retry = 1;
+		/*
+		 * For NFS4ERR_MOVED, the client transport will need to
+		 * be recomputed after migration recovery has completed.
+		 */
+		if (errorcode == -NFS4ERR_MOVED)
+			rpc_task_release_transport(task);
+	}
 	return ret;
 }
 
-static int
+int
 nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server,
 			struct nfs4_state *state, long *timeout)
 {
@@ -1071,15 +1081,30 @@ int nfs4_call_sync(struct rpc_clnt *clnt,
 	return nfs4_call_sync_sequence(clnt, server, msg, args, res);
 }
 
-static void update_changeattr(struct inode *dir, struct nfs4_change_info *cinfo,
-		unsigned long timestamp)
+static void
+nfs4_inc_nlink_locked(struct inode *inode)
+{
+	NFS_I(inode)->cache_validity |= NFS_INO_INVALID_OTHER;
+	inc_nlink(inode);
+}
+
+static void
+nfs4_dec_nlink_locked(struct inode *inode)
+{
+	NFS_I(inode)->cache_validity |= NFS_INO_INVALID_OTHER;
+	drop_nlink(inode);
+}
+
+static void
+update_changeattr_locked(struct inode *dir, struct nfs4_change_info *cinfo,
+		unsigned long timestamp, unsigned long cache_validity)
 {
 	struct nfs_inode *nfsi = NFS_I(dir);
 
-	spin_lock(&dir->i_lock);
 	nfsi->cache_validity |= NFS_INO_INVALID_CTIME
 		| NFS_INO_INVALID_MTIME
-		| NFS_INO_INVALID_DATA;
+		| NFS_INO_INVALID_DATA
+		| cache_validity;
 	if (cinfo->atomic && cinfo->before == inode_peek_iversion_raw(dir)) {
 		nfsi->cache_validity &= ~NFS_INO_REVAL_PAGECACHE;
 		nfsi->attrtimeo_timestamp = jiffies;
@@ -1092,7 +1117,16 @@ static void update_changeattr(struct inode *dir, struct nfs4_change_info *cinfo,
 	inode_set_iversion_raw(dir, cinfo->after);
 	nfsi->read_cache_jiffies = timestamp;
 	nfsi->attr_gencount = nfs_inc_attr_generation_counter();
+	nfsi->cache_validity &= ~NFS_INO_INVALID_CHANGE;
 	nfs_fscache_invalidate(dir);
+}
+
+static void
+update_changeattr(struct inode *dir, struct nfs4_change_info *cinfo,
+		unsigned long timestamp, unsigned long cache_validity)
+{
+	spin_lock(&dir->i_lock);
+	update_changeattr_locked(dir, cinfo, timestamp, cache_validity);
 	spin_unlock(&dir->i_lock);
 }
 
@@ -1354,6 +1388,7 @@ static int can_open_delegated(struct nfs_delegation *delegation, fmode_t fmode,
 	case NFS4_OPEN_CLAIM_PREVIOUS:
 		if (!test_bit(NFS_DELEGATION_NEED_RECLAIM, &delegation->flags))
 			break;
+		/* Fall through */
 	default:
 		return 0;
 	}
@@ -1773,6 +1808,10 @@ nfs4_opendata_check_deleg(struct nfs4_opendata *data, struct nfs4_state *state)
 				data->o_res.delegation_type,
 				&data->o_res.delegation,
 				data->o_res.pagemod_limit);
+
+	if (data->o_res.do_recall)
+		nfs_async_inode_return_delegation(state->inode,
+						  &data->o_res.delegation);
 }
 
 /*
@@ -2119,6 +2158,7 @@ int nfs4_open_delegation_recall(struct nfs_open_context *ctx,
 		err = nfs4_open_recover_helper(opendata, FMODE_WRITE);
 		if (err)
 			break;
+		/* Fall through */
 	case FMODE_READ:
 		err = nfs4_open_recover_helper(opendata, FMODE_READ);
 	}
@@ -2248,6 +2288,7 @@ static void nfs4_open_prepare(struct rpc_task *task, void *calldata)
 	case NFS4_OPEN_CLAIM_DELEG_CUR_FH:
 	case NFS4_OPEN_CLAIM_DELEG_PREV_FH:
 		data->o_arg.open_bitmap = &nfs4_open_noattr_bitmap[0];
+		/* Fall through */
 	case NFS4_OPEN_CLAIM_FH:
 		task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_OPEN_NOATTR];
 	}
@@ -2481,7 +2522,7 @@ static int _nfs4_proc_open(struct nfs4_opendata *data,
 		if (data->file_created ||
 		    inode_peek_iversion_raw(dir) != o_res->cinfo.after)
 			update_changeattr(dir, &o_res->cinfo,
-					o_res->f_attr->time_start);
+					o_res->f_attr->time_start, 0);
 	}
 	if ((o_res->rflags & NFS4_OPEN_RESULT_LOCKTYPE_POSIX) == 0)
 		server->caps &= ~NFS_CAP_POSIX_LOCK;
@@ -2843,6 +2884,9 @@ static int _nfs4_open_and_get_state(struct nfs4_opendata *opendata,
 				nfs_save_change_attribute(d_inode(opendata->dir)));
 	}
 
+	/* Parse layoutget results before we check for access */
+	pnfs_parse_lgopen(state->inode, opendata->lgp, ctx);
+
 	ret = nfs4_opendata_access(sp->so_cred, opendata, state, fmode, flags);
 	if (ret != 0)
 		goto out;
@@ -2851,8 +2895,6 @@ static int _nfs4_open_and_get_state(struct nfs4_opendata *opendata,
 		nfs_inode_attach_open_context(ctx);
 		if (read_seqcount_retry(&sp->so_reclaim_seqcount, seq))
 			nfs4_schedule_stateid_recovery(server, state);
-		else
-			pnfs_parse_lgopen(state->inode, opendata->lgp, ctx);
 	}
 
 out:
@@ -3220,7 +3262,8 @@ static void nfs4_close_done(struct rpc_task *task, void *data)
 			calldata->res.lr_res = NULL;
 			break;
 		case -NFS4ERR_OLD_STATEID:
-			if (nfs4_refresh_layout_stateid(&calldata->arg.lr_args->stateid,
+			if (nfs4_layoutreturn_refresh_stateid(&calldata->arg.lr_args->stateid,
+						&calldata->arg.lr_args->range,
 						calldata->inode))
 				goto lr_restart;
 			/* Fallthrough */
@@ -4236,7 +4279,8 @@ nfs4_proc_create(struct inode *dir, struct dentry *dentry, struct iattr *sattr,
 	return status;
 }
 
-static int _nfs4_proc_remove(struct inode *dir, const struct qstr *name)
+static int
+_nfs4_proc_remove(struct inode *dir, const struct qstr *name, u32 ftype)
 {
 	struct nfs_server *server = NFS_SERVER(dir);
 	struct nfs_removeargs args = {
@@ -4255,8 +4299,14 @@ static int _nfs4_proc_remove(struct inode *dir, const struct qstr *name)
 	int status;
 
 	status = nfs4_call_sync(server->client, server, &msg, &args.seq_args, &res.seq_res, 1);
-	if (status == 0)
-		update_changeattr(dir, &res.cinfo, timestamp);
+	if (status == 0) {
+		spin_lock(&dir->i_lock);
+		update_changeattr_locked(dir, &res.cinfo, timestamp, 0);
+		/* Removing a directory decrements nlink in the parent */
+		if (ftype == NF4DIR && dir->i_nlink > 2)
+			nfs4_dec_nlink_locked(dir);
+		spin_unlock(&dir->i_lock);
+	}
 	return status;
 }
 
@@ -4273,7 +4323,7 @@ static int nfs4_proc_remove(struct inode *dir, struct dentry *dentry)
 			nfs4_inode_make_writeable(inode);
 	}
 	do {
-		err = _nfs4_proc_remove(dir, &dentry->d_name);
+		err = _nfs4_proc_remove(dir, &dentry->d_name, NF4REG);
 		trace_nfs4_remove(dir, &dentry->d_name, err);
 		err = nfs4_handle_exception(NFS_SERVER(dir), err,
 				&exception);
@@ -4287,7 +4337,7 @@ static int nfs4_proc_rmdir(struct inode *dir, const struct qstr *name)
 	int err;
 
 	do {
-		err = _nfs4_proc_remove(dir, name);
+		err = _nfs4_proc_remove(dir, name, NF4DIR);
 		trace_nfs4_remove(dir, name, err);
 		err = nfs4_handle_exception(NFS_SERVER(dir), err,
 				&exception);
@@ -4331,7 +4381,8 @@ static int nfs4_proc_unlink_done(struct rpc_task *task, struct inode *dir)
 				    &data->timeout) == -EAGAIN)
 		return 0;
 	if (task->tk_status == 0)
-		update_changeattr(dir, &res->cinfo, res->dir_attr->time_start);
+		update_changeattr(dir, &res->cinfo,
+				res->dir_attr->time_start, 0);
 	return 1;
 }
 
@@ -4373,9 +4424,18 @@ static int nfs4_proc_rename_done(struct rpc_task *task, struct inode *old_dir,
 		return 0;
 
 	if (task->tk_status == 0) {
-		update_changeattr(old_dir, &res->old_cinfo, res->old_fattr->time_start);
-		if (new_dir != old_dir)
-			update_changeattr(new_dir, &res->new_cinfo, res->new_fattr->time_start);
+		if (new_dir != old_dir) {
+			/* Note: If we moved a directory, nlink will change */
+			update_changeattr(old_dir, &res->old_cinfo,
+					res->old_fattr->time_start,
+					NFS_INO_INVALID_OTHER);
+			update_changeattr(new_dir, &res->new_cinfo,
+					res->new_fattr->time_start,
+					NFS_INO_INVALID_OTHER);
+		} else
+			update_changeattr(old_dir, &res->old_cinfo,
+					res->old_fattr->time_start,
+					0);
 	}
 	return 1;
 }
@@ -4416,7 +4476,7 @@ static int _nfs4_proc_link(struct inode *inode, struct inode *dir, const struct
 
 	status = nfs4_call_sync(server->client, server, &msg, &arg.seq_args, &res.seq_res, 1);
 	if (!status) {
-		update_changeattr(dir, &res.cinfo, res.fattr->time_start);
+		update_changeattr(dir, &res.cinfo, res.fattr->time_start, 0);
 		status = nfs_post_op_update_inode(inode, res.fattr);
 		if (!status)
 			nfs_setsecurity(inode, res.fattr, res.label);
@@ -4491,8 +4551,13 @@ static int nfs4_do_create(struct inode *dir, struct dentry *dentry, struct nfs4_
 	int status = nfs4_call_sync(NFS_SERVER(dir)->client, NFS_SERVER(dir), &data->msg,
 				    &data->arg.seq_args, &data->res.seq_res, 1);
 	if (status == 0) {
-		update_changeattr(dir, &data->res.dir_cinfo,
-				data->res.fattr->time_start);
+		spin_lock(&dir->i_lock);
+		update_changeattr_locked(dir, &data->res.dir_cinfo,
+				data->res.fattr->time_start, 0);
+		/* Creating a directory bumps nlink in the parent */
+		if (data->arg.ftype == NF4DIR)
+			nfs4_inc_nlink_locked(dir);
+		spin_unlock(&dir->i_lock);
 		status = nfs_instantiate(dentry, data->res.fh, data->res.fattr, data->res.label);
 	}
 	return status;
@@ -5073,6 +5138,40 @@ static void nfs4_proc_commit_setup(struct nfs_commit_data *data, struct rpc_mess
 	nfs4_state_protect(server->nfs_client, NFS_SP4_MACH_CRED_COMMIT, clnt, msg);
 }
 
+static int _nfs4_proc_commit(struct file *dst, struct nfs_commitargs *args,
+				struct nfs_commitres *res)
+{
+	struct inode *dst_inode = file_inode(dst);
+	struct nfs_server *server = NFS_SERVER(dst_inode);
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_COMMIT],
+		.rpc_argp = args,
+		.rpc_resp = res,
+	};
+
+	args->fh = NFS_FH(dst_inode);
+	return nfs4_call_sync(server->client, server, &msg,
+			&args->seq_args, &res->seq_res, 1);
+}
+
+int nfs4_proc_commit(struct file *dst, __u64 offset, __u32 count, struct nfs_commitres *res)
+{
+	struct nfs_commitargs args = {
+		.offset = offset,
+		.count = count,
+	};
+	struct nfs_server *dst_server = NFS_SERVER(file_inode(dst));
+	struct nfs4_exception exception = { };
+	int status;
+
+	do {
+		status = _nfs4_proc_commit(dst, &args, res);
+		status = nfs4_handle_exception(dst_server, status, &exception);
+	} while (exception.retry);
+
+	return status;
+}
+
 struct nfs4_renewdata {
 	struct nfs_client	*client;
 	unsigned long		timestamp;
@@ -5902,7 +6001,8 @@ static void nfs4_delegreturn_done(struct rpc_task *task, void *calldata)
 			data->res.lr_res = NULL;
 			break;
 		case -NFS4ERR_OLD_STATEID:
-			if (nfs4_refresh_layout_stateid(&data->args.lr_args->stateid,
+			if (nfs4_layoutreturn_refresh_stateid(&data->args.lr_args->stateid,
+						&data->args.lr_args->range,
 						data->inode))
 				goto lr_restart;
 			/* Fallthrough */
@@ -6209,11 +6309,13 @@ static void nfs4_locku_done(struct rpc_task *task, void *data)
 			if (nfs4_update_lock_stateid(calldata->lsp,
 					&calldata->res.stateid))
 				break;
+			/* Fall through */
 		case -NFS4ERR_ADMIN_REVOKED:
 		case -NFS4ERR_EXPIRED:
 			nfs4_free_revoked_stateid(calldata->server,
 					&calldata->arg.stateid,
 					task->tk_msg.rpc_cred);
+			/* Fall through */
 		case -NFS4ERR_BAD_STATEID:
 		case -NFS4ERR_OLD_STATEID:
 		case -NFS4ERR_STALE_STATEID:
@@ -7727,7 +7829,7 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
 	}
 out:
 	clp->cl_sp4_flags = flags;
-	return 0;
+	return ret;
 }
 
 struct nfs41_exchange_id_data {
@@ -8168,7 +8270,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
 	args->bc_attrs.max_resp_sz = max_bc_payload;
 	args->bc_attrs.max_resp_sz_cached = 0;
 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
-	args->bc_attrs.max_reqs = min_t(unsigned short, max_session_cb_slots, 1);
+	args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1);
 
 	dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
 		"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
@@ -8851,7 +8953,8 @@ static void nfs4_layoutreturn_done(struct rpc_task *task, void *calldata)
 	server = NFS_SERVER(lrp->args.inode);
 	switch (task->tk_status) {
 	case -NFS4ERR_OLD_STATEID:
-		if (nfs4_refresh_layout_stateid(&lrp->args.stateid,
+		if (nfs4_layoutreturn_refresh_stateid(&lrp->args.stateid,
+					&lrp->args.range,
 					lrp->args.inode))
 			goto out_restart;
 		/* Fallthrough */
@@ -9554,6 +9657,7 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = {
 		| NFS_CAP_LGOPEN
 		| NFS_CAP_ALLOCATE
 		| NFS_CAP_COPY
+		| NFS_CAP_OFFLOAD_CANCEL
 		| NFS_CAP_DEALLOCATE
 		| NFS_CAP_SEEK
 		| NFS_CAP_LAYOUTSTATS
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 2bf2eaa..3df0eb5 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -274,7 +274,7 @@ static int nfs4_drain_slot_tbl(struct nfs4_slot_table *tbl)
 static int nfs4_begin_drain_session(struct nfs_client *clp)
 {
 	struct nfs4_session *ses = clp->cl_session;
-	int ret = 0;
+	int ret;
 
 	if (clp->cl_slot_tbl)
 		return nfs4_drain_slot_tbl(clp->cl_slot_tbl);
@@ -1525,6 +1525,7 @@ static int nfs4_reclaim_locks(struct nfs4_state *state, const struct nfs4_state_
 		default:
 			pr_err("NFS: %s: unhandled error %d\n",
 					__func__, status);
+			/* Fall through */
 		case -ENOMEM:
 		case -NFS4ERR_DENIED:
 		case -NFS4ERR_RECLAIM_BAD:
@@ -1588,6 +1589,22 @@ static int nfs4_reclaim_open_state(struct nfs4_state_owner *sp, const struct nfs
 				}
 				clear_bit(NFS_STATE_RECLAIM_NOGRACE,
 					&state->flags);
+#ifdef CONFIG_NFS_V4_2
+				if (test_bit(NFS_CLNT_DST_SSC_COPY_STATE, &state->flags)) {
+					struct nfs4_copy_state *copy;
+
+					spin_lock(&sp->so_server->nfs_client->cl_lock);
+					list_for_each_entry(copy, &sp->so_server->ss_copies, copies) {
+						if (memcmp(&state->stateid.other, &copy->parent_state->stateid.other, NFS4_STATEID_SIZE))
+							continue;
+						copy->flags = 1;
+						complete(&copy->completion);
+						printk("AGLO: server rebooted waking up the copy\n");
+						break;
+					}
+					spin_unlock(&sp->so_server->nfs_client->cl_lock);
+				}
+#endif /* CONFIG_NFS_V4_2 */
 				nfs4_put_open_state(state);
 				spin_lock(&sp->so_lock);
 				goto restart;
@@ -1597,6 +1614,7 @@ static int nfs4_reclaim_open_state(struct nfs4_state_owner *sp, const struct nfs
 			default:
 				printk(KERN_ERR "NFS: %s: unhandled error %d\n",
 					__func__, status);
+				/* Fall through */
 			case -ENOENT:
 			case -ENOMEM:
 			case -EACCES:
@@ -1608,6 +1626,7 @@ static int nfs4_reclaim_open_state(struct nfs4_state_owner *sp, const struct nfs
 				break;
 			case -EAGAIN:
 				ssleep(1);
+				/* Fall through */
 			case -NFS4ERR_ADMIN_REVOKED:
 			case -NFS4ERR_STALE_STATEID:
 			case -NFS4ERR_OLD_STATEID:
@@ -1939,7 +1958,9 @@ static int nfs4_establish_lease(struct nfs_client *clp)
 		clp->cl_mvops->reboot_recovery_ops;
 	int status;
 
-	nfs4_begin_drain_session(clp);
+	status = nfs4_begin_drain_session(clp);
+	if (status != 0)
+		return status;
 	cred = nfs4_get_clid_cred(clp);
 	if (cred == NULL)
 		return -ENOENT;
@@ -2027,7 +2048,9 @@ static int nfs4_try_migration(struct nfs_server *server, struct rpc_cred *cred)
 		goto out;
 	}
 
-	nfs4_begin_drain_session(clp);
+	status = nfs4_begin_drain_session(clp);
+	if (status != 0)
+		return status;
 
 	status = nfs4_replace_transport(server, locations);
 	if (status != 0) {
@@ -2190,9 +2213,11 @@ int nfs4_discover_server_trunking(struct nfs_client *clp,
 	case -ETIMEDOUT:
 		if (clnt->cl_softrtry)
 			break;
+		/* Fall through */
 	case -NFS4ERR_DELAY:
 	case -EAGAIN:
 		ssleep(1);
+		/* Fall through */
 	case -NFS4ERR_STALE_CLIENTID:
 		dprintk("NFS: %s after status %d, retrying\n",
 			__func__, status);
@@ -2204,6 +2229,7 @@ int nfs4_discover_server_trunking(struct nfs_client *clp,
 		}
 		if (clnt->cl_auth->au_flavor == RPC_AUTH_UNIX)
 			break;
+		/* Fall through */
 	case -NFS4ERR_CLID_INUSE:
 	case -NFS4ERR_WRONGSEC:
 		/* No point in retrying if we already used RPC_AUTH_UNIX */
@@ -2374,7 +2400,9 @@ static int nfs4_reset_session(struct nfs_client *clp)
 
 	if (!nfs4_has_session(clp))
 		return 0;
-	nfs4_begin_drain_session(clp);
+	status = nfs4_begin_drain_session(clp);
+	if (status != 0)
+		return status;
 	cred = nfs4_get_clid_cred(clp);
 	status = nfs4_proc_destroy_session(clp->cl_session, cred);
 	switch (status) {
@@ -2417,7 +2445,9 @@ static int nfs4_bind_conn_to_session(struct nfs_client *clp)
 
 	if (!nfs4_has_session(clp))
 		return 0;
-	nfs4_begin_drain_session(clp);
+	ret = nfs4_begin_drain_session(clp);
+	if (ret != 0)
+		return ret;
 	cred = nfs4_get_clid_cred(clp);
 	ret = nfs4_proc_bind_conn_to_session(clp, cred);
 	if (cred)
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index cd41d25..b7bde12 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -7789,6 +7789,7 @@ const struct rpc_procinfo nfs4_procedures[] = {
 	PROC42(LAYOUTSTATS,	enc_layoutstats,	dec_layoutstats),
 	PROC42(CLONE,		enc_clone,		dec_clone),
 	PROC42(COPY,		enc_copy,		dec_copy),
+	PROC42(OFFLOAD_CANCEL,	enc_offload_cancel,	dec_offload_cancel),
 	PROC(LOOKUPP,		enc_lookupp,		dec_lookupp),
 };
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 67d19cd..bb5476a 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -561,6 +561,7 @@ static void nfs_pgio_rpcsetup(struct nfs_pgio_header *hdr,
 	case FLUSH_COND_STABLE:
 		if (nfs_reqs_to_commit(cinfo))
 			break;
+		/* fall through */
 	default:
 		hdr->args.stable = NFS_FILE_SYNC;
 	}
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index bcc3add..e8f232d 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -361,18 +361,32 @@ pnfs_clear_lseg_state(struct pnfs_layout_segment *lseg,
 /*
  * Update the seqid of a layout stateid
  */
-bool nfs4_refresh_layout_stateid(nfs4_stateid *dst, struct inode *inode)
+bool nfs4_layoutreturn_refresh_stateid(nfs4_stateid *dst,
+		struct pnfs_layout_range *dst_range,
+		struct inode *inode)
 {
 	struct pnfs_layout_hdr *lo;
+	struct pnfs_layout_range range = {
+		.iomode = IOMODE_ANY,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
 	bool ret = false;
+	LIST_HEAD(head);
+	int err;
 
 	spin_lock(&inode->i_lock);
 	lo = NFS_I(inode)->layout;
 	if (lo && nfs4_stateid_match_other(dst, &lo->plh_stateid)) {
-		dst->seqid = lo->plh_stateid.seqid;
-		ret = true;
+		err = pnfs_mark_matching_lsegs_return(lo, &head, &range, 0);
+		if (err != -EBUSY) {
+			dst->seqid = lo->plh_stateid.seqid;
+			*dst_range = range;
+			ret = true;
+		}
 	}
 	spin_unlock(&inode->i_lock);
+	pnfs_free_lseg_list(&head);
 	return ret;
 }
 
@@ -1018,7 +1032,6 @@ pnfs_alloc_init_layoutget_args(struct inode *ino,
 	nfs4_stateid_copy(&lgp->args.stateid, stateid);
 	lgp->gfp_flags = gfp_flags;
 	lgp->cred = get_rpccred(ctx->cred);
-	lgp->callback_count = raw_seqcount_begin(&server->nfs_client->cl_callback_count);
 	return lgp;
 }
 
@@ -1160,12 +1173,21 @@ static bool
 pnfs_layout_need_return(struct pnfs_layout_hdr *lo)
 {
 	struct pnfs_layout_segment *s;
+	enum pnfs_iomode iomode;
+	u32 seq;
 
 	if (!test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags))
 		return false;
 
-	/* Defer layoutreturn until all lsegs are done */
+	seq = lo->plh_return_seq;
+	iomode = lo->plh_return_iomode;
+
+	/* Defer layoutreturn until all recalled lsegs are done */
 	list_for_each_entry(s, &lo->plh_segs, pls_list) {
+		if (seq && pnfs_seqid_is_newer(s->pls_seq, seq))
+			continue;
+		if (iomode != IOMODE_ANY && s->pls_range.iomode != iomode)
+			continue;
 		if (test_bit(NFS_LSEG_LAYOUTRETURN, &s->pls_flags))
 			return false;
 	}
@@ -1609,7 +1631,7 @@ pnfs_lseg_range_match(const struct pnfs_layout_range *ls_range,
 	    (range->iomode != ls_range->iomode &&
 	     strict_iomode) ||
 	    !pnfs_lseg_range_intersecting(ls_range, range))
-		return 0;
+		return false;
 
 	/* range1 covers only the first byte in the range */
 	range1 = *range;
@@ -1631,7 +1653,6 @@ pnfs_find_lseg(struct pnfs_layout_hdr *lo,
 
 	list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
 		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) &&
-		    !test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) &&
 		    pnfs_lseg_range_match(&lseg->pls_range, range,
 					  strict_iomode)) {
 			ret = pnfs_get_lseg(lseg);
@@ -1731,6 +1752,17 @@ static bool pnfs_prepare_to_retry_layoutget(struct pnfs_layout_hdr *lo)
 				   TASK_UNINTERRUPTIBLE);
 }
 
+static void nfs_layoutget_begin(struct pnfs_layout_hdr *lo)
+{
+	atomic_inc(&lo->plh_outstanding);
+}
+
+static void nfs_layoutget_end(struct pnfs_layout_hdr *lo)
+{
+	if (atomic_dec_and_test(&lo->plh_outstanding))
+		wake_up_var(&lo->plh_outstanding);
+}
+
 static void pnfs_clear_first_layoutget(struct pnfs_layout_hdr *lo)
 {
 	unsigned long *bitlock = &lo->plh_flags;
@@ -1791,12 +1823,6 @@ pnfs_update_layout(struct inode *ino,
 		goto out;
 	}
 
-	if (iomode == IOMODE_READ && i_size_read(ino) == 0) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
-				 PNFS_UPDATE_LAYOUT_RD_ZEROLEN);
-		goto out;
-	}
-
 	if (pnfs_within_mdsthreshold(ctx, ino, iomode)) {
 		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_MDSTHRESH);
@@ -1830,6 +1856,21 @@ pnfs_update_layout(struct inode *ino,
 		goto out_unlock;
 	}
 
+	/*
+	 * If the layout segment list is empty, but there are outstanding
+	 * layoutget calls, then they might be subject to a layoutrecall.
+	 */
+	if (list_empty(&lo->plh_segs) &&
+	    atomic_read(&lo->plh_outstanding) != 0) {
+		spin_unlock(&ino->i_lock);
+		if (wait_var_event_killable(&lo->plh_outstanding,
+					atomic_read(&lo->plh_outstanding) == 0
+					|| !list_empty(&lo->plh_segs)))
+			goto out_put_layout_hdr;
+		pnfs_put_layout_hdr(lo);
+		goto lookup_again;
+	}
+
 	lseg = pnfs_find_lseg(lo, &arg, strict_iomode);
 	if (lseg) {
 		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
@@ -1903,7 +1944,7 @@ pnfs_update_layout(struct inode *ino,
 				PNFS_UPDATE_LAYOUT_BLOCKED);
 		goto out_unlock;
 	}
-	atomic_inc(&lo->plh_outstanding);
+	nfs_layoutget_begin(lo);
 	spin_unlock(&ino->i_lock);
 
 	_add_to_server_list(lo, server);
@@ -1920,14 +1961,14 @@ pnfs_update_layout(struct inode *ino,
 	if (!lgp) {
 		trace_pnfs_update_layout(ino, pos, count, iomode, lo, NULL,
 					 PNFS_UPDATE_LAYOUT_NOMEM);
-		atomic_dec(&lo->plh_outstanding);
+		nfs_layoutget_end(lo);
 		goto out_put_layout_hdr;
 	}
 
 	lseg = nfs4_proc_layoutget(lgp, &timeout);
 	trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET);
-	atomic_dec(&lo->plh_outstanding);
+	nfs_layoutget_end(lo);
 	if (IS_ERR(lseg)) {
 		switch(PTR_ERR(lseg)) {
 		case -EBUSY:
@@ -1935,15 +1976,6 @@ pnfs_update_layout(struct inode *ino,
 				lseg = NULL;
 			break;
 		case -ERECALLCONFLICT:
-			/* Huh? We hold no layouts, how is there a recall? */
-			if (first) {
-				lseg = NULL;
-				break;
-			}
-			/* Destroy the existing layout and start over */
-			if (time_after(jiffies, giveup))
-				pnfs_destroy_layout(NFS_I(ino));
-			/* Fallthrough */
 		case -EAGAIN:
 			break;
 		default:
@@ -2022,7 +2054,7 @@ _pnfs_grab_empty_layout(struct inode *ino, struct nfs_open_context *ctx)
 		goto out_unlock;
 	if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET, &lo->plh_flags))
 		goto out_unlock;
-	atomic_inc(&lo->plh_outstanding);
+	nfs_layoutget_begin(lo);
 	spin_unlock(&ino->i_lock);
 	_add_to_server_list(lo, NFS_SERVER(ino));
 	return lo;
@@ -2146,9 +2178,6 @@ void pnfs_parse_lgopen(struct inode *ino, struct nfs4_layoutget *lgp,
 	} else
 		lo = NFS_I(lgp->args.inode)->layout;
 
-	if (read_seqcount_retry(&srv->nfs_client->cl_callback_count,
-				lgp->callback_count))
-		return;
 	lseg = pnfs_layout_process(lgp);
 	if (!IS_ERR(lseg)) {
 		iomode = lgp->args.range.iomode;
@@ -2163,8 +2192,8 @@ void nfs4_lgopen_release(struct nfs4_layoutget *lgp)
 		struct inode *inode = lgp->args.inode;
 		if (inode) {
 			struct pnfs_layout_hdr *lo = NFS_I(inode)->layout;
-			atomic_dec(&lo->plh_outstanding);
 			pnfs_clear_first_layoutget(lo);
+			nfs_layoutget_end(lo);
 		}
 		pnfs_layoutget_free(lgp);
 	}
@@ -2238,15 +2267,31 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	return ERR_PTR(-EAGAIN);
 }
 
+static int
+mark_lseg_invalid_or_return(struct pnfs_layout_segment *lseg,
+		struct list_head *tmp_list)
+{
+	if (!mark_lseg_invalid(lseg, tmp_list))
+		return 0;
+	pnfs_cache_lseg_for_layoutreturn(lseg->pls_layout, lseg);
+	return 1;
+}
+
 /**
  * pnfs_mark_matching_lsegs_return - Free or return matching layout segments
  * @lo: pointer to layout header
  * @tmp_list: list header to be used with pnfs_free_lseg_list()
  * @return_range: describe layout segment ranges to be returned
+ * @seq: stateid seqid to match
  *
  * This function is mainly intended for use by layoutrecall. It attempts
  * to free the layout segment immediately, or else to mark it for return
  * as soon as its reference count drops to zero.
+ *
+ * Returns
+ * - 0: a layoutreturn needs to be scheduled.
+ * - EBUSY: there are layout segment that are still in use.
+ * - ENOENT: there are no layout segments that need to be returned.
  */
 int
 pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
@@ -2259,9 +2304,6 @@ pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 
 	dprintk("%s:Begin lo %p\n", __func__, lo);
 
-	if (list_empty(&lo->plh_segs))
-		return 0;
-
 	assert_spin_locked(&lo->plh_inode->i_lock);
 
 	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
@@ -2271,16 +2313,23 @@ pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 				lseg, lseg->pls_range.iomode,
 				lseg->pls_range.offset,
 				lseg->pls_range.length);
-			if (mark_lseg_invalid(lseg, tmp_list))
+			if (mark_lseg_invalid_or_return(lseg, tmp_list))
 				continue;
 			remaining++;
 			set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
 		}
 
-	if (remaining)
+	if (remaining) {
 		pnfs_set_plh_return_info(lo, return_range->iomode, seq);
+		return -EBUSY;
+	}
 
-	return remaining;
+	if (!list_empty(&lo->plh_return_segs)) {
+		pnfs_set_plh_return_info(lo, return_range->iomode, seq);
+		return 0;
+	}
+
+	return -ENOENT;
 }
 
 void pnfs_error_mark_layout_for_return(struct inode *inode,
@@ -2305,7 +2354,7 @@ void pnfs_error_mark_layout_for_return(struct inode *inode,
 	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
 	 * for how it works.
 	 */
-	if (!pnfs_mark_matching_lsegs_return(lo, &lo->plh_return_segs, &range, 0)) {
+	if (pnfs_mark_matching_lsegs_return(lo, &lo->plh_return_segs, &range, 0) != -EBUSY) {
 		nfs4_stateid stateid;
 		enum pnfs_iomode iomode;
 
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 3fe8142..ece367e 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -259,7 +259,9 @@ int pnfs_destroy_layouts_byfsid(struct nfs_client *clp,
 		bool is_recall);
 int pnfs_destroy_layouts_byclid(struct nfs_client *clp,
 		bool is_recall);
-bool nfs4_refresh_layout_stateid(nfs4_stateid *dst, struct inode *inode);
+bool nfs4_layoutreturn_refresh_stateid(nfs4_stateid *dst,
+		struct pnfs_layout_range *dst_range,
+		struct inode *inode);
 void pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
 			     const nfs4_stateid *new,
@@ -780,7 +782,8 @@ static inline void nfs4_pnfs_v3_ds_connect_unload(void)
 {
 }
 
-static inline bool nfs4_refresh_layout_stateid(nfs4_stateid *dst,
+static inline bool nfs4_layoutreturn_refresh_stateid(nfs4_stateid *dst,
+		struct pnfs_layout_range *dst_range,
 		struct inode *inode)
 {
 	return false;
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 32ba2d4..d5e4d3c 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -61,7 +61,7 @@ EXPORT_SYMBOL_GPL(pnfs_generic_commit_release);
 
 /* The generic layer is about to remove the req from the commit list.
  * If this will make the bucket empty, it will need to put the lseg reference.
- * Note this must be called holding i_lock
+ * Note this must be called holding nfsi->commit_mutex
  */
 void
 pnfs_generic_clear_request_commit(struct nfs_page *req,
@@ -149,9 +149,7 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 		if (list_empty(&b->written)) {
 			freeme = b->wlseg;
 			b->wlseg = NULL;
-			spin_unlock(&cinfo->inode->i_lock);
 			pnfs_put_lseg(freeme);
-			spin_lock(&cinfo->inode->i_lock);
 			goto restart;
 		}
 	}
@@ -167,7 +165,7 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 	LIST_HEAD(pages);
 	int i;
 
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	for (i = idx; i < fl_cinfo->nbuckets; i++) {
 		bucket = &fl_cinfo->buckets[i];
 		if (list_empty(&bucket->committing))
@@ -177,12 +175,12 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 		list_for_each(pos, &bucket->committing)
 			cinfo->ds->ncommitting--;
 		list_splice_init(&bucket->committing, &pages);
-		spin_unlock(&cinfo->inode->i_lock);
+		mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 		nfs_retry_commit(&pages, freeme, cinfo, i);
 		pnfs_put_lseg(freeme);
-		spin_lock(&cinfo->inode->i_lock);
+		mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	}
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
 
 static unsigned int
@@ -222,13 +220,13 @@ void pnfs_fetch_commit_bucket_list(struct list_head *pages,
 	struct list_head *pos;
 
 	bucket = &cinfo->ds->buckets[data->ds_commit_index];
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	list_for_each(pos, &bucket->committing)
 		cinfo->ds->ncommitting--;
 	list_splice_init(&bucket->committing, pages);
 	data->lseg = bucket->clseg;
 	bucket->clseg = NULL;
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 
 }
 
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 5e470e2..ac4b2f0 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -884,7 +884,7 @@ int nfs_show_stats(struct seq_file *m, struct dentry *root)
 #endif
 	seq_printf(m, "\n");
 
-	rpc_print_iostats(m, nfss->client);
+	rpc_clnt_show_stats(m, nfss->client);
 
 	return 0;
 }
@@ -2899,7 +2899,7 @@ static int param_set_portnr(const char *val, const struct kernel_param *kp)
 	if (!val)
 		return -EINVAL;
 	ret = kstrtoul(val, 0, &num);
-	if (ret == -EINVAL || num > NFS_CALLBACK_MAXPORTNR)
+	if (ret || num > NFS_CALLBACK_MAXPORTNR)
 		return -EINVAL;
 	*((unsigned int *)kp->arg) = num;
 	return 0;
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index a057b4f..586726a 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1406,6 +1406,8 @@ static void nfs_async_write_error(struct list_head *head)
 static void nfs_async_write_reschedule_io(struct nfs_pgio_header *hdr)
 {
 	nfs_async_write_error(&hdr->pages);
+	filemap_fdatawrite_range(hdr->inode->i_mapping, hdr->args.offset,
+			hdr->args.offset + hdr->args.count - 1);
 }
 
 static const struct nfs_pgio_completion_ops nfs_async_write_completion_ops = {
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 0877ed3..1b06f0b 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -535,6 +535,7 @@ enum {
 	NFSPROC4_CLNT_LAYOUTSTATS,
 	NFSPROC4_CLNT_CLONE,
 	NFSPROC4_CLNT_COPY,
+	NFSPROC4_CLNT_OFFLOAD_CANCEL,
 
 	NFSPROC4_CLNT_LOOKUPP,
 };
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index 2f129bb..a0831e9 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -185,6 +185,17 @@ struct nfs_inode {
 	struct inode		vfs_inode;
 };
 
+struct nfs4_copy_state {
+	struct list_head	copies;
+	nfs4_stateid		stateid;
+	struct completion	completion;
+	uint64_t		count;
+	struct nfs_writeverf	verf;
+	int			error;
+	int			flags;
+	struct nfs4_state	*parent_state;
+};
+
 /*
  * Access bit flags
  */
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 74ae3e1..bf39d9c 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -28,7 +28,6 @@ struct nfs41_impl_id;
 struct nfs_client {
 	refcount_t		cl_count;
 	atomic_t		cl_mds_count;
-	seqcount_t		cl_callback_count;
 	int			cl_cons_state;	/* current construction state (-ve: init error) */
 #define NFS_CS_READY		0		/* ready to be used */
 #define NFS_CS_INITING		1		/* busy initialising */
@@ -122,6 +121,7 @@ struct nfs_client {
 #endif
 
 	struct net		*cl_net;
+	struct list_head	pending_cb_stateids;
 };
 
 /*
@@ -209,6 +209,7 @@ struct nfs_server {
 	struct list_head	state_owners_lru;
 	struct list_head	layouts;
 	struct list_head	delegations;
+	struct list_head	ss_copies;
 
 	unsigned long		mig_gen;
 	unsigned long		mig_status;
@@ -256,5 +257,6 @@ struct nfs_server {
 #define NFS_CAP_LAYOUTSTATS	(1U << 22)
 #define NFS_CAP_CLONE		(1U << 23)
 #define NFS_CAP_COPY		(1U << 24)
+#define NFS_CAP_OFFLOAD_CANCEL	(1U << 25)
 
 #endif
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 712eed1..bd1c889 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -271,7 +271,6 @@ struct nfs4_layoutget {
 	struct nfs4_layoutget_args args;
 	struct nfs4_layoutget_res res;
 	struct rpc_cred *cred;
-	unsigned callback_count;
 	gfp_t gfp_flags;
 };
 
@@ -1389,9 +1388,11 @@ struct nfs42_copy_args {
 	u64				dst_pos;
 
 	u64				count;
+	bool				sync;
 };
 
 struct nfs42_write_res {
+	nfs4_stateid		stateid;
 	u64			count;
 	struct nfs_writeverf	verifier;
 };
@@ -1404,6 +1405,18 @@ struct nfs42_copy_res {
 	struct nfs_commitres		commit_res;
 };
 
+struct nfs42_offload_status_args {
+	struct nfs4_sequence_args	osa_seq_args;
+	struct nfs_fh			*osa_src_fh;
+	nfs4_stateid			osa_stateid;
+};
+
+struct nfs42_offload_status_res {
+	struct nfs4_sequence_res	osr_seq_res;
+	uint64_t			osr_count;
+	int				osr_status;
+};
+
 struct nfs42_seek_args {
 	struct nfs4_sequence_args	seq_args;
 
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index d9af474..58a6765 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -125,7 +125,8 @@ struct rpc_authops {
 	struct module		*owner;
 	rpc_authflavor_t	au_flavor;	/* flavor (RPC_AUTH_*) */
 	char *			au_name;
-	struct rpc_auth *	(*create)(struct rpc_auth_create_args *, struct rpc_clnt *);
+	struct rpc_auth *	(*create)(const struct rpc_auth_create_args *,
+					  struct rpc_clnt *);
 	void			(*destroy)(struct rpc_auth *);
 
 	int			(*hash_cred)(struct auth_cred *, unsigned int);
@@ -174,7 +175,7 @@ struct rpc_cred *	rpc_lookup_generic_cred(struct auth_cred *, int, gfp_t);
 struct rpc_cred *	rpc_lookup_machine_cred(const char *service_name);
 int			rpcauth_register(const struct rpc_authops *);
 int			rpcauth_unregister(const struct rpc_authops *);
-struct rpc_auth *	rpcauth_create(struct rpc_auth_create_args *,
+struct rpc_auth *	rpcauth_create(const struct rpc_auth_create_args *,
 				struct rpc_clnt *);
 void			rpcauth_release(struct rpc_auth *);
 rpc_authflavor_t	rpcauth_get_pseudoflavor(rpc_authflavor_t,
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 9b11b6a..73d5c4a 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -156,6 +156,7 @@ int		rpc_switch_client_transport(struct rpc_clnt *,
 
 void		rpc_shutdown_client(struct rpc_clnt *);
 void		rpc_release_client(struct rpc_clnt *);
+void		rpc_task_release_transport(struct rpc_task *);
 void		rpc_task_release_client(struct rpc_task *);
 
 int		rpcb_create_local(struct net *);
diff --git a/include/linux/sunrpc/metrics.h b/include/linux/sunrpc/metrics.h
index 9baed7b..1b37513 100644
--- a/include/linux/sunrpc/metrics.h
+++ b/include/linux/sunrpc/metrics.h
@@ -82,7 +82,7 @@ void			rpc_count_iostats(const struct rpc_task *,
 					  struct rpc_iostats *);
 void			rpc_count_iostats_metrics(const struct rpc_task *,
 					  struct rpc_iostats *);
-void			rpc_print_iostats(struct seq_file *, struct rpc_clnt *);
+void			rpc_clnt_show_stats(struct seq_file *, struct rpc_clnt *);
 void			rpc_free_iostats(struct rpc_iostats *);
 
 #else  /*  CONFIG_PROC_FS  */
@@ -95,7 +95,7 @@ static inline void rpc_count_iostats_metrics(const struct rpc_task *task,
 {
 }
 
-static inline void rpc_print_iostats(struct seq_file *seq, struct rpc_clnt *clnt) {}
+static inline void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt) {}
 static inline void rpc_free_iostats(struct rpc_iostats *stats) {}
 
 #endif  /*  CONFIG_PROC_FS  */
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index d2623b9..305ecea 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -50,7 +50,7 @@ static int param_set_hashtbl_sz(const char *val, const struct kernel_param *kp)
 	if (!val)
 		goto out_inval;
 	ret = kstrtoul(val, 0, &num);
-	if (ret == -EINVAL)
+	if (ret)
 		goto out_inval;
 	nbits = fls(num - 1);
 	if (nbits > MAX_HASHTABLE_BITS || nbits < 2)
@@ -253,7 +253,7 @@ rpcauth_list_flavors(rpc_authflavor_t *array, int size)
 EXPORT_SYMBOL_GPL(rpcauth_list_flavors);
 
 struct rpc_auth *
-rpcauth_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+rpcauth_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	struct rpc_auth		*auth;
 	const struct rpc_authops *ops;
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 962afb8..21c0aa0 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1016,7 +1016,7 @@ static void gss_pipe_free(struct gss_pipe *p)
  * parameters based on the input flavor (which must be a pseudoflavor)
  */
 static struct gss_auth *
-gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+gss_create_new(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	rpc_authflavor_t flavor = args->pseudoflavor;
 	struct gss_auth *gss_auth;
@@ -1163,7 +1163,7 @@ gss_destroy(struct rpc_auth *auth)
  * (which is guaranteed to last as long as any of its descendants).
  */
 static struct gss_auth *
-gss_auth_find_or_add_hashed(struct rpc_auth_create_args *args,
+gss_auth_find_or_add_hashed(const struct rpc_auth_create_args *args,
 		struct rpc_clnt *clnt,
 		struct gss_auth *new)
 {
@@ -1200,7 +1200,8 @@ gss_auth_find_or_add_hashed(struct rpc_auth_create_args *args,
 }
 
 static struct gss_auth *
-gss_create_hashed(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+gss_create_hashed(const struct rpc_auth_create_args *args,
+		  struct rpc_clnt *clnt)
 {
 	struct gss_auth *gss_auth;
 	struct gss_auth *new;
@@ -1219,7 +1220,7 @@ gss_create_hashed(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 }
 
 static struct rpc_auth *
-gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+gss_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	struct gss_auth *gss_auth;
 	struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
@@ -1602,7 +1603,7 @@ static int gss_cred_is_negative_entry(struct rpc_cred *cred)
 	if (test_bit(RPCAUTH_CRED_NEGATIVE, &cred->cr_flags)) {
 		unsigned long now = jiffies;
 		unsigned long begin, expire;
-		struct gss_cred *gss_cred; 
+		struct gss_cred *gss_cred;
 
 		gss_cred = container_of(cred, struct gss_cred, gc_base);
 		begin = gss_cred->gc_upcall_timestamp;
diff --git a/net/sunrpc/auth_gss/gss_generic_token.c b/net/sunrpc/auth_gss/gss_generic_token.c
index 254defe..fe97f31 100644
--- a/net/sunrpc/auth_gss/gss_generic_token.c
+++ b/net/sunrpc/auth_gss/gss_generic_token.c
@@ -231,4 +231,3 @@ g_verify_token_header(struct xdr_netobj *mech, int *body_size,
 }
 
 EXPORT_SYMBOL_GPL(g_verify_token_header);
-
diff --git a/net/sunrpc/auth_gss/gss_krb5_crypto.c b/net/sunrpc/auth_gss/gss_krb5_crypto.c
index d31f868..0220e1c 100644
--- a/net/sunrpc/auth_gss/gss_krb5_crypto.c
+++ b/net/sunrpc/auth_gss/gss_krb5_crypto.c
@@ -1081,4 +1081,3 @@ krb5_rc4_setup_enc_key(struct krb5_ctx *kctx, struct crypto_skcipher *cipher,
 	dprintk("%s: returning %d\n", __func__, err);
 	return err;
 }
-
diff --git a/net/sunrpc/auth_gss/gss_krb5_keys.c b/net/sunrpc/auth_gss/gss_krb5_keys.c
index 8701331..f7fe2d2b85 100644
--- a/net/sunrpc/auth_gss/gss_krb5_keys.c
+++ b/net/sunrpc/auth_gss/gss_krb5_keys.c
@@ -324,4 +324,3 @@ u32 gss_krb5_aes_make_key(const struct gss_krb5_enctype *gk5e,
 err_out:
 	return ret;
 }
-
diff --git a/net/sunrpc/auth_gss/gss_krb5_seal.c b/net/sunrpc/auth_gss/gss_krb5_seal.c
index 94a2b3f..eaad9bc 100644
--- a/net/sunrpc/auth_gss/gss_krb5_seal.c
+++ b/net/sunrpc/auth_gss/gss_krb5_seal.c
@@ -229,4 +229,3 @@ gss_get_mic_kerberos(struct gss_ctx *gss_ctx, struct xdr_buf *text,
 		return gss_get_mic_v2(ctx, text, token);
 	}
 }
-
diff --git a/net/sunrpc/auth_gss/gss_krb5_unseal.c b/net/sunrpc/auth_gss/gss_krb5_unseal.c
index b601a73..ef2b25b 100644
--- a/net/sunrpc/auth_gss/gss_krb5_unseal.c
+++ b/net/sunrpc/auth_gss/gss_krb5_unseal.c
@@ -225,4 +225,3 @@ gss_verify_mic_kerberos(struct gss_ctx *gss_ctx,
 		return gss_verify_mic_v2(ctx, message_buffer, read_token);
 	}
 }
-
diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c
index 9a1347f..39a2e67 100644
--- a/net/sunrpc/auth_gss/gss_krb5_wrap.c
+++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c
@@ -621,4 +621,3 @@ gss_unwrap_kerberos(struct gss_ctx *gctx, int offset, struct xdr_buf *buf)
 		return gss_unwrap_kerberos_v2(kctx, offset, buf);
 	}
 }
-
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 5089dbb..860f2a1b 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1389,7 +1389,7 @@ static void destroy_use_gss_proxy_proc_entry(struct net *net)
 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 
 	if (sn->use_gssp_proc) {
-		remove_proc_entry("use-gss-proxy", sn->proc_net_rpc); 
+		remove_proc_entry("use-gss-proxy", sn->proc_net_rpc);
 		clear_gssp_clnt(sn);
 	}
 }
diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index 75d72e1..4b48228 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -19,7 +19,7 @@ static struct rpc_auth null_auth;
 static struct rpc_cred null_cred;
 
 static struct rpc_auth *
-nul_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+nul_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	atomic_inc(&null_auth.au_count);
 	return &null_auth;
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index dafd6b8..185e56d 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -30,7 +30,7 @@ static struct rpc_auth		unix_auth;
 static const struct rpc_credops	unix_credops;
 
 static struct rpc_auth *
-unx_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
+unx_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
 	dprintk("RPC:       creating UNIX authenticator for client %p\n",
 			clnt);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index c2c68a1..3c15a99 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -362,4 +362,3 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
 	wake_up(&bc_serv->sv_cb_waitq);
 	spin_unlock(&bc_serv->sv_cb_lock);
 }
-
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d839c33..8ea2f5f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -892,7 +892,7 @@ rpc_free_client(struct rpc_clnt *clnt)
 /*
  * Free an RPC client
  */
-static struct rpc_clnt * 
+static struct rpc_clnt *
 rpc_free_auth(struct rpc_clnt *clnt)
 {
 	if (clnt->cl_auth == NULL)
@@ -965,10 +965,20 @@ struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 }
 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 
+void rpc_task_release_transport(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+
+	if (xprt) {
+		task->tk_xprt = NULL;
+		xprt_put(xprt);
+	}
+}
+EXPORT_SYMBOL_GPL(rpc_task_release_transport);
+
 void rpc_task_release_client(struct rpc_task *task)
 {
 	struct rpc_clnt *clnt = task->tk_client;
-	struct rpc_xprt *xprt = task->tk_xprt;
 
 	if (clnt != NULL) {
 		/* Remove from client task list */
@@ -979,12 +989,14 @@ void rpc_task_release_client(struct rpc_task *task)
 
 		rpc_release_client(clnt);
 	}
+	rpc_task_release_transport(task);
+}
 
-	if (xprt != NULL) {
-		task->tk_xprt = NULL;
-
-		xprt_put(xprt);
-	}
+static
+void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
+{
+	if (!task->tk_xprt)
+		task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 }
 
 static
@@ -992,8 +1004,7 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 {
 
 	if (clnt != NULL) {
-		if (task->tk_xprt == NULL)
-			task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
+		rpc_task_set_transport(task, clnt);
 		task->tk_client = clnt;
 		atomic_inc(&clnt->cl_count);
 		if (clnt->cl_softrtry)
@@ -1512,6 +1523,7 @@ call_start(struct rpc_task *task)
 		clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
 	clnt->cl_stats->rpccnt++;
 	task->tk_action = call_reserve;
+	rpc_task_set_transport(task, clnt);
 }
 
 /*
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index c526f8f..c7872bc 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -213,7 +213,7 @@ static void rpcb_set_local(struct net *net, struct rpc_clnt *clnt,
 	sn->rpcb_local_clnt = clnt;
 	sn->rpcb_local_clnt4 = clnt4;
 	sn->rpcb_is_af_local = is_af_local ? 1 : 0;
-	smp_wmb(); 
+	smp_wmb();
 	sn->rpcb_users = 1;
 	dprintk("RPC:       created new rpcb local clients (rpcb_local_clnt: "
 		"%p, rpcb_local_clnt4: %p) for net %x%s\n",
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index f68aa46..71166b3 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -208,13 +208,39 @@ static void _print_name(struct seq_file *seq, unsigned int op,
 		seq_printf(seq, "\t%12u: ", op);
 }
 
-void rpc_print_iostats(struct seq_file *seq, struct rpc_clnt *clnt)
+static void _add_rpc_iostats(struct rpc_iostats *a, struct rpc_iostats *b)
 {
-	struct rpc_iostats *stats = clnt->cl_metrics;
+	a->om_ops += b->om_ops;
+	a->om_ntrans += b->om_ntrans;
+	a->om_timeouts += b->om_timeouts;
+	a->om_bytes_sent += b->om_bytes_sent;
+	a->om_bytes_recv += b->om_bytes_recv;
+	a->om_queue = ktime_add(a->om_queue, b->om_queue);
+	a->om_rtt = ktime_add(a->om_rtt, b->om_rtt);
+	a->om_execute = ktime_add(a->om_execute, b->om_execute);
+}
+
+static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats,
+			       int op, const struct rpc_procinfo *procs)
+{
+	_print_name(seq, op, procs);
+	seq_printf(seq, "%lu %lu %lu %Lu %Lu %Lu %Lu %Lu\n",
+		   stats->om_ops,
+		   stats->om_ntrans,
+		   stats->om_timeouts,
+		   stats->om_bytes_sent,
+		   stats->om_bytes_recv,
+		   ktime_to_ms(stats->om_queue),
+		   ktime_to_ms(stats->om_rtt),
+		   ktime_to_ms(stats->om_execute));
+}
+
+void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt)
+{
 	struct rpc_xprt *xprt;
 	unsigned int op, maxproc = clnt->cl_maxproc;
 
-	if (!stats)
+	if (!clnt->cl_metrics)
 		return;
 
 	seq_printf(seq, "\tRPC iostats version: %s  ", RPC_IOSTATS_VERS);
@@ -229,20 +255,18 @@ void rpc_print_iostats(struct seq_file *seq, struct rpc_clnt *clnt)
 
 	seq_printf(seq, "\tper-op statistics\n");
 	for (op = 0; op < maxproc; op++) {
-		struct rpc_iostats *metrics = &stats[op];
-		_print_name(seq, op, clnt->cl_procinfo);
-		seq_printf(seq, "%lu %lu %lu %Lu %Lu %Lu %Lu %Lu\n",
-				metrics->om_ops,
-				metrics->om_ntrans,
-				metrics->om_timeouts,
-				metrics->om_bytes_sent,
-				metrics->om_bytes_recv,
-				ktime_to_ms(metrics->om_queue),
-				ktime_to_ms(metrics->om_rtt),
-				ktime_to_ms(metrics->om_execute));
+		struct rpc_iostats stats = {};
+		struct rpc_clnt *next = clnt;
+		do {
+			_add_rpc_iostats(&stats, &next->cl_metrics[op]);
+			if (next == next->cl_parent)
+				break;
+			next = next->cl_parent;
+		} while (next);
+		_print_rpc_iostats(seq, &stats, op, clnt->cl_procinfo);
 	}
 }
-EXPORT_SYMBOL_GPL(rpc_print_iostats);
+EXPORT_SYMBOL_GPL(rpc_clnt_show_stats);
 
 /*
  * Register/unregister RPC proc files
@@ -310,4 +334,3 @@ void rpc_proc_exit(struct net *net)
 	dprintk("RPC:       unregistering /proc/net/rpc\n");
 	remove_proc_entry("rpc", net->proc_net);
 }
-
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index 09a0315..c9bacb3 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -57,4 +57,3 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
 int rpc_clients_notifier_register(void);
 void rpc_clients_notifier_unregister(void);
 #endif /* _NET_SUNRPC_SUNRPC_H */
-
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3fabf9f6..a8db2e3f 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -880,7 +880,7 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 __must_hold(&req->rq_xprt->recv_lock)
 {
 	struct rpc_task *task = req->rq_task;
-	
+
 	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
 		spin_unlock(&req->rq_xprt->recv_lock);
 		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5efeba0..956a5ea 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -280,7 +280,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		++xprt->rx_xprt.connect_cookie;
 		connstate = -ECONNABORTED;
 connected:
-		xprt->rx_buf.rb_credits = 1;
 		ep->rep_connected = connstate;
 		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
@@ -755,6 +754,7 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	}
 
 	ep->rep_connected = 0;
+	rpcrdma_post_recvs(r_xprt, true);
 
 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 	if (rc) {
@@ -773,8 +773,6 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
 	dprintk("RPC:       %s: connected\n", __func__);
 
-	rpcrdma_post_recvs(r_xprt, true);
-
 out:
 	if (rc)
 		ep->rep_connected = rc;
@@ -1171,6 +1169,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 		list_add(&req->rl_list, &buf->rb_send_bufs);
 	}
 
+	buf->rb_credits = 1;
 	buf->rb_posted_receives = 0;
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 9e1c502..6b7539c 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -3375,4 +3375,3 @@ module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
 		   max_slot_table_size, 0644);
 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
 		   slot_table_size, 0644);
-