All of lore.kernel.org
 help / color / mirror / Atom feed
From: Jeff Layton <jlayton@kernel.org>
To: ceph-devel@vger.kernel.org
Cc: idryomov@gmail.com, mnelson@redhat.com
Subject: [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex
Date: Wed, 15 Sep 2021 09:26:56 -0400	[thread overview]
Message-ID: <20210915132656.30347-3-jlayton@kernel.org> (raw)
In-Reply-To: <20210915132656.30347-1-jlayton@kernel.org>

Currently, the out_queue is protected by the con->mutex. ceph_con_send
takes the mutex but just does some in-memory operations, followed by
kicking the workqueue job to do the actual send. This means that while
the workqueue job is operating, any task that wants to send a new
message will end up blocked.

Given that none of ceph_con_send's operations aside from the mutex
acquisition will block, we should be able to allow tasks to submit new
messages under a spinlock rather than taking the mutex, which should
reduce this contention and (hopefully) improve throughput for both
cephfs and rbd in highly contended situations.

Add a new spinlock to protect the out_queue, and ensure we take it while
holding the con->mutex when accessing the out_queue. Stop taking the
con->mutex in ceph_con_send, and instead just take the spinlock around
the list_add to the out_queue.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/messenger.h |  1 +
 net/ceph/messenger.c           | 16 +++++++++++-----
 net/ceph/messenger_v1.c        | 35 +++++++++++++++++-----------------
 net/ceph/messenger_v2.c        |  5 +++++
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 0a455b05f17e..155dd8a8e8ce 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -448,6 +448,7 @@ struct ceph_connection {
 	struct mutex mutex;
 
 	/* out queue */
+	spinlock_t out_queue_lock; /* protects out_queue */
 	struct list_head out_queue;
 	struct list_head out_sent;   /* sending or sent but unacked */
 	u64 out_seq;		     /* last message queued for send */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d14ff578cace..b539d3359ef4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -633,6 +633,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
 	con_sock_state_init(con);
 
 	mutex_init(&con->mutex);
+	spin_lock_init(&con->out_queue_lock);
 	INIT_LIST_HEAD(&con->out_queue);
 	INIT_LIST_HEAD(&con->out_sent);
 	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
@@ -691,6 +692,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 	u64 seq;
 
 	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+	spin_lock(&con->out_queue_lock);
 	while (!list_empty(&con->out_queue)) {
 		msg = list_first_entry(&con->out_queue, struct ceph_msg,
 				       list_head);
@@ -704,6 +706,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 		     msg, seq);
 		ceph_msg_remove(msg);
 	}
+	spin_unlock(&con->out_queue_lock);
 }
 
 #ifdef CONFIG_BLOCK
@@ -1601,16 +1604,19 @@ static void con_fault(struct ceph_connection *con)
 	}
 
 	/* Requeue anything that hasn't been acked */
