ceph-devel.vger.kernel.org archive mirror
* [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex
@ 2021-09-15 13:26 Jeff Layton
  2021-09-15 13:26 ` [RFC PATCH 1/2] libceph: defer clearing standby state to work function Jeff Layton
  2021-09-15 13:26 ` [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex Jeff Layton
  0 siblings, 2 replies; 3+ messages in thread
From: Jeff Layton @ 2021-09-15 13:26 UTC (permalink / raw)
  To: ceph-devel; +Cc: idryomov, mnelson

Several weeks ago, I was chatting with Mark Nelson and he mentioned that
his testing showed the kclient plateauing at a throughput well below
what his hardware was capable of.

While I don't have any details about the bottleneck he was hitting, I
noticed that libceph is heavily serialized on mutexes, the con->mutex
in particular. This patchset is an attempt to get the con->mutex out
of the outgoing message submission codepath.

In addition to potentially helping performance, this may also help to
close some potential races in the cephfs client. Right now, we have to
drop some spinlocks in order to send cap messages, but this may allow
us to avoid doing that.

My main concern with the set is whether this might open up some races
wrt changes to the con->state. I didn't hit any problems like that in
testing, but it remains a possibility. If it does turn out to be a
problem, we should be able to work around it, possibly by adding a new
list_head to stage submissions before adding them to the out_queue.

I've done some performance testing with the set, but the results were
inconclusive. I suspect my home test rig is hitting other bottlenecks
before con->mutex contention becomes an issue. It would be nice to do
some testing on a setup that allows for higher throughput to see if it
helps.

Comments and suggestions welcome...

Jeff Layton (2):
  libceph: defer clearing standby state to work function
  libceph: allow tasks to submit messages without taking con->mutex

 include/linux/ceph/messenger.h |  2 ++
 net/ceph/messenger.c           | 48 ++++++++++++++++++++--------------
 net/ceph/messenger_v1.c        | 35 +++++++++++++------------
 net/ceph/messenger_v2.c        |  5 ++++
 4 files changed, 54 insertions(+), 36 deletions(-)

-- 
2.31.1



* [RFC PATCH 1/2] libceph: defer clearing standby state to work function
  2021-09-15 13:26 [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex Jeff Layton
@ 2021-09-15 13:26 ` Jeff Layton
  2021-09-15 13:26 ` [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex Jeff Layton
  1 sibling, 0 replies; 3+ messages in thread
From: Jeff Layton @ 2021-09-15 13:26 UTC (permalink / raw)
  To: ceph-devel; +Cc: idryomov, mnelson

In both cases where we call clear_standby, we queue the workqueue job
just afterward.

Add a new flag to the con and set that instead of calling
clear_standby immediately.  When the workqueue job runs, test_and_clear
the flag and call clear_standby if it was set.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/messenger.h |  1 +
 net/ceph/messenger.c           | 32 ++++++++++++++++++--------------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index c9675ee33f51..0a455b05f17e 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -284,6 +284,7 @@ struct ceph_msg {
 #define CEPH_CON_F_SOCK_CLOSED		3  /* socket state changed to closed */
 #define CEPH_CON_F_BACKOFF		4  /* need to retry queuing delayed
 					      work */
+#define CEPH_CON_F_CLEAR_STANDBY	5  /* clear standby state */
 
 /* ceph connection fault delay defaults, for exponential backoff */
 #define BASE_DELAY_INTERVAL	(HZ / 4)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index c93d103fe343..d14ff578cace 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -90,6 +90,7 @@ static bool con_flag_valid(unsigned long con_flag)
 	case CEPH_CON_F_WRITE_PENDING:
 	case CEPH_CON_F_SOCK_CLOSED:
 	case CEPH_CON_F_BACKOFF:
+	case CEPH_CON_F_CLEAR_STANDBY:
 		return true;
 	default:
 		return false;
@@ -1488,6 +1489,18 @@ static void con_fault_finish(struct ceph_connection *con)
 		con->ops->fault(con);
 }
 
+static void clear_standby(struct ceph_connection *con)
+{
+	/* come back from STANDBY? */
+	if (con->state == CEPH_CON_S_STANDBY) {
+		dout("clear_standby %p and ++connect_seq\n", con);
+		con->state = CEPH_CON_S_PREOPEN;
+		con->v1.connect_seq++;
+		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
+		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
+	}
+}
+
 /*
  * Do some work on a connection.  Drop a connection ref when we're done.
  */
@@ -1498,6 +1511,9 @@ static void ceph_con_workfn(struct work_struct *work)
 	bool fault;
 
 	mutex_lock(&con->mutex);
+	if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_CLEAR_STANDBY))
+		clear_standby(con);
+
 	while (true) {
 		int ret;
 
@@ -1663,18 +1679,6 @@ static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
 	BUG_ON(msg->con != con);
 }
 
-static void clear_standby(struct ceph_connection *con)
-{
-	/* come back from STANDBY? */
-	if (con->state == CEPH_CON_S_STANDBY) {
-		dout("clear_standby %p and ++connect_seq\n", con);
-		con->state = CEPH_CON_S_PREOPEN;
-		con->v1.connect_seq++;
-		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
-		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
-	}
-}
-
 /*
  * Queue up an outgoing message on the given connection.
  *
@@ -1707,7 +1711,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 	     le32_to_cpu(msg->hdr.middle_len),
 	     le32_to_cpu(msg->hdr.data_len));
 
-	clear_standby(con);
+	ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
 	mutex_unlock(&con->mutex);
 
 	/* if there wasn't anything waiting to send before, queue
@@ -1793,8 +1797,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
 {
 	dout("con_keepalive %p\n", con);
 	mutex_lock(&con->mutex);
-	clear_standby(con);
 	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
+	ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
 	mutex_unlock(&con->mutex);
 
 	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
-- 
2.31.1



* [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex
  2021-09-15 13:26 [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex Jeff Layton
  2021-09-15 13:26 ` [RFC PATCH 1/2] libceph: defer clearing standby state to work function Jeff Layton
@ 2021-09-15 13:26 ` Jeff Layton
  1 sibling, 0 replies; 3+ messages in thread
From: Jeff Layton @ 2021-09-15 13:26 UTC (permalink / raw)
  To: ceph-devel; +Cc: idryomov, mnelson

Currently, the out_queue is protected by the con->mutex. ceph_con_send
takes the mutex but just does some in-memory operations, followed by
kicking the workqueue job to do the actual send. This means that while
the workqueue job is operating, any task that wants to send a new
message will end up blocked.

Given that none of ceph_con_send's operations aside from the mutex
acquisition will block, we should be able to allow tasks to submit new
messages under a spinlock rather than taking the mutex, which should
reduce this contention and (hopefully) improve throughput for both
cephfs and rbd in highly contended situations.

Add a new spinlock to protect the out_queue, and ensure we take it while
holding the con->mutex when accessing the out_queue. Stop taking the
con->mutex in ceph_con_send, and instead just take the spinlock around
the list_add to the out_queue.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/messenger.h |  1 +
 net/ceph/messenger.c           | 16 +++++++++++-----
 net/ceph/messenger_v1.c        | 35 +++++++++++++++++-----------------
 net/ceph/messenger_v2.c        |  5 +++++
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 0a455b05f17e..155dd8a8e8ce 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -448,6 +448,7 @@ struct ceph_connection {
 	struct mutex mutex;
 
 	/* out queue */
+	spinlock_t out_queue_lock; /* protects out_queue */
 	struct list_head out_queue;
 	struct list_head out_sent;   /* sending or sent but unacked */
 	u64 out_seq;		     /* last message queued for send */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d14ff578cace..b539d3359ef4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -633,6 +633,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
 	con_sock_state_init(con);
 
 	mutex_init(&con->mutex);
+	spin_lock_init(&con->out_queue_lock);
 	INIT_LIST_HEAD(&con->out_queue);
 	INIT_LIST_HEAD(&con->out_sent);
 	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
@@ -691,6 +692,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 	u64 seq;
 
 	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+	spin_lock(&con->out_queue_lock);
 	while (!list_empty(&con->out_queue)) {
 		msg = list_first_entry(&con->out_queue, struct ceph_msg,
 				       list_head);
@@ -704,6 +706,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 		     msg, seq);
 		ceph_msg_remove(msg);
 	}
+	spin_unlock(&con->out_queue_lock);
 }
 
 #ifdef CONFIG_BLOCK
