From: Chuck Lever <chuck.lever@oracle.com>
To: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org
Subject: [PATCH v2 05/13] xprtrdma: Fix client lock-up after application signal fires
Date: Thu, 08 Jun 2017 11:52:20 -0400
Message-ID: <20170608155220.18945.33047.stgit@manet.1015granger.net>
In-Reply-To: <20170608154339.18945.5500.stgit@manet.1015granger.net>

After a signal, the RPC client aborts synchronous RPCs running on
behalf of the signaled application.

The server is still executing those RPCs, and will write the results
back into the client's memory when it's done. By the time the server
writes the results, that memory is likely being used for other
purposes. Therefore xprtrdma has to immediately invalidate all
memory regions used by those aborted RPCs to prevent the server's
writes from clobbering that re-used memory.

With FMR memory registration, invalidation takes a relatively long
time. In fact, the invalidation is often still running when the
server tries to write the results into the memory regions that are
being invalidated.

This sets up a race between two processes:

1.  After the signal, xprt_rdma_free calls ro_unmap_safe.
2.  While ro_unmap_safe is still running, the server replies and
    rpcrdma_reply_handler runs, calling ro_unmap_sync.

Both processes invoke ib_unmap_fmr on the same FMR.

The mlx4 driver allows two ib_unmap_fmr calls on the same FMR at
the same time, but HCAs generally don't tolerate this. Sometimes
this can result in a system crash.

If the HCA happens to survive, rpcrdma_reply_handler continues. It
removes the rpc_rqst from rq_list and releases the transport_lock.
This enables xprt_rdma_free to run in another process, and the
rpc_rqst is released while rpcrdma_reply_handler is still waiting
for the ib_unmap_fmr call to finish.

But further down in rpcrdma_reply_handler, the transport_lock is
taken again, and "rqst" is dereferenced. If "rqst" has already been
released, this triggers a general protection fault. Since bottom-
halves are disabled, the system locks up.
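
Laid out as a timeline (reconstructed from the description above;
the names refer to the old code paths, and exact timing varies):

    xprt_rdma_free                 rpcrdma_reply_handler
    --------------                 ---------------------
    ro_unmap_safe()
      ib_unmap_fmr() . .           spin_lock_bh(&transport_lock)
            .                      rqst = xprt_lookup_rqst()
            .                      list_del_init(&rqst->rq_list)
            .                      spin_unlock_bh(&transport_lock)
            .                      ro_unmap_sync()
            .                        ib_unmap_fmr()    <-- same FMR
    (rqst is released)                   . . .
                                   spin_lock_bh(&transport_lock)
                                   xprt_complete_rqst(rqst->rq_task)  <-- GPF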

Address both issues by reversing the order of the xprt_lookup_rqst
call and the ro_unmap_sync call. Introduce a separate lookup
mechanism for rpcrdma_reqs so that ro_unmap_sync can be called
before xprt_lookup_rqst. The handler now takes the transport_lock
exactly once and holds it across both the XID lookup and RPC
completion.
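
A condensed sketch of the new reply-handler flow (error paths and
reply parsing elided; the complete version is in the rpc_rdma.c
hunk below):

    spin_lock(&buf->rb_lock);
    req = rpcrdma_lookup_req_locked(buf, headerp->rm_xid);
    req->rl_reply = rep;            /* mark matched */
    spin_unlock(&buf->rb_lock);

    /* Fence the server off before rqst can be touched. */
    if (!list_empty(&mws))
        r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);

    /* One critical section covers both lookup and completion,
     * so rqst cannot be released in between.
     */
    spin_lock_bh(&xprt->transport_lock);
    rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
    ...
    xprt_complete_rqst(rqst->rq_task, status);
    spin_unlock_bh(&xprt->transport_lock);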

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 68791649a725 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   79 +++++++++++++++++++++++++--------------
 net/sunrpc/xprtrdma/transport.c |    3 +
 net/sunrpc/xprtrdma/verbs.c     |    3 +
 net/sunrpc/xprtrdma/xprt_rdma.h |   30 +++++++++++++++
 4 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c88132d..b6584ae 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -734,6 +734,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 		rpclen = 0;
 	}
 
+	req->rl_xid = rqst->rq_xid;
+	rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -987,11 +990,12 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 {
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
@@ -1013,28 +1017,45 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
+	spin_lock(&buf->rb_lock);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+					headerp->rm_xid);
+	if (!req)
 		goto out_nomatch;
-
-	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
 
-	/* Sanity checking has passed. We are now committed
-	 * to complete this transaction.
-	 */
 	list_replace_init(&req->rl_registered, &mws);
 	rpcrdma_mark_remote_invalidation(&mws, rep);
-	list_del_init(&rqst->rq_list);
+
+	/* Avoid races with signals and duplicate replies
+	 * by marking this req as matched.
+	 */
 	req->rl_reply = rep;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
+
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-	xprt->reestablish_timeout = 0;
+	/* Invalidate and unmap the data payloads before waking the
+	 * waiting application. This guarantees the memory regions
+	 * are properly fenced from the server before the application
+	 * accesses the data. It also ensures proper send flow control:
+	 * waking the next RPC waits until this RPC has relinquished
+	 * all its Send Queue entries.
+	 */
+	if (!list_empty(&mws))
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+	/* Perform XID lookup, reconstruction of the RPC reply, and
+	 * RPC completion while holding the transport lock to ensure
+	 * the rep, rqst, and rq_task pointers remain stable.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
@@ -1109,17 +1130,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	}
 
 out:
-	/* Invalidate and flush the data payloads before waking the
-	 * waiting application. This guarantees the memory region is
-	 * properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow
-	 * control: waking the next RPC waits until this RPC has
-	 * relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&mws))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
-
-	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1128,7 +1138,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	xprt_complete_rqst(rqst->rq_task, status);
 	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-			__func__, xprt, rqst, status);
+		__func__, xprt, rqst, status);
 	return;
 
 out_badstatus:
@@ -1177,26 +1187,37 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+	spin_unlock_bh(&xprt->transport_lock);
+	rpcrdma_buffer_put(req);
+	dprintk("RPC:       %s: race, no rqst left for req %p\n",
+		__func__, req);
+	return;
+
 out_shortreply:
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid),
 		rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 62ecbcc..d1c458e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,8 @@
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	if (unlikely(!list_empty(&req->rl_registered)))
+	rpcrdma_remove_req(&r_xprt->rx_buf, req);
+	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index df72224..a215a87 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1032,6 +1032,7 @@ struct rpcrdma_rep *
 	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
+	INIT_LIST_HEAD(&buf->rb_pending);
 	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 			  rpcrdma_mr_refresh_worker);
@@ -1084,7 +1085,7 @@ struct rpcrdma_rep *
 
 	req = list_first_entry(&buf->rb_send_bufs,
 			       struct rpcrdma_req, rl_list);
-	list_del(&req->rl_list);
+	list_del_init(&req->rl_list);
 	return req;
 }
 
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index ad918c8..b282d3f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -341,6 +341,7 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_list;
+	__be32			rl_xid;
 	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
@@ -402,6 +403,7 @@ struct rpcrdma_buffer {
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
+	struct list_head	rb_pending;
 	u32			rb_max_requests;
 	atomic_t		rb_credits;	/* most recent credit grant */
 
@@ -550,6 +552,34 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	if (list_empty(&req->rl_list))
+		list_add_tail(&req->rl_list, &buffers->rb_pending);
+	spin_unlock(&buffers->rb_lock);
+}
+
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+	struct rpcrdma_req *pos;
+
+	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+		if (pos->rl_xid == xid)
+			return pos;
+	return NULL;
+}
+
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	list_del(&req->rl_list);
+	spin_unlock(&buffers->rb_lock);
+}
+
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