+	spin_lock(&con->out_queue_lock);
 	list_splice_init(&con->out_sent, &con->out_queue);
 
 	/* If there are no messages queued or keepalive pending, place
 	 * the connection in a STANDBY state */
 	if (list_empty(&con->out_queue) &&
 	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+		spin_unlock(&con->out_queue_lock);
 		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
 		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
 		con->state = CEPH_CON_S_STANDBY;
 	} else {
+		spin_unlock(&con->out_queue_lock);
 		/* retry after a delay. */
 		con->state = CEPH_CON_S_PREOPEN;
 		if (!con->delay) {
@@ -1691,19 +1697,18 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
 	msg->needs_out_seq = true;
 
-	mutex_lock(&con->mutex);
-
-	if (con->state == CEPH_CON_S_CLOSED) {
+	if (READ_ONCE(con->state) == CEPH_CON_S_CLOSED) {
 		dout("con_send %p closed, dropping %p\n", con, msg);
 		ceph_msg_put(msg);
-		mutex_unlock(&con->mutex);
 		return;
 	}
 
 	msg_con_set(msg, con);
 
 	BUG_ON(!list_empty(&msg->list_head));
+	spin_lock(&con->out_queue_lock);
 	list_add_tail(&msg->list_head, &con->out_queue);
+	spin_unlock(&con->out_queue_lock);
 	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
 	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
 	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
@@ -1712,7 +1717,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 	     le32_to_cpu(msg->hdr.data_len));
 
 	ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
-	mutex_unlock(&con->mutex);
 
 	/* if there wasn't anything waiting to send before, queue
 	 * new work */
@@ -2058,6 +2062,8 @@ void ceph_con_get_out_msg(struct ceph_connection *con)
 {
 	struct ceph_msg *msg;
 
+	lockdep_assert_held(&con->out_queue_lock);
+
 	BUG_ON(list_empty(&con->out_queue));
 	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 	WARN_ON(msg->con != con);
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 2cb5ffdf071a..db864be73b60 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -194,25 +194,9 @@ static void prepare_write_message_footer(struct ceph_connection *con)
  */
 static void prepare_write_message(struct ceph_connection *con)
 {
-	struct ceph_msg *m;
+	struct ceph_msg *m = con->out_msg;
 	u32 crc;
 
-	con_out_kvec_reset(con);
-	con->v1.out_msg_done = false;
-
-	/* Sneak an ack in there first?  If we can get it into the same
-	 * TCP packet that's a good thing. */
-	if (con->in_seq > con->in_seq_acked) {
-		con->in_seq_acked = con->in_seq;
-		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
-		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
-		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
-			&con->v1.out_temp_ack);
-	}
-
-	ceph_con_get_out_msg(con);
-	m = con->out_msg;
-
 	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
@@ -1427,10 +1411,27 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
 			goto more;
 		}
 		/* is anything else pending? */
+		spin_lock(&con->out_queue_lock);
 		if (!list_empty(&con->out_queue)) {
+			con_out_kvec_reset(con);
+			con->v1.out_msg_done = false;
+
+			/* Sneak an ack in there first?  If we can get it into the same
+			 * TCP packet that's a good thing. */
+			if (con->in_seq > con->in_seq_acked) {
+				con->in_seq_acked = con->in_seq;
+				con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+				con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
+				con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
+						 &con->v1.out_temp_ack);
+			}
+
+			ceph_con_get_out_msg(con);
+			spin_unlock(&con->out_queue_lock);
 			prepare_write_message(con);
 			goto more;
 		}
+		spin_unlock(&con->out_queue_lock);
 		if (con->in_seq > con->in_seq_acked) {
 			prepare_write_ack(con);
 			goto more;
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index cc40ce4e02fb..1a1c2c282120 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -3001,7 +3001,9 @@ static int populate_out_iter(struct ceph_connection *con)
 	}
 
 	WARN_ON(con->v2.out_state != OUT_S_GET_NEXT);
+	spin_lock(&con->out_queue_lock);
 	if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_keepalive2(con);
 		if (ret) {
 			pr_err("prepare_keepalive2 failed: %d\n", ret);
@@ -3009,18 +3011,21 @@ static int populate_out_iter(struct ceph_connection *con)
 		}
 	} else if (!list_empty(&con->out_queue)) {
 		ceph_con_get_out_msg(con);
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_message(con);
 		if (ret) {
 			pr_err("prepare_message failed: %d\n", ret);
 			return ret;
 		}
 	} else if (con->in_seq > con->in_seq_acked) {
+		spin_unlock(&con->out_queue_lock);
 		ret = prepare_ack(con);
 		if (ret) {
 			pr_err("prepare_ack failed: %d\n", ret);
 			return ret;
 		}
 	} else {
+		spin_unlock(&con->out_queue_lock);
 		goto nothing_pending;
 	}
 
-- 
2.31.1


      parent reply	other threads:[~2021-09-15 13:27 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-09-15 13:26 [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex Jeff Layton
2021-09-15 13:26 ` [RFC PATCH 1/2] libceph: defer clearing standby state to work function Jeff Layton
2021-09-15 13:26 ` Jeff Layton [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210915132656.30347-3-jlayton@kernel.org \
    --to=jlayton@kernel.org \
    --cc=ceph-devel@vger.kernel.org \
    --cc=idryomov@gmail.com \
    --cc=mnelson@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.