@@ -1601,16 +1604,19 @@ static void con_fault(struct ceph_connection *con)
 	}
 
 	/* Requeue anything that hasn't been acked */
+	spin_lock(&con->out_queue_lock);
 	list_splice_init(&con->out_sent, &con->out_queue);
 
 	/* If there are no messages queued or keepalive pending, place
 	 * the connection in a STANDBY state */
 	if (list_empty(&con->out_queue) &&
 	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+		spin_unlock(&con->out_queue_lock);
 		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
 		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
 		con->state = CEPH_CON_S_STANDBY;
 	} else {
+		spin_unlock(&con->out_queue_lock);
 		/* retry after a delay. */
 		con->state = CEPH_CON_S_PREOPEN;
 		if (!con->delay) {
@@ -1691,19 +1697,18 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
 	msg->needs_out_seq = true;
 
-	mutex_lock(&con->mutex);
-
-	if (con->state == CEPH_CON_S_CLOSED) {
+	if (READ_ONCE(con->state) == CEPH_CON_S_CLOSED) {
 		dout("con_send %p closed, dropping %p\n", con, msg);
 		ceph_msg_put(msg);
-		mutex_unlock(&con->mutex);
 		return;
 	}
 
 	msg_con_set(msg, con);
 
 	BUG_ON(!list_empty(&msg->list_head));
+	spin_lock(&con->out_queue_lock);
 	list_add_tail(&msg->list_head, &con->out_queue);
+	spin_unlock(&con->out_queue_lock);
 	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
 	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
 	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
@@ -1712,7 +1717,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 	     le32_to_cpu(msg->hdr.data_len));
 
 	ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
-	mutex_unlock(&con->mutex);
 
 	/* if there wasn't anything waiting to send before, queue
 	 * new work */
@@ -2058,6 +2062,8 @@ void ceph_con_get_out_msg(struct ceph_connection *con)
 {
 	struct ceph_msg *msg;
 
+	lockdep_assert_held(&con->out_queue_lock);
+
 	BUG_ON(list_empty(&con->out_queue));
 	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 	WARN_ON(msg->con != con);
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 2cb5ffdf071a..db864be73b60 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -194,25 +194,9 @@ static void prepare_write_message_footer(struct ceph_connection *con)
  */
 static void prepare_write_message(struct ceph_connection *con)
 {
-	struct ceph_msg *m;
+	struct ceph_msg *m = con->out_msg;
 	u32 crc;
 
-	con_out_kvec_reset(con);
-	con->v1.out_msg_done = false;
-
-	/* Sneak an ack in there first?  If we can get it into the same
-	 * TCP packet that's a good thing. */
-	if (con->in_seq > con->in_seq_acked) {
-		con->in_seq_acked = con->in_seq;
-		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
-		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
-		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
-			&con->v1.out_temp_ack);
-	}
-
-	ceph_con_get_out_msg(con);
-	m = con->out_msg;
-
 	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
@@ -1427,10 +1411,27 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
 			goto more;
 		}
 		/* is anything else pending? */
+		spin_lock(&con->out_queue_lock);
 		if (!list_empty(&con->out_queue)) {
+			con_out_kvec_reset(con);
+			con->v1.out_msg_done = false;
+
+			/* Sneak an ack in there first?  If we can get it into the same
+			 * TCP packet that's a good thing. */
+			if (con->in_seq > con->in_seq_acked) {
+				con->in_seq_acked = con->in_seq;
+				con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+				con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
+				con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
+						 &con->v1.out_temp_ack);
+			}
+
+			ceph_con_get_out_msg(con);
+			spin_unlock(&con->out_queue_lock);
 			prepare_write_message(con);
 			goto more;
 		}
+		spin_unlock(&con->out_queue_lock);
 		if (con->in_seq > con->in_seq_acked) {
 			prepare_write_ack(con);
 			goto more;
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index cc40ce4e02fb..1a1c2c282120 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -3001,7 +3001,9 @@ static int populate_out_iter(struct ceph_connection *con)
 	}
 
 	WARN_ON(con->v2.out_state != OUT_S_GET_NEXT);
+	spin_lock(&con->out_queue_lock);
 	if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_keepalive2(con);
 		if (ret) {
 			pr_err("prepare_keepalive2 failed: %d\n", ret);
@@ -3009,18 +3011,21 @@ static int populate_out_iter(struct ceph_connection *con)
 		}
 	} else if (!list_empty(&con->out_queue)) {
 		ceph_con_get_out_msg(con);
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_message(con);
 		if (ret) {
 			pr_err("prepare_message failed: %d\n", ret);
 			return ret;
 		}
 	} else if (con->in_seq > con->in_seq_acked) {
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_ack(con);
 		if (ret) {
 			pr_err("prepare_ack failed: %d\n", ret);
 			return ret;
 		}
 	} else {
+		spin_unlock(&con->out_queue_lock);
 		goto nothing_pending;
 	}
 
-- 
2.31.1


