* [RFC PATCH 1/2] libceph: defer clearing standby state to work function
2021-09-15 13:26 [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex Jeff Layton
@ 2021-09-15 13:26 ` Jeff Layton
2021-09-15 13:26 ` [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex Jeff Layton
1 sibling, 0 replies; 3+ messages in thread
From: Jeff Layton @ 2021-09-15 13:26 UTC (permalink / raw)
To: ceph-devel; +Cc: idryomov, mnelson
In both cases where we call clear_standby, we queue the workqueue job
just afterward.
Add a new flag to the con and set that instead of calling
clear_standby immediately. When the workqueue job runs, test_and_clear
the flag and call clear_standby if it was set.
Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
include/linux/ceph/messenger.h | 1 +
net/ceph/messenger.c | 32 ++++++++++++++++++--------------
2 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index c9675ee33f51..0a455b05f17e 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -284,6 +284,7 @@ struct ceph_msg {
#define CEPH_CON_F_SOCK_CLOSED 3 /* socket state changed to closed */
#define CEPH_CON_F_BACKOFF 4 /* need to retry queuing delayed
work */
+#define CEPH_CON_F_CLEAR_STANDBY 5 /* clear standby state */
/* ceph connection fault delay defaults, for exponential backoff */
#define BASE_DELAY_INTERVAL (HZ / 4)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index c93d103fe343..d14ff578cace 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -90,6 +90,7 @@ static bool con_flag_valid(unsigned long con_flag)
case CEPH_CON_F_WRITE_PENDING:
case CEPH_CON_F_SOCK_CLOSED:
case CEPH_CON_F_BACKOFF:
+ case CEPH_CON_F_CLEAR_STANDBY:
return true;
default:
return false;
@@ -1488,6 +1489,18 @@ static void con_fault_finish(struct ceph_connection *con)
con->ops->fault(con);
}
+static void clear_standby(struct ceph_connection *con)
+{
+ /* come back from STANDBY? */
+ if (con->state == CEPH_CON_S_STANDBY) {
+ dout("clear_standby %p and ++connect_seq\n", con);
+ con->state = CEPH_CON_S_PREOPEN;
+ con->v1.connect_seq++;
+ WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
+ WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
+ }
+}
+
/*
* Do some work on a connection. Drop a connection ref when we're done.
*/
@@ -1498,6 +1511,9 @@ static void ceph_con_workfn(struct work_struct *work)
bool fault;
mutex_lock(&con->mutex);
+ if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_CLEAR_STANDBY))
+ clear_standby(con);
+
while (true) {
int ret;
@@ -1663,18 +1679,6 @@ static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
BUG_ON(msg->con != con);
}
-static void clear_standby(struct ceph_connection *con)
-{
- /* come back from STANDBY? */
- if (con->state == CEPH_CON_S_STANDBY) {
- dout("clear_standby %p and ++connect_seq\n", con);
- con->state = CEPH_CON_S_PREOPEN;
- con->v1.connect_seq++;
- WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
- WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
- }
-}
-
/*
* Queue up an outgoing message on the given connection.
*
@@ -1707,7 +1711,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- clear_standby(con);
+ ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
@@ -1793,8 +1797,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
{
dout("con_keepalive %p\n", con);
mutex_lock(&con->mutex);
- clear_standby(con);
ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
mutex_unlock(&con->mutex);
if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
--
2.31.1
^ permalink raw reply related [flat|nested] 3+ messages in thread
* [RFC PATCH 2/2] libceph: allow tasks to submit messages without taking con->mutex
2021-09-15 13:26 [RFC PATCH 0/2] libceph: submit new messages under spinlock instead of mutex Jeff Layton
2021-09-15 13:26 ` [RFC PATCH 1/2] libceph: defer clearing standby state to work function Jeff Layton
@ 2021-09-15 13:26 ` Jeff Layton
1 sibling, 0 replies; 3+ messages in thread
From: Jeff Layton @ 2021-09-15 13:26 UTC (permalink / raw)
To: ceph-devel; +Cc: idryomov, mnelson
Currently, the out_queue is protected by the con->mutex. ceph_con_send
takes the mutex but just does some in-memory operations, followed by
kicking the workqueue job to do the actual send. This means that while
the workqueue job is operating, any task that wants to send a new
message will end up blocked.
Given that none of ceph_con_send's operations aside from the mutex
acquisition will block, we should be able to allow tasks to submit new
messages under a spinlock rather than taking the mutex, which should
reduce this contention and (hopefully) improve throughput for both
cephfs and rbd in highly contended situations.
Add a new spinlock to protect the out_queue, and ensure we take it while
holding the con->mutex when accessing the out_queue. Stop taking the
con->mutex in ceph_con_send, and instead just take the spinlock around
the list_add to the out_queue.
Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
include/linux/ceph/messenger.h | 1 +
net/ceph/messenger.c | 16 +++++++++++-----
net/ceph/messenger_v1.c | 35 +++++++++++++++++-----------------
net/ceph/messenger_v2.c | 5 +++++
4 files changed, 35 insertions(+), 22 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 0a455b05f17e..155dd8a8e8ce 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -448,6 +448,7 @@ struct ceph_connection {
struct mutex mutex;
/* out queue */
+ spinlock_t out_queue_lock; /* protects out_queue */
struct list_head out_queue;
struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d14ff578cace..b539d3359ef4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -633,6 +633,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
con_sock_state_init(con);
mutex_init(&con->mutex);
+ spin_lock_init(&con->out_queue_lock);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
@@ -691,6 +692,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
u64 seq;
dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+ spin_lock(&con->out_queue_lock);
while (!list_empty(&con->out_queue)) {
msg = list_first_entry(&con->out_queue, struct ceph_msg,
list_head);
@@ -704,6 +706,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
msg, seq);
ceph_msg_remove(msg);
}
+ spin_unlock(&con->out_queue_lock);
}
#ifdef CONFIG_BLOCK
@@ -1601,16 +1604,19 @@ static void con_fault(struct ceph_connection *con)
}
/* Requeue anything that hasn't been acked */
+ spin_lock(&con->out_queue_lock);
list_splice_init(&con->out_sent, &con->out_queue);
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
!ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_queue_lock);
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
con->state = CEPH_CON_S_STANDBY;
} else {
+ spin_unlock(&con->out_queue_lock);
/* retry after a delay. */
con->state = CEPH_CON_S_PREOPEN;
if (!con->delay) {
@@ -1691,19 +1697,18 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
msg->needs_out_seq = true;
- mutex_lock(&con->mutex);
-
- if (con->state == CEPH_CON_S_CLOSED) {
+ if (READ_ONCE(con->state) == CEPH_CON_S_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
ceph_msg_put(msg);
- mutex_unlock(&con->mutex);
return;
}
msg_con_set(msg, con);
BUG_ON(!list_empty(&msg->list_head));
+ spin_lock(&con->out_queue_lock);
list_add_tail(&msg->list_head, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
@@ -1712,7 +1717,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
le32_to_cpu(msg->hdr.data_len));
ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
- mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
@@ -2058,6 +2062,8 @@ void ceph_con_get_out_msg(struct ceph_connection *con)
{
struct ceph_msg *msg;
+ lockdep_assert_held(&con->out_queue_lock);
+
BUG_ON(list_empty(&con->out_queue));
msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
WARN_ON(msg->con != con);
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 2cb5ffdf071a..db864be73b60 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -194,25 +194,9 @@ static void prepare_write_message_footer(struct ceph_connection *con)
*/
static void prepare_write_message(struct ceph_connection *con)
{
- struct ceph_msg *m;
+ struct ceph_msg *m = con->out_msg;
u32 crc;
- con_out_kvec_reset(con);
- con->v1.out_msg_done = false;
-
- /* Sneak an ack in there first? If we can get it into the same
- * TCP packet that's a good thing. */
- if (con->in_seq > con->in_seq_acked) {
- con->in_seq_acked = con->in_seq;
- con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
- con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
- con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
- &con->v1.out_temp_ack);
- }
-
- ceph_con_get_out_msg(con);
- m = con->out_msg;
-
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
@@ -1427,10 +1411,27 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
goto more;
}
/* is anything else pending? */
+ spin_lock(&con->out_queue_lock);
if (!list_empty(&con->out_queue)) {
+ con_out_kvec_reset(con);
+ con->v1.out_msg_done = false;
+
+ /* Sneak an ack in there first? If we can get it into the same
+ * TCP packet that's a good thing. */
+ if (con->in_seq > con->in_seq_acked) {
+ con->in_seq_acked = con->in_seq;
+ con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+ con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
+ con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
+ &con->v1.out_temp_ack);
+ }
+
+ ceph_con_get_out_msg(con);
+ spin_unlock(&con->out_queue_lock);
prepare_write_message(con);
goto more;
}
+ spin_unlock(&con->out_queue_lock);
if (con->in_seq > con->in_seq_acked) {
prepare_write_ack(con);
goto more;
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index cc40ce4e02fb..1a1c2c282120 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -3001,7 +3001,9 @@ static int populate_out_iter(struct ceph_connection *con)
}
WARN_ON(con->v2.out_state != OUT_S_GET_NEXT);
+ spin_lock(&con->out_queue_lock);
if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_queue_lock);
ret = prepare_keepalive2(con);
if (ret) {
pr_err("prepare_keepalive2 failed: %d\n", ret);
@@ -3009,18 +3011,21 @@ static int populate_out_iter(struct ceph_connection *con)
}
} else if (!list_empty(&con->out_queue)) {
ceph_con_get_out_msg(con);
+ spin_unlock(&con->out_queue_lock);
ret = prepare_message(con);
if (ret) {
pr_err("prepare_message failed: %d\n", ret);
return ret;
}
} else if (con->in_seq > con->in_seq_acked) {
+ spin_unlock(&con->out_queue_lock);
ret = prepare_ack(con);
if (ret) {
pr_err("prepare_ack failed: %d\n", ret);
return ret;
}
} else {
+ spin_unlock(&con->out_queue_lock);
goto nothing_pending;
}
--
2.31.1
^ permalink raw reply related [flat|nested] 3+ messages in thread