* [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting
@ 2023-03-01  4:51 Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 1/8] RDMA/rxe: Convert tasklet args to queue pairs Bob Pearson
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma; +Cc: Bob Pearson

This patch series corrects qp reference counting issues
related to deferred execution of tasklets. These issues were
discovered while attempting to resolve soft lockups of the rxe
driver observed by Daisuke Matsuda in a version of the driver
using work queues, where the workqueue implementation was based
on the current tasklet based driver. The search for the root
cause of those lockups led to the discovery of an error in the
tasklet implementation that has been present since the driver
went upstream. This patch series corrects that error.
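
For illustration, the error class is roughly the following (a minimal
sketch, not actual driver code):

	/* verbs caller, holding a valid qp reference */
	tasklet_schedule(&task->tasklet); /* queues do_task() with a
					   * bare qp pointer
					   */
	/* the caller returns; its qp reference may then be dropped
	 * and the qp freed before the tasklet runs
	 */

	/* later, in softirq context */
	do_task(t);	/* dereferences the possibly-freed qp */

Taking a qp reference when the task is scheduled and holding it until
the task finishes (patch 8) closes this window.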

With this patch series applied the rxe driver is more stable and
has run the test cases reported by Matsuda for over 24 hours without
errors.

The series also corrects some errors in qp reference counting
related to qp cleanup.

This series depends on the "RDMA/rxe: Add error logging to rxe"
series as a prerequisite.

Link: https://lore.kernel.org/linux-rdma/TYCPR01MB845522FD536170D75068DD41E5099@TYCPR01MB8455.jpnprd01.prod.outlook.com/
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>

v2:
  This version of the series splits off the changes to the rxe debug
  code, which have been submitted separately as "RDMA/rxe: Add error
  logging to rxe". One unrelated patch was dropped, and other patches
  earlier included in a series converting from tasklets to workqueues
  were moved into this series because they are relevant to both the
  tasklet and workqueue versions of the driver.

Bob Pearson (8):
  RDMA/rxe: Convert tasklet args to queue pairs
  RDMA/rxe: warn if refcnt zero in rxe_put
  RDMA/rxe: Cleanup reset state handling in rxe_resp.c
  RDMA/rxe: Cleanup error state handling in rxe_comp.c
  RDMA/rxe: Remove qp reference counting in tasks
  RDMA/rxe: Remove __rxe_do_task()
  RDMA/rxe: Make tasks schedule each other
  RDMA/rxe: Rewrite rxe_task.c

 drivers/infiniband/sw/rxe/rxe.h      |   1 -
 drivers/infiniband/sw/rxe/rxe_comp.c |  71 +++++--
 drivers/infiniband/sw/rxe/rxe_loc.h  |   6 +-
 drivers/infiniband/sw/rxe/rxe_pool.c |   2 +
 drivers/infiniband/sw/rxe/rxe_qp.c   |  56 ++----
 drivers/infiniband/sw/rxe/rxe_req.c  |  12 +-
 drivers/infiniband/sw/rxe/rxe_resp.c | 114 ++++++------
 drivers/infiniband/sw/rxe/rxe_task.c | 268 +++++++++++++++++++++------
 drivers/infiniband/sw/rxe/rxe_task.h |  23 ++-
 9 files changed, 352 insertions(+), 201 deletions(-)


base-commit: bceed5834cd43a0ed67e35ec16197a5c882d3a6d
-- 
2.37.2



* [PATCH for-next v2 1/8] RDMA/rxe: Convert tasklet args to queue pairs
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 2/8] RDMA/rxe: Warn if refcnt zero in rxe_put Bob Pearson
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma; +Cc: Bob Pearson

Originally it was thought that the tasklet machinery in rxe_task.c
would be used in other applications, but that has not happened for
years. This patch replaces the 'void *arg' with 'struct rxe_qp *qp' in
the parameters to the tasklet calls. This change has no effect on
performance but may make the code a little clearer.
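
As an illustration (not part of the patch), the typed callback lets
the compiler reject mismatched arguments that the old 'void *'
interface accepted silently:

	int (*func)(struct rxe_qp *qp) = rxe_completer;

	func(qp);	/* ok: types match */
	/* func(mr), with mr a struct rxe_mr *, no longer compiles;
	 * with the old int (*)(void *) signature it compiled and
	 * failed at run time
	 */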

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_comp.c |  3 +--
 drivers/infiniband/sw/rxe/rxe_loc.h  |  6 +++---
 drivers/infiniband/sw/rxe/rxe_req.c  |  3 +--
 drivers/infiniband/sw/rxe/rxe_resp.c |  3 +--
 drivers/infiniband/sw/rxe/rxe_task.c | 11 ++++++-----
 drivers/infiniband/sw/rxe/rxe_task.h |  9 +++++----
 6 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index 876057e3ee3c..cbfa16b3a490 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -575,9 +575,8 @@ static void free_pkt(struct rxe_pkt_info *pkt)
 	ib_device_put(dev);
 }
 
-int rxe_completer(void *arg)
+int rxe_completer(struct rxe_qp *qp)
 {
-	struct rxe_qp *qp = (struct rxe_qp *)arg;
 	struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
 	struct rxe_send_wqe *wqe = NULL;
 	struct sk_buff *skb = NULL;
diff --git a/drivers/infiniband/sw/rxe/rxe_loc.h b/drivers/infiniband/sw/rxe/rxe_loc.h
index 839de34cf4c9..804b15e929dd 100644
--- a/drivers/infiniband/sw/rxe/rxe_loc.h
+++ b/drivers/infiniband/sw/rxe/rxe_loc.h
@@ -170,9 +170,9 @@ void rxe_srq_cleanup(struct rxe_pool_elem *elem);
 
 void rxe_dealloc(struct ib_device *ib_dev);
 
-int rxe_completer(void *arg);
-int rxe_requester(void *arg);
-int rxe_responder(void *arg);
+int rxe_completer(struct rxe_qp *qp);
+int rxe_requester(struct rxe_qp *qp);
+int rxe_responder(struct rxe_qp *qp);
 
 /* rxe_icrc.c */
 int rxe_icrc_init(struct rxe_dev *rxe);
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index 899c8779f800..f2dc2d191e16 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -635,9 +635,8 @@ static int rxe_do_local_ops(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
 	return 0;
 }
 
-int rxe_requester(void *arg)
+int rxe_requester(struct rxe_qp *qp)
 {
-	struct rxe_qp *qp = (struct rxe_qp *)arg;
 	struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
 	struct rxe_pkt_info pkt;
 	struct sk_buff *skb;
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 4217eec03a94..7cb1b962d665 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -1443,9 +1443,8 @@ static void rxe_drain_req_pkts(struct rxe_qp *qp, bool notify)
 		queue_advance_consumer(q, q->type);
 }
 
-int rxe_responder(void *arg)
+int rxe_responder(struct rxe_qp *qp)
 {
-	struct rxe_qp *qp = (struct rxe_qp *)arg;
 	struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
 	enum resp_states state;
 	struct rxe_pkt_info *pkt = NULL;
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index 60b90e33a884..959cc6229a34 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -11,7 +11,7 @@ int __rxe_do_task(struct rxe_task *task)
 {
 	int ret;
 
-	while ((ret = task->func(task->arg)) == 0)
+	while ((ret = task->func(task->qp)) == 0)
 		;
 
 	task->ret = ret;
@@ -29,7 +29,7 @@ static void do_task(struct tasklet_struct *t)
 	int cont;
 	int ret;
 	struct rxe_task *task = from_tasklet(task, t, tasklet);
-	struct rxe_qp *qp = (struct rxe_qp *)task->arg;
+	struct rxe_qp *qp = (struct rxe_qp *)task->qp;
 	unsigned int iterations = RXE_MAX_ITERATIONS;
 
 	spin_lock_bh(&task->lock);
@@ -54,7 +54,7 @@ static void do_task(struct tasklet_struct *t)
 
 	do {
 		cont = 0;
-		ret = task->func(task->arg);
+		ret = task->func(task->qp);
 
 		spin_lock_bh(&task->lock);
 		switch (task->state) {
@@ -91,9 +91,10 @@ static void do_task(struct tasklet_struct *t)
 	task->ret = ret;
 }
 
-int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *))
+int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
+		  int (*func)(struct rxe_qp *))
 {
-	task->arg	= arg;
+	task->qp	= qp;
 	task->func	= func;
 	task->destroyed	= false;
 
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 7b88129702ac..41efd5fd49b0 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -22,18 +22,19 @@ struct rxe_task {
 	struct tasklet_struct	tasklet;
 	int			state;
 	spinlock_t		lock;
-	void			*arg;
-	int			(*func)(void *arg);
+	struct rxe_qp		*qp;
+	int			(*func)(struct rxe_qp *qp);
 	int			ret;
 	bool			destroyed;
 };
 
 /*
  * init rxe_task structure
- *	arg  => parameter to pass to fcn
+ *	qp  => parameter to pass to func
  *	func => function to call until it returns != 0
  */
-int rxe_init_task(struct rxe_task *task, void *arg, int (*func)(void *));
+int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
+		  int (*func)(struct rxe_qp *));
 
 /* cleanup task */
 void rxe_cleanup_task(struct rxe_task *task);
-- 
2.37.2



* [PATCH for-next v2 2/8] RDMA/rxe: Warn if refcnt zero in rxe_put
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 1/8] RDMA/rxe: Convert tasklet args to queue pairs Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 3/8] RDMA/rxe: Cleanup reset state handling in rxe_resp.c Bob Pearson
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma; +Cc: Bob Pearson

This patch adds a WARN_ON if the reference count of the object
passed to __rxe_put() is <= 0. This can only happen if there is a
bug in the rxe driver, but the consequences are severe if it does.
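
For example, a hypothetical unbalanced put sequence that the check
turns into a loud warning (illustration only, not driver code):

	rxe_get(qp);	/* ref_cnt: 1 -> 2 */
	rxe_put(qp);	/* ref_cnt: 2 -> 1 */
	rxe_put(qp);	/* ref_cnt: 1 -> 0 */
	rxe_put(qp);	/* ref_cnt already 0: WARN_ON fires and the
			 * kref_put() that would underflow the count
			 * is skipped
			 */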

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_pool.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/drivers/infiniband/sw/rxe/rxe_pool.c b/drivers/infiniband/sw/rxe/rxe_pool.c
index 3f6bd672cc2d..1b160e36b751 100644
--- a/drivers/infiniband/sw/rxe/rxe_pool.c
+++ b/drivers/infiniband/sw/rxe/rxe_pool.c
@@ -244,6 +244,8 @@ int __rxe_get(struct rxe_pool_elem *elem)
 
 int __rxe_put(struct rxe_pool_elem *elem)
 {
+	if (WARN_ON(kref_read(&elem->ref_cnt) <= 0))
+		return 0;
 	return kref_put(&elem->ref_cnt, rxe_elem_release);
 }
 
-- 
2.37.2



* [PATCH for-next v2 3/8] RDMA/rxe: Cleanup reset state handling in rxe_resp.c
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 1/8] RDMA/rxe: Convert tasklet args to queue pairs Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 2/8] RDMA/rxe: Warn if refcnt zero in rxe_put Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 4/8] RDMA/rxe: Cleanup error state handling in rxe_comp.c Bob Pearson
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma
  Cc: Bob Pearson, Ian Ziemba

Clean up the handling of the qp in the error state, the reset state
and during rxe_qp_do_cleanup. The error state does roughly the same
thing as the others but has its code spread all over.

This patch combines them in a cleaner way.
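
For illustration, what a kernel consumer would see for a flushed recv
wqe after this change (a sketch; field values follow complete_flush()
in the diff):

	struct ib_wc wc;

	if (ib_poll_cq(qp->ibqp.recv_cq, 1, &wc) == 1) {
		/* wc.status == IB_WC_WR_FLUSH_ERR
		 * wc.wr_id  == wr_id of the posted recv wqe
		 * wc.qp     == &qp->ibqp
		 */
	}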

Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe.h      |   1 -
 drivers/infiniband/sw/rxe/rxe_resp.c | 107 ++++++++++++++-------------
 2 files changed, 57 insertions(+), 51 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe.h b/drivers/infiniband/sw/rxe/rxe.h
index bd8a8ea4ea8f..d33dd6cf83d3 100644
--- a/drivers/infiniband/sw/rxe/rxe.h
+++ b/drivers/infiniband/sw/rxe/rxe.h
@@ -133,7 +133,6 @@ enum resp_states {
 	RESPST_ERR_LENGTH,
 	RESPST_ERR_CQ_OVERFLOW,
 	RESPST_ERROR,
-	RESPST_RESET,
 	RESPST_DONE,
 	RESPST_EXIT,
 };
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 7cb1b962d665..8f9bbb14fa7a 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -42,7 +42,6 @@ static char *resp_state_name[] = {
 	[RESPST_ERR_LENGTH]			= "ERR_LENGTH",
 	[RESPST_ERR_CQ_OVERFLOW]		= "ERR_CQ_OVERFLOW",
 	[RESPST_ERROR]				= "ERROR",
-	[RESPST_RESET]				= "RESET",
 	[RESPST_DONE]				= "DONE",
 	[RESPST_EXIT]				= "EXIT",
 };
@@ -69,17 +68,6 @@ static inline enum resp_states get_req(struct rxe_qp *qp,
 {
 	struct sk_buff *skb;
 
-	if (qp->resp.state == QP_STATE_ERROR) {
-		while ((skb = skb_dequeue(&qp->req_pkts))) {
-			rxe_put(qp);
-			kfree_skb(skb);
-			ib_device_put(qp->ibqp.device);
-		}
-
-		/* go drain recv wr queue */
-		return RESPST_CHK_RESOURCE;
-	}
-
 	skb = skb_peek(&qp->req_pkts);
 	if (!skb)
 		return RESPST_EXIT;
@@ -334,24 +322,6 @@ static enum resp_states check_resource(struct rxe_qp *qp,
 {
 	struct rxe_srq *srq = qp->srq;
 
-	if (qp->resp.state == QP_STATE_ERROR) {
-		if (qp->resp.wqe) {
-			qp->resp.status = IB_WC_WR_FLUSH_ERR;
-			return RESPST_COMPLETE;
-		} else if (!srq) {
-			qp->resp.wqe = queue_head(qp->rq.queue,
-					QUEUE_TYPE_FROM_CLIENT);
-			if (qp->resp.wqe) {
-				qp->resp.status = IB_WC_WR_FLUSH_ERR;
-				return RESPST_COMPLETE;
-			} else {
-				return RESPST_EXIT;
-			}
-		} else {
-			return RESPST_EXIT;
-		}
-	}
-
 	if (pkt->mask & (RXE_READ_OR_ATOMIC_MASK | RXE_ATOMIC_WRITE_MASK)) {
 		/* it is the requesters job to not send
 		 * too many read/atomic ops, we just
@@ -1425,22 +1395,66 @@ static enum resp_states do_class_d1e_error(struct rxe_qp *qp)
 	}
 }
 
-static void rxe_drain_req_pkts(struct rxe_qp *qp, bool notify)
+/* drain incoming request packet queue */
+static void rxe_drain_req_pkts(struct rxe_qp *qp)
 {
 	struct sk_buff *skb;
-	struct rxe_queue *q = qp->rq.queue;
 
 	while ((skb = skb_dequeue(&qp->req_pkts))) {
 		rxe_put(qp);
 		kfree_skb(skb);
 		ib_device_put(qp->ibqp.device);
 	}
+}
+
+/* complete receive wqe with flush error */
+static int complete_flush(struct rxe_qp *qp, struct rxe_recv_wqe *wqe)
+{
+	struct rxe_cqe cqe = {};
+	struct ib_wc *wc = &cqe.ibwc;
+	struct ib_uverbs_wc *uwc = &cqe.uibwc;
+
+	if (qp->rcq->is_user) {
+		uwc->status = IB_WC_WR_FLUSH_ERR;
+		uwc->qp_num = qp_num(qp);
+		uwc->wr_id = wqe->wr_id;
+	} else {
+		wc->status = IB_WC_WR_FLUSH_ERR;
+		wc->qp = &qp->ibqp;
+		wc->wr_id = wqe->wr_id;
+	}
+
+	if (rxe_cq_post(qp->rcq, &cqe, 0))
+		return -ENOMEM;
+
+	return 0;
+}
+
+/* drain and optionally complete the receive queue
+ * if unable to complete a wqe stop completing and
+ * just flush the remaining wqes
+ */
+static void rxe_drain_recv_queue(struct rxe_qp *qp, bool notify)
+{
+	struct rxe_queue *q = qp->rq.queue;
+	struct rxe_recv_wqe *wqe;
+	int err;
 
-	if (notify)
+	if (qp->srq)
 		return;
 
-	while (!qp->srq && q && queue_head(q, q->type))
+	while ((wqe = queue_head(q, q->type))) {
+		if (notify) {
+			err = complete_flush(qp, wqe);
+			if (err) {
+				rxe_dbg_qp(qp, "complete failed for recv wqe");
+				notify = 0;
+			}
+		}
 		queue_advance_consumer(q, q->type);
+	}
+
+	qp->resp.wqe = NULL;
 }
 
 int rxe_responder(struct rxe_qp *qp)
@@ -1453,20 +1467,18 @@ int rxe_responder(struct rxe_qp *qp)
 	if (!rxe_get(qp))
 		return -EAGAIN;
 
-	qp->resp.aeth_syndrome = AETH_ACK_UNLIMITED;
-
-	if (!qp->valid)
+	if (!qp->valid || qp->resp.state == QP_STATE_ERROR ||
+	    qp->resp.state == QP_STATE_RESET) {
+		bool notify = qp->valid &&
+				(qp->resp.state == QP_STATE_ERROR);
+		rxe_drain_req_pkts(qp);
+		rxe_drain_recv_queue(qp, notify);
 		goto exit;
+	}
 
-	switch (qp->resp.state) {
-	case QP_STATE_RESET:
-		state = RESPST_RESET;
-		break;
+	qp->resp.aeth_syndrome = AETH_ACK_UNLIMITED;
 
-	default:
-		state = RESPST_GET_REQ;
-		break;
-	}
+	state = RESPST_GET_REQ;
 
 	while (1) {
 		rxe_dbg_qp(qp, "state = %s\n", resp_state_name[state]);
@@ -1625,11 +1637,6 @@ int rxe_responder(struct rxe_qp *qp)
 
 			goto exit;
 
-		case RESPST_RESET:
-			rxe_drain_req_pkts(qp, false);
-			qp->resp.wqe = NULL;
-			goto exit;
-
 		case RESPST_ERROR:
 			qp->resp.goto_error = 0;
 			rxe_dbg_qp(qp, "moved to error state\n");
-- 
2.37.2



* [PATCH for-next v2 4/8] RDMA/rxe: Cleanup error state handling in rxe_comp.c
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
                   ` (2 preceding siblings ...)
  2023-03-01  4:51 ` [PATCH for-next v2 3/8] RDMA/rxe: Cleanup reset state handling in rxe_resp.c Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 5/8] RDMA/rxe: Remove qp reference counting in tasks Bob Pearson
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma
  Cc: Bob Pearson, Ian Ziemba

Clean up the handling of the qp in the error state, the reset state
and during rxe_qp_do_cleanup, making it match rxe_resp.c.

Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_comp.c | 55 +++++++++++++++++++++++-----
 drivers/infiniband/sw/rxe/rxe_resp.c | 28 +++++++-------
 2 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index cbfa16b3a490..ebece584a020 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -542,25 +542,60 @@ static inline enum comp_state complete_wqe(struct rxe_qp *qp,
 	return COMPST_GET_WQE;
 }
 
-static void rxe_drain_resp_pkts(struct rxe_qp *qp, bool notify)
+/* drain incoming response packet queue */
+static void drain_resp_pkts(struct rxe_qp *qp)
 {
 	struct sk_buff *skb;
-	struct rxe_send_wqe *wqe;
-	struct rxe_queue *q = qp->sq.queue;
 
 	while ((skb = skb_dequeue(&qp->resp_pkts))) {
 		rxe_put(qp);
 		kfree_skb(skb);
 		ib_device_put(qp->ibqp.device);
 	}
+}
+
+/* complete send wqe with flush error */
+static int flush_send_wqe(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
+{
+	struct rxe_cqe cqe = {};
+	struct ib_wc *wc = &cqe.ibwc;
+	struct ib_uverbs_wc *uwc = &cqe.uibwc;
+	int err;
+
+	if (qp->is_user) {
+		uwc->wr_id = wqe->wr.wr_id;
+		uwc->status = wqe->status;
+		uwc->qp_num = qp->ibqp.qp_num;
+	} else {
+		wc->wr_id = wqe->wr.wr_id;
+		wc->status = wqe->status;
+		wc->qp = &qp->ibqp;
+	}
+
+	err = rxe_cq_post(qp->scq, &cqe, 0);
+	if (err)
+		rxe_dbg_cq(qp->scq, "post cq failed, err = %d", err);
+
+	return err;
+}
+
+/* drain and optionally complete the send queue
+ * if unable to complete a wqe stop completing
+ * and flush the remaining wqes
+ */
+static void flush_send_queue(struct rxe_qp *qp, bool notify)
+{
+	struct rxe_send_wqe *wqe;
+	struct rxe_queue *q = qp->sq.queue;
+	int err;
 
 	while ((wqe = queue_head(q, q->type))) {
 		if (notify) {
-			wqe->status = IB_WC_WR_FLUSH_ERR;
-			do_complete(qp, wqe);
-		} else {
-			queue_advance_consumer(q, q->type);
+			err = flush_send_wqe(qp, wqe);
+			if (err)
+				notify = 0;
 		}
+		queue_advance_consumer(q, q->type);
 	}
 }
 
@@ -589,8 +624,10 @@ int rxe_completer(struct rxe_qp *qp)
 
 	if (!qp->valid || qp->comp.state == QP_STATE_ERROR ||
 	    qp->comp.state == QP_STATE_RESET) {
-		rxe_drain_resp_pkts(qp, qp->valid &&
-				    qp->comp.state == QP_STATE_ERROR);
+		bool notify = qp->valid &&
+				(qp->comp.state == QP_STATE_ERROR);
+		drain_resp_pkts(qp);
+		flush_send_queue(qp, notify);
 		goto exit;
 	}
 
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 8f9bbb14fa7a..2f71183449f9 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -1396,7 +1396,7 @@ static enum resp_states do_class_d1e_error(struct rxe_qp *qp)
 }
 
 /* drain incoming request packet queue */
-static void rxe_drain_req_pkts(struct rxe_qp *qp)
+static void drain_req_pkts(struct rxe_qp *qp)
 {
 	struct sk_buff *skb;
 
@@ -1408,33 +1408,35 @@ static void rxe_drain_req_pkts(struct rxe_qp *qp)
 }
 
 /* complete receive wqe with flush error */
-static int complete_flush(struct rxe_qp *qp, struct rxe_recv_wqe *wqe)
+static int flush_recv_wqe(struct rxe_qp *qp, struct rxe_recv_wqe *wqe)
 {
 	struct rxe_cqe cqe = {};
 	struct ib_wc *wc = &cqe.ibwc;
 	struct ib_uverbs_wc *uwc = &cqe.uibwc;
+	int err;
 
 	if (qp->rcq->is_user) {
+		uwc->wr_id = wqe->wr_id;
 		uwc->status = IB_WC_WR_FLUSH_ERR;
 		uwc->qp_num = qp_num(qp);
-		uwc->wr_id = wqe->wr_id;
 	} else {
+		wc->wr_id = wqe->wr_id;
 		wc->status = IB_WC_WR_FLUSH_ERR;
 		wc->qp = &qp->ibqp;
-		wc->wr_id = wqe->wr_id;
 	}
 
-	if (rxe_cq_post(qp->rcq, &cqe, 0))
-		return -ENOMEM;
+	err = rxe_cq_post(qp->rcq, &cqe, 0);
+	if (err)
+		rxe_dbg_cq(qp->rcq, "post cq failed err = %d", err);
 
-	return 0;
+	return err;
 }
 
 /* drain and optionally complete the receive queue
  * if unable to complete a wqe stop completing and
  * just flush the remaining wqes
  */
-static void rxe_drain_recv_queue(struct rxe_qp *qp, bool notify)
+static void flush_recv_queue(struct rxe_qp *qp, bool notify)
 {
 	struct rxe_queue *q = qp->rq.queue;
 	struct rxe_recv_wqe *wqe;
@@ -1445,11 +1447,9 @@ static void rxe_drain_recv_queue(struct rxe_qp *qp, bool notify)
 
 	while ((wqe = queue_head(q, q->type))) {
 		if (notify) {
-			err = complete_flush(qp, wqe);
-			if (err) {
-				rxe_dbg_qp(qp, "complete failed for recv wqe");
+			err = flush_recv_wqe(qp, wqe);
+			if (err)
 				notify = 0;
-			}
 		}
 		queue_advance_consumer(q, q->type);
 	}
@@ -1471,8 +1471,8 @@ int rxe_responder(struct rxe_qp *qp)
 	    qp->resp.state == QP_STATE_RESET) {
 		bool notify = qp->valid &&
 				(qp->resp.state == QP_STATE_ERROR);
-		rxe_drain_req_pkts(qp);
-		rxe_drain_recv_queue(qp, notify);
+		drain_req_pkts(qp);
+		flush_recv_queue(qp, notify);
 		goto exit;
 	}
 
-- 
2.37.2



* [PATCH for-next v2 5/8] RDMA/rxe: Remove qp reference counting in tasks
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
                   ` (3 preceding siblings ...)
  2023-03-01  4:51 ` [PATCH for-next v2 4/8] RDMA/rxe: Cleanup error state handling in rxe_comp.c Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 6/8] RDMA/rxe: Remove __rxe_do_task() Bob Pearson
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma; +Cc: Bob Pearson

Currently each of the three tasklets, requester, completer and
responder, in the rxe driver takes and releases a reference to the
qp argument at the beginning and end of the subroutine. The
caller passing in the qp argument should be responsible for holding
a reference to the qp, so these are not required. Further, doing so
breaks the qp cleanup code in rxe_qp_do_cleanup, which calls these
routines after all the references have been dropped, so they cannot
drain the packet and work request queues as intended.
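
For illustration, the intended convention (a sketch, not part of the
patch):

	/* the caller owns a qp reference across the call */
	if (rxe_get(qp)) {
		rxe_responder(qp);	/* may assume the qp stays valid */
		rxe_put(qp);
	}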

In fact, if these routines are deferred by calling tasklet_schedule
there is no guarantee that the calling code still holds a qp reference.
That is a bug in rxe_task.c which will be fixed later in this series.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_comp.c | 5 -----
 drivers/infiniband/sw/rxe/rxe_req.c  | 5 -----
 drivers/infiniband/sw/rxe/rxe_resp.c | 4 ----
 3 files changed, 14 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index ebece584a020..fa864c6704ac 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -619,9 +619,6 @@ int rxe_completer(struct rxe_qp *qp)
 	enum comp_state state;
 	int ret;
 
-	if (!rxe_get(qp))
-		return -EAGAIN;
-
 	if (!qp->valid || qp->comp.state == QP_STATE_ERROR ||
 	    qp->comp.state == QP_STATE_RESET) {
 		bool notify = qp->valid &&
@@ -824,7 +821,5 @@ int rxe_completer(struct rxe_qp *qp)
 out:
 	if (pkt)
 		free_pkt(pkt);
-	rxe_put(qp);
-
 	return ret;
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index f2dc2d191e16..abc65c54bfd6 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -653,9 +653,6 @@ int rxe_requester(struct rxe_qp *qp)
 	struct rxe_ah *ah;
 	struct rxe_av *av;
 
-	if (!rxe_get(qp))
-		return -EAGAIN;
-
 	if (unlikely(!qp->valid))
 		goto exit;
 
@@ -844,7 +841,5 @@ int rxe_requester(struct rxe_qp *qp)
 exit:
 	ret = -EAGAIN;
 out:
-	rxe_put(qp);
-
 	return ret;
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 2f71183449f9..01e3cbea8445 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -1464,9 +1464,6 @@ int rxe_responder(struct rxe_qp *qp)
 	struct rxe_pkt_info *pkt = NULL;
 	int ret;
 
-	if (!rxe_get(qp))
-		return -EAGAIN;
-
 	if (!qp->valid || qp->resp.state == QP_STATE_ERROR ||
 	    qp->resp.state == QP_STATE_RESET) {
 		bool notify = qp->valid &&
@@ -1658,6 +1655,5 @@ int rxe_responder(struct rxe_qp *qp)
 exit:
 	ret = -EAGAIN;
 out:
-	rxe_put(qp);
 	return ret;
 }
-- 
2.37.2



* [PATCH for-next v2 6/8] RDMA/rxe: Remove __rxe_do_task()
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
                   ` (4 preceding siblings ...)
  2023-03-01  4:51 ` [PATCH for-next v2 5/8] RDMA/rxe: Remove qp reference counting in tasks Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 7/8] RDMA/rxe: Make tasks schedule each other Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 8/8] RDMA/rxe: Rewrite rxe_task.c Bob Pearson
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma
  Cc: Bob Pearson, Ian Ziemba

The subroutine __rxe_do_task is not thread safe and it has
no way to guarantee that the tasks, which are designed
with the assumption that they are non-reentrant, are not
reentered. All of its uses are non-performance critical.

This patch replaces calls to __rxe_do_task with calls to
rxe_sched_task. It also removes irrelevant or unneeded
if tests.

Instead of invoking the task machinery, a single call to
the tasklet function (rxe_requester, etc.) is sufficient
to drain the queues if task execution has been disabled
or stopped.

Together these changes allow the removal of __rxe_do_task.

Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_qp.c   | 56 +++++++++-------------------
 drivers/infiniband/sw/rxe/rxe_task.c | 13 -------
 drivers/infiniband/sw/rxe/rxe_task.h |  6 ---
 3 files changed, 17 insertions(+), 58 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
index c954dd9394ba..49891f8ed4e6 100644
--- a/drivers/infiniband/sw/rxe/rxe_qp.c
+++ b/drivers/infiniband/sw/rxe/rxe_qp.c
@@ -473,29 +473,23 @@ static void rxe_qp_reset(struct rxe_qp *qp)
 {
 	/* stop tasks from running */
 	rxe_disable_task(&qp->resp.task);
-
-	/* stop request/comp */
-	if (qp->sq.queue) {
-		if (qp_type(qp) == IB_QPT_RC)
-			rxe_disable_task(&qp->comp.task);
-		rxe_disable_task(&qp->req.task);
-	}
+	rxe_disable_task(&qp->comp.task);
+	rxe_disable_task(&qp->req.task);
 
 	/* move qp to the reset state */
 	qp->req.state = QP_STATE_RESET;
 	qp->comp.state = QP_STATE_RESET;
 	qp->resp.state = QP_STATE_RESET;
 
-	/* let state machines reset themselves drain work and packet queues
-	 * etc.
-	 */
-	__rxe_do_task(&qp->resp.task);
+	/* drain work and packet queues */
+	rxe_requester(qp);
+	rxe_completer(qp);
+	rxe_responder(qp);
 
-	if (qp->sq.queue) {
-		__rxe_do_task(&qp->comp.task);
-		__rxe_do_task(&qp->req.task);
+	if (qp->rq.queue)
+		rxe_queue_reset(qp->rq.queue);
+	if (qp->sq.queue)
 		rxe_queue_reset(qp->sq.queue);
-	}
 
 	/* cleanup attributes */
 	atomic_set(&qp->ssn, 0);
@@ -518,13 +512,8 @@ static void rxe_qp_reset(struct rxe_qp *qp)
 
 	/* reenable tasks */
 	rxe_enable_task(&qp->resp.task);
-
-	if (qp->sq.queue) {
-		if (qp_type(qp) == IB_QPT_RC)
-			rxe_enable_task(&qp->comp.task);
-
-		rxe_enable_task(&qp->req.task);
-	}
+	rxe_enable_task(&qp->comp.task);
+	rxe_enable_task(&qp->req.task);
 }
 
 /* drain the send queue */
@@ -533,10 +522,7 @@ static void rxe_qp_drain(struct rxe_qp *qp)
 	if (qp->sq.queue) {
 		if (qp->req.state != QP_STATE_DRAINED) {
 			qp->req.state = QP_STATE_DRAIN;
-			if (qp_type(qp) == IB_QPT_RC)
-				rxe_sched_task(&qp->comp.task);
-			else
-				__rxe_do_task(&qp->comp.task);
+			rxe_sched_task(&qp->comp.task);
 			rxe_sched_task(&qp->req.task);
 		}
 	}
@@ -552,11 +538,7 @@ void rxe_qp_error(struct rxe_qp *qp)
 
 	/* drain work and packet queues */
 	rxe_sched_task(&qp->resp.task);
-
-	if (qp_type(qp) == IB_QPT_RC)
-		rxe_sched_task(&qp->comp.task);
-	else
-		__rxe_do_task(&qp->comp.task);
+	rxe_sched_task(&qp->comp.task);
 	rxe_sched_task(&qp->req.task);
 }
 
@@ -773,24 +755,20 @@ static void rxe_qp_do_cleanup(struct work_struct *work)
 
 	qp->valid = 0;
 	qp->qp_timeout_jiffies = 0;
-	rxe_cleanup_task(&qp->resp.task);
 
 	if (qp_type(qp) == IB_QPT_RC) {
 		del_timer_sync(&qp->retrans_timer);
 		del_timer_sync(&qp->rnr_nak_timer);
 	}
 
+	rxe_cleanup_task(&qp->resp.task);
 	rxe_cleanup_task(&qp->req.task);
 	rxe_cleanup_task(&qp->comp.task);
 
 	/* flush out any receive wr's or pending requests */
-	if (qp->req.task.func)
-		__rxe_do_task(&qp->req.task);
-
-	if (qp->sq.queue) {
-		__rxe_do_task(&qp->comp.task);
-		__rxe_do_task(&qp->req.task);
-	}
+	rxe_requester(qp);
+	rxe_completer(qp);
+	rxe_responder(qp);
 
 	if (qp->sq.queue)
 		rxe_queue_cleanup(qp->sq.queue);
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index 959cc6229a34..a67f48545443 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -6,19 +6,6 @@
 
 #include "rxe.h"
 
-int __rxe_do_task(struct rxe_task *task)
-
-{
-	int ret;
-
-	while ((ret = task->func(task->qp)) == 0)
-		;
-
-	task->ret = ret;
-
-	return ret;
-}
-
 /*
  * this locking is due to a potential race where
  * a second caller finds the task already running
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 41efd5fd49b0..99585e40cef9 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -39,12 +39,6 @@ int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
 /* cleanup task */
 void rxe_cleanup_task(struct rxe_task *task);
 
-/*
- * raw call to func in loop without any checking
- * can call when tasklets are disabled
- */
-int __rxe_do_task(struct rxe_task *task);
-
 void rxe_run_task(struct rxe_task *task);
 
 void rxe_sched_task(struct rxe_task *task);
-- 
2.37.2



* [PATCH for-next v2 7/8] RDMA/rxe: Make tasks schedule each other
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
                   ` (5 preceding siblings ...)
  2023-03-01  4:51 ` [PATCH for-next v2 6/8] RDMA/rxe: Remove __rxe_do_task() Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  2023-03-01  4:51 ` [PATCH for-next v2 8/8] RDMA/rxe: Rewrite rxe_task.c Bob Pearson
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma
  Cc: Bob Pearson, Ian Ziemba

Replace rxe_run_task() with rxe_sched_task() when tasks call each
other. These call sites are not performance critical and mainly
involve error paths, but running the tasks inline risks deadlocks.
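
For illustration, the kind of chain the change avoids (a sketch, not
actual driver code):

	/* old: rxe_run_task() executed the task inline */
	rxe_completer(qp)
	    -> rxe_run_task(&qp->req.task)
	        -> rxe_requester(qp)	/* runs on the completer's
					 * stack and may schedule or
					 * run the completer again
					 */

	/* new: rxe_sched_task() defers the requester to its own
	 * tasklet invocation instead
	 */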

Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_comp.c | 8 ++++----
 drivers/infiniband/sw/rxe/rxe_req.c  | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index fa864c6704ac..6371ab140ad9 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -322,7 +322,7 @@ static inline enum comp_state check_ack(struct rxe_qp *qp,
 					qp->comp.psn = pkt->psn;
 					if (qp->req.wait_psn) {
 						qp->req.wait_psn = 0;
-						rxe_run_task(&qp->req.task);
+						rxe_sched_task(&qp->req.task);
 					}
 				}
 				return COMPST_ERROR_RETRY;
@@ -473,7 +473,7 @@ static void do_complete(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
 	 */
 	if (qp->req.wait_fence) {
 		qp->req.wait_fence = 0;
-		rxe_run_task(&qp->req.task);
+		rxe_sched_task(&qp->req.task);
 	}
 }
 
@@ -487,7 +487,7 @@ static inline enum comp_state complete_ack(struct rxe_qp *qp,
 		if (qp->req.need_rd_atomic) {
 			qp->comp.timeout_retry = 0;
 			qp->req.need_rd_atomic = 0;
-			rxe_run_task(&qp->req.task);
+			rxe_sched_task(&qp->req.task);
 		}
 	}
 
@@ -767,7 +767,7 @@ int rxe_completer(struct rxe_qp *qp)
 							RXE_CNT_COMP_RETRY);
 					qp->req.need_retry = 1;
 					qp->comp.started_retry = 1;
-					rxe_run_task(&qp->req.task);
+					rxe_sched_task(&qp->req.task);
 				}
 				goto done;
 
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index abc65c54bfd6..745731140a54 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -753,7 +753,7 @@ int rxe_requester(struct rxe_qp *qp)
 						       qp->req.wqe_index);
 			wqe->state = wqe_state_done;
 			wqe->status = IB_WC_SUCCESS;
-			rxe_run_task(&qp->comp.task);
+			rxe_sched_task(&qp->comp.task);
 			goto done;
 		}
 		payload = mtu;
@@ -837,7 +837,7 @@ int rxe_requester(struct rxe_qp *qp)
 	qp->req.wqe_index = queue_next_index(qp->sq.queue, qp->req.wqe_index);
 	wqe->state = wqe_state_error;
 	qp->req.state = QP_STATE_ERROR;
-	rxe_run_task(&qp->comp.task);
+	rxe_sched_task(&qp->comp.task);
 exit:
 	ret = -EAGAIN;
 out:
-- 
2.37.2



* [PATCH for-next v2 8/8] RDMA/rxe: Rewrite rxe_task.c
  2023-03-01  4:51 [PATCH for-next v2 0/8] RDMA/rxe: Correct qp reference counting Bob Pearson
                   ` (6 preceding siblings ...)
  2023-03-01  4:51 ` [PATCH for-next v2 7/8] RDMA/rxe: Make tasks schedule each other Bob Pearson
@ 2023-03-01  4:51 ` Bob Pearson
  7 siblings, 0 replies; 9+ messages in thread
From: Bob Pearson @ 2023-03-01  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, jhack, matsuda-daisuke, linux-rdma; +Cc: Bob Pearson

This patch is a major rewrite of the tasklet routines in rxe_task.c.
The main motivation for this is the realization that the code
fails to protect the qp pointer with correct reference counting.
When a tasklet is scheduled from a verbs API the calling thread
has a valid reference to the qp and schedules the tasklet to run
at a later time carrying a pointer to the qp. Once the calling
code returns, however, the qp can be destroyed at any time. To
correct this a reference to the qp must be taken when the task
is scheduled and held until it finishes running. This is complicated
by the tasklet library not always running a task that is scheduled,
depending on whether someone else has already scheduled it.

This patch moves the logic for deciding whether to run or schedule
a task outside of do_task() and guarantees that there is only
one copy of the task scheduled or running at a time.

Secondly, the separate flags controlling teardown and draining of
the task are folded into the task state machine, and all references
to the state are protected by spinlocks to avoid consistency and
memory barrier issues.
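
For orientation, the resulting state machine is roughly the following
(a sketch derived from the enum and the transitions in the diff below):

	IDLE --run/sched--> BUSY --more work posted--> ARMED
	BUSY/ARMED --work exhausted--> IDLE
	BUSY/ARMED --disable or cleanup--> DRAINING --task done--> DRAINED
	IDLE --disable--> DRAINED; IDLE --cleanup--> INVALID
	DRAINED --enable--> IDLE; DRAINED --cleanup--> INVALID

	/* every transition is made under task->lock and a qp
	 * reference is held from __reserve_if_idle() until the
	 * pass through do_task() completes
	 */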

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_task.c | 266 +++++++++++++++++++++------
 drivers/infiniband/sw/rxe/rxe_task.h |   8 +-
 2 files changed, 218 insertions(+), 56 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index a67f48545443..fea9a517c8d9 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -6,56 +6,128 @@
 
 #include "rxe.h"
 
-/*
- * this locking is due to a potential race where
- * a second caller finds the task already running
- * but looks just after the last call to func
+/* Check if task is idle i.e. not running, not scheduled in
+ * the tasklet queue and not draining. If so move to busy to
+ * reserve a slot in do_task() and take a qp reference to
+ * cover the gap from now until the task finishes. state will
+ * move out of busy if the task returns a nonzero value in
+ * do_task(). If state is already busy it is raised to armed
+ * to indicate to do_task that an additional pass should be
+ * made over the task.
+ * Context: caller should hold task->lock.
+ * Returns: true if state transitioned from idle to busy else false.
+ */
+static bool __reserve_if_idle(struct rxe_task *task)
+{
+	WARN_ON(rxe_read(task->qp) <= 0);
+
+	if (task->tasklet.state & TASKLET_STATE_SCHED)
+		return false;
+
+	if (task->state == TASK_STATE_IDLE) {
+		rxe_get(task->qp);
+		task->state = TASK_STATE_BUSY;
+		task->num_sched++;
+		return true;
+	}
+
+	if (task->state == TASK_STATE_BUSY)
+		task->state = TASK_STATE_ARMED;
+
+	return false;
+}
+
+/* check if task is idle or drained and not currently
+ * scheduled in the tasklet queue. This routine is
+ * called by rxe_cleanup_task or rxe_disable_task to
+ * see if the queue is empty.
+ * Context: caller should hold task->lock.
+ * Returns true if done else false.
+ */
+static bool __is_done(struct rxe_task *task)
+{
+	if (task->tasklet.state & TASKLET_STATE_SCHED)
+		return false;
+
+	if (task->state == TASK_STATE_IDLE ||
+	    task->state == TASK_STATE_DRAINED) {
+		return true;
+	}
+
+	return false;
+}
+
+/* a locked version of __is_done */
+static bool is_done(struct rxe_task *task)
+{
+	unsigned long flags;
+	int done;
+
+	spin_lock_irqsave(&task->lock, flags);
+	done = __is_done(task);
+	spin_unlock_irqrestore(&task->lock, flags);
+
+	return done;
+}
+
+/* do_task is a wrapper for the three tasks (requester,
+ * completer, responder) and calls them in a loop until
+ * they return a non-zero value. It is called either
+ * directly by rxe_run_task or indirectly if rxe_sched_task
+ * schedules the task. They must call __reserve_if_idle to
+ * move the task to busy before calling or scheduling.
+ * The task can also be moved to drained or invalid
+ * by calls to rxe_cleanup_task or rxe_disable_task.
+ * In that case tasks which get here are not executed but
+ * just flushed. The tasks are designed to look to see if
+ * there is work to do and do part of it before returning
+ * here with a return value of zero until all the work
+ * has been consumed; it then returns a non-zero value.
+ * The number of times the task can be run is limited by
+ * max iterations so one task cannot hold the cpu forever.
  */
 static void do_task(struct tasklet_struct *t)
 {
 	int cont;
 	int ret;
 	struct rxe_task *task = from_tasklet(task, t, tasklet);
-	struct rxe_qp *qp = (struct rxe_qp *)task->qp;
-	unsigned int iterations = RXE_MAX_ITERATIONS;
+	unsigned int iterations;
+	unsigned long flags;
+	int resched = 0;
 
-	spin_lock_bh(&task->lock);
-	switch (task->state) {
-	case TASK_STATE_START:
-		task->state = TASK_STATE_BUSY;
-		spin_unlock_bh(&task->lock);
-		break;
-
-	case TASK_STATE_BUSY:
-		task->state = TASK_STATE_ARMED;
-		fallthrough;
-	case TASK_STATE_ARMED:
-		spin_unlock_bh(&task->lock);
-		return;
+	WARN_ON(rxe_read(task->qp) <= 0);
 
-	default:
-		spin_unlock_bh(&task->lock);
-		rxe_dbg_qp(qp, "failed with bad state %d\n", task->state);
+	spin_lock_irqsave(&task->lock, flags);
+	if (task->state >= TASK_STATE_DRAINED) {
+		rxe_put(task->qp);
+		task->num_done++;
+		spin_unlock_irqrestore(&task->lock, flags);
 		return;
 	}
+	spin_unlock_irqrestore(&task->lock, flags);
 
 	do {
+		iterations = RXE_MAX_ITERATIONS;
 		cont = 0;
-		ret = task->func(task->qp);
 
-		spin_lock_bh(&task->lock);
+		do {
+			ret = task->func(task->qp);
+		} while (ret == 0 && iterations-- > 0);
+
+		spin_lock_irqsave(&task->lock, flags);
 		switch (task->state) {
 		case TASK_STATE_BUSY:
 			if (ret) {
-				task->state = TASK_STATE_START;
-			} else if (iterations--) {
-				cont = 1;
+				task->state = TASK_STATE_IDLE;
 			} else {
-				/* reschedule the tasklet and exit
+				/* This can happen if the client
+				 * can add work faster than the
+				 * tasklet can finish it.
+				 * Reschedule the tasklet and exit
 				 * the loop to give up the cpu
 				 */
-				tasklet_schedule(&task->tasklet);
-				task->state = TASK_STATE_START;
+				task->state = TASK_STATE_IDLE;
+				resched = 1;
 			}
 			break;
 
@@ -68,72 +140,158 @@ static void do_task(struct tasklet_struct *t)
 			cont = 1;
 			break;
 
+		case TASK_STATE_DRAINING:
+			if (ret)
+				task->state = TASK_STATE_DRAINED;
+			else
+				cont = 1;
+			break;
+
 		default:
-			rxe_dbg_qp(qp, "failed with bad state %d\n",
-					task->state);
+			WARN_ON(1);
+			rxe_info_qp(task->qp, "unexpected task state = %d", task->state);
+		}
+
+		if (!cont) {
+			task->num_done++;
+			if (WARN_ON(task->num_done != task->num_sched))
+				rxe_err_qp(task->qp, "%ld tasks scheduled, %ld tasks done",
+					   task->num_sched, task->num_done);
 		}
-		spin_unlock_bh(&task->lock);
+		spin_unlock_irqrestore(&task->lock, flags);
 	} while (cont);
 
 	task->ret = ret;
+
+	if (resched)
+		rxe_sched_task(task);
+
+	rxe_put(task->qp);
 }
 
 int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
 		  int (*func)(struct rxe_qp *))
 {
-	task->qp	= qp;
-	task->func	= func;
-	task->destroyed	= false;
+	WARN_ON(rxe_read(qp) <= 0);
+
+	task->qp = qp;
+	task->func = func;
 
 	tasklet_setup(&task->tasklet, do_task);
 
-	task->state = TASK_STATE_START;
+	task->state = TASK_STATE_IDLE;
 	spin_lock_init(&task->lock);
 
 	return 0;
 }
 
+/* rxe_cleanup_task is only called from rxe_qp_do_cleanup in
+ * process context. The qp is already completed with no
+ * remaining references. Once the queue is drained the
+ * task is moved to invalid and returns. The qp cleanup
+ * code then calls the task functions directly without
+ * using the task struct to drain any late arriving packets
+ * or work requests.
+ */
 void rxe_cleanup_task(struct rxe_task *task)
 {
-	bool idle;
+	unsigned long flags;
 
-	/*
-	 * Mark the task, then wait for it to finish. It might be
-	 * running in a non-tasklet (direct call) context.
-	 */
-	task->destroyed = true;
+	spin_lock_irqsave(&task->lock, flags);
+	if (!__is_done(task) && task->state < TASK_STATE_DRAINED) {
+		task->state = TASK_STATE_DRAINING;
+	} else {
+		task->state = TASK_STATE_INVALID;
+		spin_unlock_irqrestore(&task->lock, flags);
+		return;
+	}
+	spin_unlock_irqrestore(&task->lock, flags);
 
-	do {
-		spin_lock_bh(&task->lock);
-		idle = (task->state == TASK_STATE_START);
-		spin_unlock_bh(&task->lock);
-	} while (!idle);
+	/* now the task cannot be scheduled or run; just wait
+	 * for the previously scheduled tasks to finish.
+	 */
+	while (!is_done(task))
+		cond_resched();
 
 	tasklet_kill(&task->tasklet);
+
+	spin_lock_irqsave(&task->lock, flags);
+	task->state = TASK_STATE_INVALID;
+	spin_unlock_irqrestore(&task->lock, flags);
 }
 
+/* run the task inline if it is currently idle;
+ * do_task cannot be called while holding the lock
+ */
 void rxe_run_task(struct rxe_task *task)
 {
-	if (task->destroyed)
-		return;
+	unsigned long flags;
+	int run;
+
+	WARN_ON(rxe_read(task->qp) <= 0);
 
-	do_task(&task->tasklet);
+	spin_lock_irqsave(&task->lock, flags);
+	run = __reserve_if_idle(task);
+	spin_unlock_irqrestore(&task->lock, flags);
+
+	if (run)
+		do_task(&task->tasklet);
 }
 
+/* schedule the task to run later as a tasklet.
+ * the tasklet_schedule call can be called holding
+ * the lock.
+ */
 void rxe_sched_task(struct rxe_task *task)
 {
-	if (task->destroyed)
-		return;
+	unsigned long flags;
+
+	WARN_ON(rxe_read(task->qp) <= 0);
 
-	tasklet_schedule(&task->tasklet);
+	spin_lock_irqsave(&task->lock, flags);
+	if (__reserve_if_idle(task))
+		tasklet_schedule(&task->tasklet);
+	spin_unlock_irqrestore(&task->lock, flags);
 }
 
+/* rxe_disable/enable_task are only called from
+ * rxe_modify_qp in process context. Task is moved
+ * to the drained state by do_task.
+ */
 void rxe_disable_task(struct rxe_task *task)
 {
+	unsigned long flags;
+
+	WARN_ON(rxe_read(task->qp) <= 0);
+
+	spin_lock_irqsave(&task->lock, flags);
+	if (!__is_done(task) && task->state < TASK_STATE_DRAINED) {
+		task->state = TASK_STATE_DRAINING;
+	} else {
+		task->state = TASK_STATE_DRAINED;
+		spin_unlock_irqrestore(&task->lock, flags);
+		return;
+	}
+	spin_unlock_irqrestore(&task->lock, flags);
+
+	while (!is_done(task))
+		cond_resched();
+
 	tasklet_disable(&task->tasklet);
 }
 
 void rxe_enable_task(struct rxe_task *task)
 {
+	unsigned long flags;
+
+	WARN_ON(rxe_read(task->qp) <= 0);
+
+	spin_lock_irqsave(&task->lock, flags);
+	if (task->state == TASK_STATE_INVALID) {
+		spin_unlock_irqrestore(&task->lock, flags);
+		return;
+	}
+	task->state = TASK_STATE_IDLE;
 	tasklet_enable(&task->tasklet);
+	spin_unlock_irqrestore(&task->lock, flags);
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 99585e40cef9..facb7c8e3729 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -8,9 +8,12 @@
 #define RXE_TASK_H
 
 enum {
-	TASK_STATE_START	= 0,
+	TASK_STATE_IDLE		= 0,
 	TASK_STATE_BUSY		= 1,
 	TASK_STATE_ARMED	= 2,
+	TASK_STATE_DRAINING	= 3,
+	TASK_STATE_DRAINED	= 4,
+	TASK_STATE_INVALID	= 5,
 };
 
 /*
@@ -25,7 +28,8 @@ struct rxe_task {
 	struct rxe_qp		*qp;
 	int			(*func)(struct rxe_qp *qp);
 	int			ret;
-	bool			destroyed;
+	long			num_sched;
+	long			num_done;
 };
 
 /*
-- 
2.37.2


