* [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath
@ 2022-03-18 13:50 Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 1/5] libceph: add spinlock around osd->o_requests Jeff Layton
                   ` (4 more replies)
  0 siblings, 5 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

This is a revised version of the sparse read code I posted a week or so
ago. This work is required for the fscrypt integration, but may be
useful on its own, and may also be applicable to RBD.

There are some significant differences from the last set:

- most of the extent data, which comes over the wire in little-endian
  format, is now endian-converted in-place on BE arches.

- the OSD client now passes the extent map from the read back to the
  caller. This allows us to properly decrypt things at a higher level.
  It also makes it simpler for the caller to determine the actual length
  of the data read into the buffer.

- this code should allow us to support multiple sparse read operations
  in an OSD request. That has not been tested yet, though.

This has been tested with xfstests; it seems to work as expected and to
be on par, performance-wise, with "normal" reads.

Note that the messenger v2 CRC path is the only one implemented so far.
We'll need to implement support for the v2-secure and v1 codepaths as
well before we'll want to merge any of this.

We may also want to use sparse reads only when necessary, but they don't
seem to be any slower, so it may be simpler to just always use them.
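
For reference, the intended calling convention for a consumer of this
looks roughly like the sketch below (assembled from the ceph_sync_read()
changes in patch 5, with page attachment elided and error handling
trimmed; illustrative only, not a function from the series):

  static ssize_t sparse_read_sketch(struct ceph_osd_client *osdc,
                                    struct ceph_inode_info *ci,
                                    u64 off, u64 len)
  {
          struct ceph_osd_request *req;
          ssize_t ret;

          req = ceph_osdc_new_request(osdc, &ci->i_layout, ci->i_vino,
                                      off, &len, 0, 1,
                                      CEPH_OSD_OP_SPARSE_READ,
                                      CEPH_OSD_FLAG_READ, NULL,
                                      ci->i_truncate_seq,
                                      ci->i_truncate_size, false);
          if (IS_ERR(req))
                  return PTR_ERR(req);

          /* hand libceph a starting extent array; the receive code
           * reallocates it if the reply needs a bigger one */
          ret = ceph_alloc_sparse_ext_map(&req->r_ops[0],
                                          CEPH_SPARSE_EXT_ARRAY_INITIAL);
          if (ret)
                  goto out;

          /* ... attach pages with osd_req_op_extent_osd_data_pages() ... */

          ret = ceph_osdc_start_request(osdc, req, false);
          if (!ret)
                  ret = ceph_osdc_wait_request(osdc, req);

          /* r_result no longer says how much data landed in the buffer */
          if (ret >= 0)
                  ret = ceph_sparse_ext_map_end(&req->r_ops[0]);
          else if (ret == -ENOENT)
                  ret = 0;
  out:
          ceph_osdc_put_request(req);
          return ret;
  }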

Jeff Layton (5):
  libceph: add spinlock around osd->o_requests
  libceph: define struct ceph_sparse_extent and add some helpers
  libceph: add sparse read support to msgr2 crc state machine
  libceph: add sparse read support to OSD client
  ceph: convert to sparse reads

 fs/ceph/addr.c                  |  13 +-
 fs/ceph/file.c                  |  41 ++++-
 fs/ceph/super.h                 |   7 +
 include/linux/ceph/messenger.h  |  29 ++++
 include/linux/ceph/osd_client.h |  71 ++++++++-
 net/ceph/messenger.c            |   1 +
 net/ceph/messenger_v2.c         | 164 ++++++++++++++++++--
 net/ceph/osd_client.c           | 256 +++++++++++++++++++++++++++++++-
 8 files changed, 558 insertions(+), 24 deletions(-)

-- 
2.35.1



* [PATCH v3 1/5] libceph: add spinlock around osd->o_requests
  2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
@ 2022-03-18 13:50 ` Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers Jeff Layton
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

In a later patch, we're going to need to search for a request in
the rbtree, but taking the osd's mutex is inconvenient as we already
hold the con mutex at the point where we need it.

Add a new spinlock that we take when inserting and erasing entries from
the o_requests tree. Searching the rbtree can be done with either the
mutex or the spinlock held, but insertion and removal require both.
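
To illustrate the resulting locking rules (a sketch mirroring the hunks
below and the later sparse read code, not a new API):

  /* searching: either osd->lock or the new spinlock is enough */
  spin_lock(&osd->o_requests_lock);
  req = lookup_request(&osd->o_requests, tid);
  spin_unlock(&osd->o_requests_lock);

  /* insert/erase: callers already hold osd->lock, so take the
   * spinlock as well */
  spin_lock(&osd->o_requests_lock);
  insert_request(&osd->o_requests, req);
  spin_unlock(&osd->o_requests_lock);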

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/osd_client.h | 8 +++++++-
 net/ceph/osd_client.c           | 5 +++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3431011f364d..3122c1a3205f 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -29,7 +29,12 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 #define CEPH_HOMELESS_OSD	-1
 
-/* a given osd we're communicating with */
+/*
+ * A given osd we're communicating with.
+ *
+ * Note that the o_requests tree can be searched while holding the "lock" mutex
+ * or the "o_requests_lock" spinlock. Insertion or removal requires both!
+ */
 struct ceph_osd {
 	refcount_t o_ref;
 	struct ceph_osd_client *o_osdc;
@@ -37,6 +42,7 @@ struct ceph_osd {
 	int o_incarnation;
 	struct rb_node o_node;
 	struct ceph_connection o_con;
+	spinlock_t o_requests_lock;
 	struct rb_root o_requests;
 	struct rb_root o_linger_requests;
 	struct rb_root o_backoff_mappings;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1c5815530e0d..1e8842ef6e63 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1198,6 +1198,7 @@ static void osd_init(struct ceph_osd *osd)
 {
 	refcount_set(&osd->o_ref, 1);
 	RB_CLEAR_NODE(&osd->o_node);
+	spin_lock_init(&osd->o_requests_lock);
 	osd->o_requests = RB_ROOT;
 	osd->o_linger_requests = RB_ROOT;
 	osd->o_backoff_mappings = RB_ROOT;
@@ -1427,7 +1428,9 @@ static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
 		atomic_inc(&osd->o_osdc->num_homeless);
 
 	get_osd(osd);
+	spin_lock(&osd->o_requests_lock);
 	insert_request(&osd->o_requests, req);
+	spin_unlock(&osd->o_requests_lock);
 	req->r_osd = osd;
 }
 
@@ -1439,7 +1442,9 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
 	     req, req->r_tid);
 
 	req->r_osd = NULL;
+	spin_lock(&osd->o_requests_lock);
 	erase_request(&osd->o_requests, req);
+	spin_unlock(&osd->o_requests_lock);
 	put_osd(osd);
 
 	if (!osd_homeless(osd))
-- 
2.35.1



* [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers
  2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 1/5] libceph: add spinlock around osd->o_requests Jeff Layton
@ 2022-03-18 13:50 ` Jeff Layton
  2022-03-21  7:57   ` Xiubo Li
  2022-03-18 13:50 ` [PATCH v3 3/5] libceph: add sparse read support to msgr2 crc state machine Jeff Layton
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

When the OSD sends back a sparse read reply, it contains an array of
these structures. Define the structure and add a couple of helpers for
dealing with them.

Also add a place in struct ceph_osd_req_op to store the extent buffer,
and code to free it if it's populated when the req is torn down.
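
As a worked example of the helpers (with made-up values, not taken from
the patch): suppose a sparse read was issued at offset 0x1000 and the
reply carried two extents:

  struct ceph_osd_req_op op = {
          .extent.offset         = 0x1000,
          .extent.sparse_ext_len = 2,
          .extent.sparse_ext     = (struct ceph_sparse_extent []){
                  { .off = 0x1000, .len = 0x800 },
                  { .off = 0x2000, .len = 0x400 },
          },
  };

  /*
   * ceph_sparse_ext_map_end(&op) == 0x2000 + 0x400 - 0x1000 == 0x1400:
   * the read buffer logically covers 0x1400 bytes, with the
   * 0x1800..0x2000 hole zero-filled by the receive code.
   */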

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/osd_client.h | 31 ++++++++++++++++++++++++++++++-
 net/ceph/osd_client.c           | 13 +++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 3122c1a3205f..00a5b53a6763 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -29,6 +29,17 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 #define CEPH_HOMELESS_OSD	-1
 
+/*
+ * A single extent in a SPARSE_READ reply.
+ *
+ * Note that these come from the OSD as little-endian values. On BE arches,
+ * we convert them in-place after receipt.
+ */
+struct ceph_sparse_extent {
+	u64	off;
+	u64	len;
+} __attribute__((packed));
+
 /*
  * A given osd we're communicating with.
  *
@@ -104,6 +115,8 @@ struct ceph_osd_req_op {
 			u64 offset, length;
 			u64 truncate_size;
 			u32 truncate_seq;
+			int sparse_ext_len;
+			struct ceph_sparse_extent *sparse_ext;
 			struct ceph_osd_data osd_data;
 		} extent;
 		struct {
@@ -507,6 +520,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      u32 truncate_seq, u64 truncate_size,
 				      bool use_mempool);
 
+int ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int len);
+
 extern void ceph_osdc_get_request(struct ceph_osd_request *req);
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
@@ -562,5 +577,19 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
 			    struct ceph_object_locator *oloc,
 			    struct ceph_watch_item **watchers,
 			    u32 *num_watchers);
-#endif
 
+/* Find offset into the buffer of the end of the extent map */
+static inline u64 ceph_sparse_ext_map_end(struct ceph_osd_req_op *op)
+{
+	struct ceph_sparse_extent *ext;
+
+	/* No extents? No data */
+	if (op->extent.sparse_ext_len == 0)
+		return 0;
+
+	ext = &op->extent.sparse_ext[op->extent.sparse_ext_len - 1];
+
+	return ext->off + ext->len - op->extent.offset;
+}
+
+#endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1e8842ef6e63..9fec258e1f8d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -378,6 +378,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+		kfree(op->extent.sparse_ext);
 		ceph_osd_data_release(&op->extent.osd_data);
 		break;
 	case CEPH_OSD_OP_CALL:
@@ -1141,6 +1142,18 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_new_request);
 
+int ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int len)
+{
+	op->extent.sparse_ext_len = len;
+	op->extent.sparse_ext = kmalloc_array(len,
+					sizeof(*op->extent.sparse_ext),
+					GFP_NOFS);
+	if (!op->extent.sparse_ext)
+		return -ENOMEM;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_alloc_sparse_ext_map);
+
 /*
  * We keep osd requests in an rbtree, sorted by ->r_tid.
  */
-- 
2.35.1



* [PATCH v3 3/5] libceph: add sparse read support to msgr2 crc state machine
  2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 1/5] libceph: add spinlock around osd->o_requests Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers Jeff Layton
@ 2022-03-18 13:50 ` Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
  2022-03-18 13:50 ` [PATCH v3 5/5] ceph: convert to sparse reads Jeff Layton
  4 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

Add support for a new sparse_read ceph_connection operation. The idea is
that the client driver can define this operation and use it to do
special handling for incoming reads.

The alloc_msg routine will look at the request and determine whether the
reply is expected to be sparse. If it is, then we'll dispatch to a
different set of state machine states that will repeatedly call the
driver's sparse_read op to get length and placement info for reading the
extent map, and the extents themselves.
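
Conceptually, the new states drive the op roughly like the sketch below
(in the actual patch this is a state machine rather than a loop, since
socket reads are asynchronous; the helpers named are the existing msgr2
ones):

  struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;

  for (;;) {
          char *buf = NULL;
          u64 len = 0;
          int ret;

          ret = con->ops->sparse_read(con, cursor, &len, &buf);
          if (ret < 0)
                  return ret;     /* fault the connection */
          if (ret == 0)
                  break;          /* data section fully consumed */

          if (buf) {
                  /* next len bytes go into the driver-supplied buffer */
                  add_in_kvec(con, buf, len);
          } else {
                  /* next len bytes land at the current cursor position */
                  struct bio_vec bv;

                  get_bvec_at(cursor, &bv);
                  if (bv.bv_len > len)
                          bv.bv_len = len;
                  set_in_bvec(con, &bv);
          }
  }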

This necessitates adding some new fields to other structs:

- The msg gets a new bool to track whether it's a sparse_read request.

- A new field is added to the cursor to track the amount remaining in the
current extent. This is used to cap the read from the socket into the
msg_data.

- Handling a revoke with all of this is particularly difficult, so I've
added a new data_len_remain field to the v2 connection info, and then
used that to skip the remaining data on a revoke. We may want to expand
the use of that to the normal read path as well, just for consistency's
sake.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/messenger.h |  29 ++++++
 net/ceph/messenger.c           |   1 +
 net/ceph/messenger_v2.c        | 164 +++++++++++++++++++++++++++++++--
 3 files changed, 185 insertions(+), 9 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index e7f2fb2fc207..25213eb1d348 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -17,6 +17,7 @@
 
 struct ceph_msg;
 struct ceph_connection;
+struct ceph_msg_data_cursor;
 
 /*
  * Ceph defines these callbacks for handling connection events.
@@ -70,6 +71,31 @@ struct ceph_connection_operations {
 				      int used_proto, int result,
 				      const int *allowed_protos, int proto_cnt,
 				      const int *allowed_modes, int mode_cnt);
+
+	/**
+	 * sparse_read: read sparse data
+	 * @con: connection we're reading from
+	 * @cursor: data cursor for reading extents
+	 * @len: len of the data that msgr should read
+	 * @buf: optional buffer to read into
+	 *
+	 * This should be called more than once, each time setting up to
+	 * receive an extent into the current cursor position, and zeroing
+	 * the holes between them.
+	 *
+	 * Returns 1 if there is more data to be read, 0 if reading is
+	 * complete, or -errno if there was an error.
+	 *
+	 * If @buf is set on a 1 return, then the data should be read into
+	 * the provided buffer. Otherwise, it should be read into the cursor.
+	 *
+	 * The sparse read operation is expected to initialize the cursor
+	 * with a length covering up to the end of the last extent.
+	 */
+	int (*sparse_read)(struct ceph_connection *con,
+			   struct ceph_msg_data_cursor *cursor,
+			   u64 *len, char **buf);
+
 };
 
 /* use format string %s%lld */
@@ -207,6 +233,7 @@ struct ceph_msg_data_cursor {
 
 	struct ceph_msg_data	*data;		/* current data item */
 	size_t			resid;		/* bytes not yet consumed */
+	int			sr_resid;	/* residual sparse_read len */
 	bool			last_piece;	/* current is last piece */
 	bool			need_crc;	/* crc update needed */
 	union {
@@ -252,6 +279,7 @@ struct ceph_msg {
 	struct kref kref;
 	bool more_to_follow;
 	bool needs_out_seq;
+	bool sparse_read;
 	int front_alloc_len;
 
 	struct ceph_msgpool *pool;
@@ -396,6 +424,7 @@ struct ceph_connection_v2_info {
 
 	void *conn_bufs[16];
 	int conn_buf_cnt;
+	int data_len_remain;
 
 	struct kvec in_sign_kvecs[8];
 	struct kvec out_sign_kvecs[8];
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d3bb656308b4..bf4e7f5751ee 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1034,6 +1034,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
 
 	cursor->total_resid = length;
 	cursor->data = msg->data;
+	cursor->sr_resid = 0;
 
 	__ceph_msg_data_cursor_init(cursor);
 }
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index c6e5bfc717d5..873db58fe516 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -52,14 +52,16 @@
 #define FRAME_LATE_STATUS_COMPLETE	0xe
 #define FRAME_LATE_STATUS_ABORTED_MASK	0xf
 
-#define IN_S_HANDLE_PREAMBLE		1
-#define IN_S_HANDLE_CONTROL		2
-#define IN_S_HANDLE_CONTROL_REMAINDER	3
-#define IN_S_PREPARE_READ_DATA		4
-#define IN_S_PREPARE_READ_DATA_CONT	5
-#define IN_S_PREPARE_READ_ENC_PAGE	6
-#define IN_S_HANDLE_EPILOGUE		7
-#define IN_S_FINISH_SKIP		8
+#define IN_S_HANDLE_PREAMBLE			1
+#define IN_S_HANDLE_CONTROL			2
+#define IN_S_HANDLE_CONTROL_REMAINDER		3
+#define IN_S_PREPARE_READ_DATA			4
+#define IN_S_PREPARE_READ_DATA_CONT		5
+#define IN_S_PREPARE_READ_ENC_PAGE		6
+#define IN_S_PREPARE_SPARSE_DATA		7
+#define IN_S_PREPARE_SPARSE_DATA_CONT		8
+#define IN_S_HANDLE_EPILOGUE			9
+#define IN_S_FINISH_SKIP			10
 
 #define OUT_S_QUEUE_DATA		1
 #define OUT_S_QUEUE_DATA_CONT		2
@@ -1819,6 +1821,120 @@ static void prepare_read_data_cont(struct ceph_connection *con)
 	con->v2.in_state = IN_S_HANDLE_EPILOGUE;
 }
 
+static int prepare_sparse_read_cont(struct ceph_connection *con)
+{
+	int ret;
+	struct bio_vec bv;
+	char *buf = NULL;
+	struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
+	u64 len = 0;
+
+	WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT);
+
+	if (iov_iter_is_bvec(&con->v2.in_iter)) {
+		if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+			con->in_data_crc = crc32c(con->in_data_crc,
+						page_address(con->bounce_page),
+						con->v2.in_bvec.bv_len);
+			get_bvec_at(cursor, &bv);
+			memcpy_to_page(bv.bv_page, bv.bv_offset,
+					page_address(con->bounce_page),
+					con->v2.in_bvec.bv_len);
+		} else {
+			con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
+						    con->v2.in_bvec.bv_page,
+						    con->v2.in_bvec.bv_offset,
+						    con->v2.in_bvec.bv_len);
+		}
+
+		ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len);
+		cursor->sr_resid -= con->v2.in_bvec.bv_len;
+		dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__,
+			con->v2.in_bvec.bv_len, cursor->sr_resid);
+		WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid);
+		if (cursor->sr_resid) {
+			get_bvec_at(cursor, &bv);
+			if (bv.bv_len > cursor->sr_resid)
+				bv.bv_len = cursor->sr_resid;
+			if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+				bv.bv_page = con->bounce_page;
+				bv.bv_offset = 0;
+			}
+			set_in_bvec(con, &bv);
+			con->v2.data_len_remain -= bv.bv_len;
+			return 0;
+		}
+	} else if (iov_iter_is_kvec(&con->v2.in_iter)) {
+		/* On first call, we have no kvec so don't compute crc */
+		if (con->v2.in_kvec_cnt) {
+			WARN_ON_ONCE(con->v2.in_kvec_cnt > 1);
+			con->in_data_crc = crc32c(con->in_data_crc,
+					  con->v2.in_kvecs[0].iov_base,
+					  con->v2.in_kvecs[0].iov_len);
+		}
+	} else {
+		return -EIO;
+	}
+
+	/* get next extent */
+	ret = con->ops->sparse_read(con, cursor, &len, &buf);
+	if (ret <= 0) {
+		if (ret < 0)
+			return ret;
+
+		reset_in_kvecs(con);
+		add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
+		con->v2.in_state = IN_S_HANDLE_EPILOGUE;
+		return 0;
+	}
+
+	if (buf) {
+		/* receive into buffer */
+		reset_in_kvecs(con);
+		add_in_kvec(con, buf, len);
+		con->v2.data_len_remain -= len;
+		return 0;
+	}
+
+	cursor->sr_resid = len;
+	get_bvec_at(cursor, &bv);
+	if (bv.bv_len > cursor->sr_resid)
+		bv.bv_len = cursor->sr_resid;
+	if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+		if (unlikely(!con->bounce_page)) {
+			con->bounce_page = alloc_page(GFP_NOIO);
+			if (!con->bounce_page) {
+				pr_err("failed to allocate bounce page\n");
+				return -ENOMEM;
+			}
+		}
+
+		bv.bv_page = con->bounce_page;
+		bv.bv_offset = 0;
+	}
+	set_in_bvec(con, &bv);
+	con->v2.data_len_remain -= len;
+	return ret;
+}
+
+static int prepare_sparse_read_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->in_msg;
+
+	dout("%s: starting sparse read\n", __func__);
+
+	if (WARN_ON_ONCE(!con->ops->sparse_read))
+		return -EOPNOTSUPP;
+
+	if (!con_secure(con))
+		con->in_data_crc = -1;
+
+	reset_in_kvecs(con);
+	con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT;
+	con->v2.data_len_remain = data_len(msg);
+	return prepare_sparse_read_cont(con);
+}
+
 static int prepare_read_tail_plain(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->in_msg;
@@ -1839,7 +1955,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
 	}
 
 	if (data_len(msg)) {
-		con->v2.in_state = IN_S_PREPARE_READ_DATA;
+		if (msg->sparse_read)
+			con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
+		else
+			con->v2.in_state = IN_S_PREPARE_READ_DATA;
 	} else {
 		add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
 		con->v2.in_state = IN_S_HANDLE_EPILOGUE;
@@ -2893,6 +3012,12 @@ static int populate_in_iter(struct ceph_connection *con)
 			prepare_read_enc_page(con);
 			ret = 0;
 			break;
+		case IN_S_PREPARE_SPARSE_DATA:
+			ret = prepare_sparse_read_data(con);
+			break;
+		case IN_S_PREPARE_SPARSE_DATA_CONT:
+			ret = prepare_sparse_read_cont(con);
+			break;
 		case IN_S_HANDLE_EPILOGUE:
 			ret = handle_epilogue(con);
 			break;
@@ -3485,6 +3610,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con)
 	con->v2.in_state = IN_S_FINISH_SKIP;
 }
 
+static void revoke_at_prepare_sparse_data(struct ceph_connection *con)
+{
+	int resid;  /* current piece of data */
+	int remaining;
+
+	WARN_ON(con_secure(con));
+	WARN_ON(!data_len(con->in_msg));
+	WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
+	resid = iov_iter_count(&con->v2.in_iter);
+	dout("%s con %p resid %d\n", __func__, con, resid);
+
+	remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain;
+	con->v2.in_iter.count -= resid;
+	set_in_skip(con, resid + remaining);
+	con->v2.in_state = IN_S_FINISH_SKIP;
+}
+
 static void revoke_at_handle_epilogue(struct ceph_connection *con)
 {
 	int resid;
@@ -3501,6 +3643,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con)
 void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
 {
 	switch (con->v2.in_state) {
+	case IN_S_PREPARE_SPARSE_DATA:
 	case IN_S_PREPARE_READ_DATA:
 		revoke_at_prepare_read_data(con);
 		break;
@@ -3510,6 +3653,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
 	case IN_S_PREPARE_READ_ENC_PAGE:
 		revoke_at_prepare_read_enc_page(con);
 		break;
+	case IN_S_PREPARE_SPARSE_DATA_CONT:
+		revoke_at_prepare_sparse_data(con);
+		break;
 	case IN_S_HANDLE_EPILOGUE:
 		revoke_at_handle_epilogue(con);
 		break;
-- 
2.35.1



* [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
                   ` (2 preceding siblings ...)
  2022-03-18 13:50 ` [PATCH v3 3/5] libceph: add sparse read support to msgr2 crc state machine Jeff Layton
@ 2022-03-18 13:50 ` Jeff Layton
  2022-03-21  8:41   ` Xiubo Li
                     ` (2 more replies)
  2022-03-18 13:50 ` [PATCH v3 5/5] ceph: convert to sparse reads Jeff Layton
  4 siblings, 3 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

Add a new sparse_read operation for the OSD client, driven by its own
state machine. The messenger can repeatedly call the sparse_read
operation, and it will pass back the necessary info to set up to read
the next extent of data, while zero-filling the sparse regions.
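
For reference, the data section of a SPARSE_READ reply that this state
machine walks looks like this on the wire (all little-endian; a summary
of the code below, not a quote from the protocol docs):

  /*
   *    __le32  extent_count;
   *    struct ceph_sparse_extent extents[extent_count];  (off/len pairs)
   *    __le32  data_len;
   *    u8      data[data_len];   (extent data concatenated, holes omitted)
   *
   * osd_sparse_read() hands it back to the messenger one chunk at a time:
   *
   *    CEPH_SPARSE_READ_HDR      -> &sr_count     (the extent count)
   *    CEPH_SPARSE_READ_EXTENTS  -> sr_extent[]   (the extent array)
   *    CEPH_SPARSE_READ_DATA_LEN -> &sr_datalen   (length of the data blob)
   *    CEPH_SPARSE_READ_DATA     -> one extent per call, zero-filling any
   *                                 gap in the cursor before that extent
   */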

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/osd_client.h |  32 +++++
 net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
 2 files changed, 266 insertions(+), 4 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 00a5b53a6763..2c5f9eb7d888 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -40,6 +40,36 @@ struct ceph_sparse_extent {
 	u64	len;
 } __attribute__((packed));
 
+/* Sparse read state machine state values */
+enum ceph_sparse_read_state {
+	CEPH_SPARSE_READ_HDR	= 0,
+	CEPH_SPARSE_READ_EXTENTS,
+	CEPH_SPARSE_READ_DATA_LEN,
+	CEPH_SPARSE_READ_DATA,
+};
+
+/*
+ * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
+ * 64-bit offset/length pairs, and then all of the actual file data
+ * concatenated after it (sans holes).
+ *
+ * Unfortunately, we don't know how long the extent array is until we've
+ * started reading the data section of the reply. The caller should send down
+ * a destination buffer for the array, but we'll alloc one if it's too small
+ * or if the caller doesn't.
+ */
+struct ceph_sparse_read {
+	enum ceph_sparse_read_state	sr_state;	/* state machine state */
+	u64				sr_req_off;	/* orig request offset */
+	u64				sr_req_len;	/* orig request length */
+	u64				sr_pos;		/* current pos in buffer */
+	int				sr_index;	/* current extent index */
+	__le32				sr_datalen;	/* length of actual data */
+	u32				sr_count;	/* extent count in reply */
+	int				sr_ext_len;	/* length of extent array */
+	struct ceph_sparse_extent	*sr_extent;	/* extent array */
+};
+
 /*
  * A given osd we're communicating with.
  *
@@ -48,6 +78,7 @@ struct ceph_sparse_extent {
  */
 struct ceph_osd {
 	refcount_t o_ref;
+	int o_sparse_op_idx;
 	struct ceph_osd_client *o_osdc;
 	int o_osd;
 	int o_incarnation;
@@ -63,6 +94,7 @@ struct ceph_osd {
 	unsigned long lru_ttl;
 	struct list_head o_keepalive_item;
 	struct mutex lock;
+	struct ceph_sparse_read	o_sparse_read;
 };
 
 #define CEPH_OSD_SLAB_OPS	2
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 9fec258e1f8d..3694696c8a31 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 
 	switch (op->op) {
 	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_SPARSE_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 		kfree(op->extent.sparse_ext);
@@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		/* reply */
 		case CEPH_OSD_OP_STAT:
 		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_SPARSE_READ:
 		case CEPH_OSD_OP_LIST_WATCHERS:
 			*num_reply_data_items += 1;
 			break;
@@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
@@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_STAT:
 		break;
 	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_SPARSE_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 	case CEPH_OSD_OP_ZERO:
@@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
-	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
+	       opcode != CEPH_OSD_OP_SPARSE_READ);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
@@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
 	mutex_init(&osd->lock);
 }
 
+static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
+{
+	kfree(sr->sr_extent);
+	memset(sr, '\0', sizeof(*sr));
+	sr->sr_state = CEPH_SPARSE_READ_HDR;
+}
+
 static void osd_cleanup(struct ceph_osd *osd)
 {
 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
@@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
 	WARN_ON(!list_empty(&osd->o_osd_lru));
 	WARN_ON(!list_empty(&osd->o_keepalive_item));
 
+	ceph_init_sparse_read(&osd->o_sparse_read);
+
 	if (osd->o_auth.authorizer) {
 		WARN_ON(osd_homeless(osd));
 		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
@@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 	osd_init(osd);
 	osd->o_osdc = osdc;
 	osd->o_osd = onum;
+	osd->o_sparse_op_idx = -1;
+
+	ceph_init_sparse_read(&osd->o_sparse_read);
 
 	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 
@@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
 					       &op->raw_data_in);
 			break;
 		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_SPARSE_READ:
 			ceph_osdc_msg_data_add(reply_msg,
 					       &op->extent.osd_data);
 			break;
@@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
 
 	req->r_end_latency = ktime_get();
 
-	if (req->r_osd)
+	if (req->r_osd) {
+		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
 		unlink_request(req->r_osd, req);
+	}
 	atomic_dec(&osdc->num_requests);
 
 	/*
@@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 	ceph_msg_put(msg);
 }
 
+/* How much sparse data was requested? */
+static u64 sparse_data_requested(struct ceph_osd_request *req)
+{
+	u64 len = 0;
+
+	if (req->r_flags & CEPH_OSD_FLAG_READ) {
+		int i;
+
+		for (i = 0; i < req->r_num_ops; ++i) {
+			struct ceph_osd_req_op *op = &req->r_ops[i];
+
+			if (op->op == CEPH_OSD_OP_SPARSE_READ)
+				len += op->extent.length;
+		}
+	}
+	return len;
+}
+
 /*
  * Lookup and return message for incoming reply.  Don't try to do
  * anything about a larger than preallocated data portion of the
@@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	int front_len = le32_to_cpu(hdr->front_len);
 	int data_len = le32_to_cpu(hdr->data_len);
 	u64 tid = le64_to_cpu(hdr->tid);
+	u64 srlen;
 
 	down_read(&osdc->lock);
 	if (!osd_registered(osd)) {
@@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		req->r_reply = m;
 	}
 
-	if (data_len > req->r_reply->data_length) {
+	srlen = sparse_data_requested(req);
+	if (!srlen && (data_len > req->r_reply->data_length)) {
 		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
 			__func__, osd->o_osd, req->r_tid, data_len,
 			req->r_reply->data_length);
@@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	}
 
 	m = ceph_msg_get(req->r_reply);
+	m->sparse_read = (bool)srlen;
+
 	dout("get_reply tid %lld %p\n", tid, m);
 
 out_unlock_session:
@@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
 	return ceph_auth_check_message_signature(auth, msg);
 }
 
+static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
+{
+	while (len) {
+		struct page *page;
+		size_t poff, plen;
+		bool last = false;
+
+		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
+		if (plen > len)
+			plen = len;
+		if (zero)
+			zero_user_segment(page, poff, poff + plen);
+		len -= plen;
+		ceph_msg_data_advance(cursor, plen);
+	}
+}
+
+static int prep_next_sparse_read(struct ceph_connection *con,
+				 struct ceph_msg_data_cursor *cursor)
+{
+	struct ceph_osd *o = con->private;
+	struct ceph_sparse_read *sr = &o->o_sparse_read;
+	struct ceph_osd_request *req;
+	struct ceph_osd_req_op *op;
+
+	spin_lock(&o->o_requests_lock);
+	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
+	if (!req) {
+		spin_unlock(&o->o_requests_lock);
+		return -EBADR;
+	}
+
+	if (o->o_sparse_op_idx < 0) {
+		u64 srlen = sparse_data_requested(req);
+
+		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
+			__func__, o->o_osd, srlen);
+		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
+	} else {
+		op = &req->r_ops[o->o_sparse_op_idx];
+
+		WARN_ON_ONCE(op->extent.sparse_ext);
+
+		/* hand back buffer we took earlier */
+		op->extent.sparse_ext = sr->sr_extent;
+		sr->sr_extent = NULL;
+		op->extent.sparse_ext_len = sr->sr_count;
+		sr->sr_ext_len = 0;
+		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
+			__func__, o->o_osd, op->extent.sparse_ext_len,
+			cursor->resid);
+		/*
+		 * FIXME: Need to advance to the next data item here, in the
+		 * event that there are multiple sparse read requests. Is this
+		 * the right way to do that?
+		 */
+		if (cursor->resid)
+			advance_cursor(cursor, cursor->resid, false);
+	}
+
+	ceph_init_sparse_read(sr);
+
+	/* find next op in this request (if any) */
+	while (++o->o_sparse_op_idx < req->r_num_ops) {
+		op = &req->r_ops[o->o_sparse_op_idx];
+		if (op->op == CEPH_OSD_OP_SPARSE_READ)
+			goto found;
+	}
+
+	/* reset for next sparse read request */
+	spin_unlock(&o->o_requests_lock);
+	o->o_sparse_op_idx = -1;
+	return 0;
+found:
+	sr->sr_req_off = op->extent.offset;
+	sr->sr_req_len = op->extent.length;
+	sr->sr_pos = sr->sr_req_off;
+	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
+		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
+
+	/* hand off request's sparse extent map buffer */
+	sr->sr_ext_len = op->extent.sparse_ext_len;
+	op->extent.sparse_ext_len = 0;
+	sr->sr_extent = op->extent.sparse_ext;
+	op->extent.sparse_ext = NULL;
+
+	spin_unlock(&o->o_requests_lock);
+	return 1;
+}
+
+#ifdef __BIG_ENDIAN
+static inline void convert_extent_map(struct ceph_sparse_read *sr)
+{
+	int i;
+
+	for (i = 0; i < sr->sr_count; i++) {
+		struct ceph_sparse_extent *ext = &sr->sr_extent[i];
+
+		ext->off = le64_to_cpu((__force __le64)ext->off);
+		ext->len = le64_to_cpu((__force __le64)ext->len);
+	}
+}
+#else
+static inline void convert_extent_map(struct ceph_sparse_read *sr)
+{
+}
+#endif
+
+static int osd_sparse_read(struct ceph_connection *con,
+			   struct ceph_msg_data_cursor *cursor,
+			   u64 *plen, char **pbuf)
+{
+	struct ceph_osd *o = con->private;
+	struct ceph_sparse_read *sr = &o->o_sparse_read;
+	u32 count = sr->sr_count;
+	u64 eoff, elen;
+	int ret;
+
+	switch (sr->sr_state) {
+	case CEPH_SPARSE_READ_HDR:
+next_op:
+		ret = prep_next_sparse_read(con, cursor);
+		if (ret <= 0)
+			return ret;
+
+		/* number of extents */
+		*plen = sizeof(sr->sr_count);
+		*pbuf = (char *)&sr->sr_count;
+		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
+		break;
+	case CEPH_SPARSE_READ_EXTENTS:
+		/* Convert sr_count to host-endian */
+		count = le32_to_cpu((__force __le32)sr->sr_count);
+		sr->sr_count = count;
+		dout("[%d] got %u extents\n", o->o_osd, count);
+
+		if (count > 0) {
+			if (!sr->sr_extent || count > sr->sr_ext_len) {
+				/* no extent array provided, or too short */
+				kfree(sr->sr_extent);
+				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
+							   GFP_NOIO);
+				if (!sr->sr_extent)
+					return -ENOMEM;
+				sr->sr_ext_len = count;
+			}
+			*plen = count * sizeof(*sr->sr_extent);
+			*pbuf = (char *)sr->sr_extent;
+			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
+			break;
+		}
+		/* No extents? Fall through to reading data len */
+		fallthrough;
+	case CEPH_SPARSE_READ_DATA_LEN:
+		convert_extent_map(sr);
+		*plen = sizeof(sr->sr_datalen);
+		*pbuf = (char *)&sr->sr_datalen;
+		sr->sr_state = CEPH_SPARSE_READ_DATA;
+		break;
+	case CEPH_SPARSE_READ_DATA:
+		if (sr->sr_index >= count) {
+			sr->sr_state = CEPH_SPARSE_READ_HDR;
+			goto next_op;
+		}
+
+		eoff = sr->sr_extent[sr->sr_index].off;
+		elen = sr->sr_extent[sr->sr_index].len;
+
+		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
+		     o->o_osd, sr->sr_index, eoff, elen);
+
+		/* zero out anything from sr_pos to start of extent */
+		if (sr->sr_pos < eoff)
+			advance_cursor(cursor, eoff - sr->sr_pos, true);
+
+		/* Set position to end of extent */
+		sr->sr_pos = eoff + elen;
+
+		/* send back the new length */
+		*plen = elen;
+
+		/* Bump the array index */
+		++sr->sr_index;
+		break;
+	}
+	return 1;
+}
+
 static const struct ceph_connection_operations osd_con_ops = {
 	.get = osd_get_con,
 	.put = osd_put_con,
+	.sparse_read = osd_sparse_read,
 	.alloc_msg = osd_alloc_msg,
 	.dispatch = osd_dispatch,
 	.fault = osd_fault,
-- 
2.35.1



* [PATCH v3 5/5] ceph: convert to sparse reads
  2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
                   ` (3 preceding siblings ...)
  2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
@ 2022-03-18 13:50 ` Jeff Layton
  2022-03-21 12:30   ` Jeff Layton
  4 siblings, 1 reply; 15+ messages in thread
From: Jeff Layton @ 2022-03-18 13:50 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

Have ceph issue sparse reads instead of normal ones. The callers now
preallocate a sparse extent buffer that the libceph receive code can
populate and hand back after the operation completes.

After a successful read, we can't use the req->r_result value to
determine the amount of data "read", so instead we compute the received
length from the end of the last extent in the buffer. Any interstitial
holes will have been zero-filled by the receive code.
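
The resulting completion-side pattern is the same in each caller,
roughly (a sketch matching the hunks below):

  struct ceph_osd_req_op *op = &req->r_ops[0];
  int err = req->r_result;

  if (err >= 0)
          err = ceph_sparse_ext_map_end(op);  /* bytes valid in the buffer */
  else if (err == -ENOENT)
          err = 0;                            /* no object: success, no data */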

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 fs/ceph/addr.c  | 13 +++++++++++--
 fs/ceph/file.c  | 41 ++++++++++++++++++++++++++++++++++-------
 fs/ceph/super.h |  7 +++++++
 3 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 752c421c9922..6d4f9fbf22ce 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -220,6 +220,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
 	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
 	struct netfs_read_subrequest *subreq = req->r_priv;
+	struct ceph_osd_req_op *op = &req->r_ops[0];
 	int num_pages;
 	int err = req->r_result;
 
@@ -230,7 +231,9 @@ static void finish_netfs_read(struct ceph_osd_request *req)
 	     subreq->len, i_size_read(req->r_inode));
 
 	/* no object means success but no data */
-	if (err == -ENOENT)
+	if (err >= 0)
+		err = ceph_sparse_ext_map_end(op);
+	else if (err == -ENOENT)
 		err = 0;
 	else if (err == -EBLOCKLISTED)
 		fsc->blocklisted = true;
@@ -317,7 +320,7 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
 		return;
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
-			0, 1, CEPH_OSD_OP_READ,
+			0, 1, CEPH_OSD_OP_SPARSE_READ,
 			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
 			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
@@ -326,6 +329,12 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
 		goto out;
 	}
 
+	err = ceph_alloc_sparse_ext_map(&req->r_ops[0], CEPH_SPARSE_EXT_ARRAY_INITIAL);
+	if (err) {
+		ceph_osdc_put_request(req);
+		goto out;
+	}
+
 	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
 	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
 	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index feb75eb1cd82..deba39989a07 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -931,10 +931,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 		bool more;
 		int idx;
 		size_t left;
+		struct ceph_osd_req_op *op;
 
 		req = ceph_osdc_new_request(osdc, &ci->i_layout,
 					ci->i_vino, off, &len, 0, 1,
-					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+					CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
 					NULL, ci->i_truncate_seq,
 					ci->i_truncate_size, false);
 		if (IS_ERR(req)) {
@@ -955,6 +956,14 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 
 		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
 						 false, false);
+
+		op = &req->r_ops[0];
+		ret = ceph_alloc_sparse_ext_map(op, CEPH_SPARSE_EXT_ARRAY_INITIAL);
+		if (ret) {
+			ceph_osdc_put_request(req);
+			break;
+		}
+
 		ret = ceph_osdc_start_request(osdc, req, false);
 		if (!ret)
 			ret = ceph_osdc_wait_request(osdc, req);
@@ -964,23 +973,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 					 req->r_end_latency,
 					 len, ret);
 
-		ceph_osdc_put_request(req);
-
 		i_size = i_size_read(inode);
 		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
 		     off, len, ret, i_size, (more ? " MORE" : ""));
 
-		if (ret == -ENOENT)
+		/* Fix it to go to end of extent map */
+		if (ret >= 0)
+			ret = ceph_sparse_ext_map_end(op);
+		else if (ret == -ENOENT)
 			ret = 0;
+
 		if (ret >= 0 && ret < len && (off + ret < i_size)) {
 			int zlen = min(len - ret, i_size - off - ret);
 			int zoff = page_off + ret;
+
 			dout("sync_read zero gap %llu~%llu\n",
-                             off + ret, off + ret + zlen);
+				off + ret, off + ret + zlen);
 			ceph_zero_page_vector_range(zoff, zlen, pages);
 			ret += zlen;
 		}
 
+		ceph_osdc_put_request(req);
+
 		idx = 0;
 		left = ret > 0 ? ret : 0;
 		while (left > 0) {
@@ -1095,6 +1109,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	struct inode *inode = req->r_inode;
 	struct ceph_aio_request *aio_req = req->r_priv;
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct ceph_osd_req_op *op = &req->r_ops[0];
 	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
 	unsigned int len = osd_data->bvec_pos.iter.bi_size;
 
@@ -1117,6 +1132,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 		}
 		rc = -ENOMEM;
 	} else if (!aio_req->write) {
+		if (rc >= 0)
+			rc = ceph_sparse_ext_map_end(op);
 		if (rc == -ENOENT)
 			rc = 0;
 		if (rc >= 0 && len > rc) {
@@ -1280,6 +1297,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	while (iov_iter_count(iter) > 0) {
 		u64 size = iov_iter_count(iter);
 		ssize_t len;
+		struct ceph_osd_req_op *op;
 
 		if (write)
 			size = min_t(u64, size, fsc->mount_options->wsize);
@@ -1291,7 +1309,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 					    vino, pos, &size, 0,
 					    1,
 					    write ? CEPH_OSD_OP_WRITE :
-						    CEPH_OSD_OP_READ,
+						    CEPH_OSD_OP_SPARSE_READ,
 					    flags, snapc,
 					    ci->i_truncate_seq,
 					    ci->i_truncate_size,
@@ -1342,6 +1360,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 		}
 
 		osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
+		op = &req->r_ops[0];
+		ret = ceph_alloc_sparse_ext_map(op, CEPH_SPARSE_EXT_ARRAY_INITIAL);
+		if (ret) {
+			ceph_osdc_put_request(req);
+			break;
+		}
 
 		if (aio_req) {
 			aio_req->total_len += len;
@@ -1370,8 +1394,11 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 
 		size = i_size_read(inode);
 		if (!write) {
-			if (ret == -ENOENT)
+			if (ret >= 0)
+				ret = ceph_sparse_ext_map_end(op);
+			else if (ret == -ENOENT)
 				ret = 0;
+
 			if (ret >= 0 && ret < len && pos + ret < size) {
 				struct iov_iter i;
 				int zlen = min_t(size_t, len - ret,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 250aefecd628..ad09c26afac6 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -75,6 +75,13 @@
 #define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT      5  /* cap release delay */
 #define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT     60  /* cap release delay */
 
+/*
+ * How big an extent array should we preallocate for a sparse read? This is
+ * just a starting value.  If we get more than this back from the OSD, the
+ * receiver will reallocate.
+ */
+#define CEPH_SPARSE_EXT_ARRAY_INITIAL	16
+
 struct ceph_mount_options {
 	unsigned int flags;
 
-- 
2.35.1



* Re: [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers
  2022-03-18 13:50 ` [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers Jeff Layton
@ 2022-03-21  7:57   ` Xiubo Li
  2022-03-21 10:02     ` Jeff Layton
  0 siblings, 1 reply; 15+ messages in thread
From: Xiubo Li @ 2022-03-21  7:57 UTC (permalink / raw)
  To: Jeff Layton, idryomov; +Cc: ceph-devel


On 3/18/22 9:50 PM, Jeff Layton wrote:
> When the OSD sends back a sparse read reply, it contains an array of
> these structures. Define the structure and add a couple of helpers for
> dealing with them.
>
> Also add a place in struct ceph_osd_req_op to store the extent buffer,
> and code to free it if it's populated when the req is torn down.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>   include/linux/ceph/osd_client.h | 31 ++++++++++++++++++++++++++++++-
>   net/ceph/osd_client.c           | 13 +++++++++++++
>   2 files changed, 43 insertions(+), 1 deletion(-)
>
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 3122c1a3205f..00a5b53a6763 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -29,6 +29,17 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
>   
>   #define CEPH_HOMELESS_OSD	-1
>   
> +/*
> + * A single extent in a SPARSE_READ reply.
> + *
> + * Note that these come from the OSD as little-endian values. On BE arches,
> + * we convert them in-place after receipt.
> + */
> +struct ceph_sparse_extent {
> +	u64	off;
> +	u64	len;
> +} __attribute__((packed));
> +
>   /*
>    * A given osd we're communicating with.
>    *
> @@ -104,6 +115,8 @@ struct ceph_osd_req_op {
>   			u64 offset, length;
>   			u64 truncate_size;
>   			u32 truncate_seq;
> +			int sparse_ext_len;

To be more readable, how about

s/sparse_ext_len/sparse_ext_cnt/ ?

-- Xiubo



* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
@ 2022-03-21  8:41   ` Xiubo Li
  2022-03-21 10:01     ` Jeff Layton
  2022-03-21 14:17   ` Jeff Layton
  2022-03-22  1:58   ` Xiubo Li
  2 siblings, 1 reply; 15+ messages in thread
From: Xiubo Li @ 2022-03-21  8:41 UTC (permalink / raw)
  To: Jeff Layton, idryomov; +Cc: ceph-devel


On 3/18/22 9:50 PM, Jeff Layton wrote:
> Add a new sparse_read operation for the OSD client, driven by its own
> state machine. The messenger can repeatedly call the sparse_read
> operation, and it will pass back the necessary info to set up to read
> the next extent of data, while zero-filling the sparse regions.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>   include/linux/ceph/osd_client.h |  32 +++++
>   net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
>   2 files changed, 266 insertions(+), 4 deletions(-)
>
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 00a5b53a6763..2c5f9eb7d888 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -40,6 +40,36 @@ struct ceph_sparse_extent {
>   	u64	len;
>   } __attribute__((packed));
>   
> +/* Sparse read state machine state values */
> +enum ceph_sparse_read_state {
> +	CEPH_SPARSE_READ_HDR	= 0,
> +	CEPH_SPARSE_READ_EXTENTS,
> +	CEPH_SPARSE_READ_DATA_LEN,
> +	CEPH_SPARSE_READ_DATA,
> +};
> +
> +/*
> + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> + * 64-bit offset/length pairs, and then all of the actual file data
> + * concatenated after it (sans holes).
> + *
> + * Unfortunately, we don't know how long the extent array is until we've
> + * started reading the data section of the reply. The caller should send down
> + * a destination buffer for the array, but we'll alloc one if it's too small
> + * or if the caller doesn't.
> + */
> +struct ceph_sparse_read {
> +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> +	u64				sr_req_off;	/* orig request offset */
> +	u64				sr_req_len;	/* orig request length */
> +	u64				sr_pos;		/* current pos in buffer */
> +	int				sr_index;	/* current extent index */
> +	__le32				sr_datalen;	/* length of actual data */
> +	u32				sr_count;	/* extent count in reply */
> +	int				sr_ext_len;	/* length of extent array */
> +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> +};
> +
>   /*
>    * A given osd we're communicating with.
>    *
> @@ -48,6 +78,7 @@ struct ceph_sparse_extent {
>    */
>   struct ceph_osd {
>   	refcount_t o_ref;
> +	int o_sparse_op_idx;
>   	struct ceph_osd_client *o_osdc;
>   	int o_osd;
>   	int o_incarnation;
> @@ -63,6 +94,7 @@ struct ceph_osd {
>   	unsigned long lru_ttl;
>   	struct list_head o_keepalive_item;
>   	struct mutex lock;
> +	struct ceph_sparse_read	o_sparse_read;
>   };
>   
>   #define CEPH_OSD_SLAB_OPS	2
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 9fec258e1f8d..3694696c8a31 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>   
>   	switch (op->op) {
>   	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>   	case CEPH_OSD_OP_WRITE:
>   	case CEPH_OSD_OP_WRITEFULL:
>   		kfree(op->extent.sparse_ext);
> @@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
>   		/* reply */
>   		case CEPH_OSD_OP_STAT:
>   		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>   		case CEPH_OSD_OP_LIST_WATCHERS:
>   			*num_reply_data_items += 1;
>   			break;
> @@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
>   
>   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>   	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> -	       opcode != CEPH_OSD_OP_TRUNCATE);
> +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
>   
>   	op->extent.offset = offset;
>   	op->extent.length = length;
> @@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>   	case CEPH_OSD_OP_STAT:
>   		break;
>   	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>   	case CEPH_OSD_OP_WRITE:
>   	case CEPH_OSD_OP_WRITEFULL:
>   	case CEPH_OSD_OP_ZERO:
> @@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
>   
>   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>   	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> +	       opcode != CEPH_OSD_OP_SPARSE_READ);
>   
>   	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
>   					GFP_NOFS);
> @@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
>   	mutex_init(&osd->lock);
>   }
>   
> +static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
> +{
> +	kfree(sr->sr_extent);
> +	memset(sr, '\0', sizeof(*sr));
> +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> +}
> +
>   static void osd_cleanup(struct ceph_osd *osd)
>   {
>   	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
> @@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
>   	WARN_ON(!list_empty(&osd->o_osd_lru));
>   	WARN_ON(!list_empty(&osd->o_keepalive_item));
>   
> +	ceph_init_sparse_read(&osd->o_sparse_read);
> +
>   	if (osd->o_auth.authorizer) {
>   		WARN_ON(osd_homeless(osd));
>   		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
> @@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
>   	osd_init(osd);
>   	osd->o_osdc = osdc;
>   	osd->o_osd = onum;
> +	osd->o_sparse_op_idx = -1;
> +
> +	ceph_init_sparse_read(&osd->o_sparse_read);
>   
>   	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
>   
> @@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
>   					       &op->raw_data_in);
>   			break;
>   		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>   			ceph_osdc_msg_data_add(reply_msg,
>   					       &op->extent.osd_data);
>   			break;
> @@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
>   
>   	req->r_end_latency = ktime_get();
>   
> -	if (req->r_osd)
> +	if (req->r_osd) {
> +		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
>   		unlink_request(req->r_osd, req);
> +	}
>   	atomic_dec(&osdc->num_requests);
>   
>   	/*
> @@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
>   	ceph_msg_put(msg);
>   }
>   
> +/* How much sparse data was requested? */
> +static u64 sparse_data_requested(struct ceph_osd_request *req)
> +{
> +	u64 len = 0;
> +
> +	if (req->r_flags & CEPH_OSD_FLAG_READ) {
> +		int i;
> +
> +		for (i = 0; i < req->r_num_ops; ++i) {
> +			struct ceph_osd_req_op *op = &req->r_ops[i];
> +
> +			if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +				len += op->extent.length;
> +		}
> +	}
> +	return len;
> +}
> +
>   /*
>    * Lookup and return message for incoming reply.  Don't try to do
>    * anything about a larger than preallocated data portion of the
> @@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   	int front_len = le32_to_cpu(hdr->front_len);
>   	int data_len = le32_to_cpu(hdr->data_len);
>   	u64 tid = le64_to_cpu(hdr->tid);
> +	u64 srlen;
>   
>   	down_read(&osdc->lock);
>   	if (!osd_registered(osd)) {
> @@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   		req->r_reply = m;
>   	}
>   
> -	if (data_len > req->r_reply->data_length) {
> +	srlen = sparse_data_requested(req);
> +	if (!srlen && (data_len > req->r_reply->data_length)) {
>   		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
>   			__func__, osd->o_osd, req->r_tid, data_len,
>   			req->r_reply->data_length);
> @@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   	}
>   
>   	m = ceph_msg_get(req->r_reply);
> +	m->sparse_read = (bool)srlen;
> +
>   	dout("get_reply tid %lld %p\n", tid, m);
>   
>   out_unlock_session:
> @@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
>   	return ceph_auth_check_message_signature(auth, msg);
>   }
>   
> +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
> +{
> +	while (len) {
> +		struct page *page;
> +		size_t poff, plen;
> +		bool last = false;
> +
> +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> +		if (plen > len)
> +			plen = len;
> +		if (zero)
> +			zero_user_segment(page, poff, poff + plen);
> +		len -= plen;
> +		ceph_msg_data_advance(cursor, plen);
> +	}
> +}
> +
> +static int prep_next_sparse_read(struct ceph_connection *con,
> +				 struct ceph_msg_data_cursor *cursor)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	struct ceph_osd_request *req;
> +	struct ceph_osd_req_op *op;
> +
> +	spin_lock(&o->o_requests_lock);
> +	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));

For the 'o_requests_lock', shouldn't it be enough to only protect the
'lookup_request()' call, and just unlock it here?

-- Xiubo

> +	if (!req) {
> +		spin_unlock(&o->o_requests_lock);
> +		return -EBADR;
> +	}
> +
> +	if (o->o_sparse_op_idx < 0) {
> +		u64 srlen = sparse_data_requested(req);
> +
> +		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
> +			__func__, o->o_osd, srlen);
> +		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
> +	} else {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +
> +		WARN_ON_ONCE(op->extent.sparse_ext);
> +
> +		/* hand back buffer we took earlier */
> +		op->extent.sparse_ext = sr->sr_extent;
> +		sr->sr_extent = NULL;
> +		op->extent.sparse_ext_len = sr->sr_count;
> +		sr->sr_ext_len = 0;
> +		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
> +			__func__, o->o_osd, op->extent.sparse_ext_len,
> +			cursor->resid);
> +		/*
> +		 * FIXME: Need to advance to the next data item here, in the
> +		 * event that there are multiple sparse read requests. Is this
> +		 * the right way to do that?
> +		 */
> +		if (cursor->resid)
> +			advance_cursor(cursor, cursor->resid, false);
> +	}
> +
> +	ceph_init_sparse_read(sr);
> +
> +	/* find next op in this request (if any) */
> +	while (++o->o_sparse_op_idx < req->r_num_ops) {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +		if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +			goto found;
> +	}
> +
> +	/* reset for next sparse read request */
> +	spin_unlock(&o->o_requests_lock);
> +	o->o_sparse_op_idx = -1;
> +	return 0;
> +found:
> +	sr->sr_req_off = op->extent.offset;
> +	sr->sr_req_len = op->extent.length;
> +	sr->sr_pos = sr->sr_req_off;
> +	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
> +		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
> +
> +	/* hand off request's sparse extent map buffer */
> +	sr->sr_ext_len = op->extent.sparse_ext_len;
> +	op->extent.sparse_ext_len = 0;
> +	sr->sr_extent = op->extent.sparse_ext;
> +	op->extent.sparse_ext = NULL;
> +
> +	spin_unlock(&o->o_requests_lock);
> +	return 1;
> +}
> +
> +#ifdef __BIG_ENDIAN
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +	int i;
> +
> +	for (i = 0; i < sr->sr_count; i++) {
> +		struct ceph_sparse_extent *ext = &sr->sr_extent[i];
> +
> +		ext->off = le64_to_cpu((__force __le64)ext->off);
> +		ext->len = le64_to_cpu((__force __le64)ext->len);
> +	}
> +}
> +#else
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +}
> +#endif
> +
> +static int osd_sparse_read(struct ceph_connection *con,
> +			   struct ceph_msg_data_cursor *cursor,
> +			   u64 *plen, char **pbuf)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	u32 count = sr->sr_count;
> +	u64 eoff, elen;
> +	int ret;
> +
> +	switch (sr->sr_state) {
> +	case CEPH_SPARSE_READ_HDR:
> +next_op:
> +		ret = prep_next_sparse_read(con, cursor);
> +		if (ret <= 0)
> +			return ret;
> +
> +		/* number of extents */
> +		*plen = sizeof(sr->sr_count);
> +		*pbuf = (char *)&sr->sr_count;
> +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> +		break;
> +	case CEPH_SPARSE_READ_EXTENTS:
> +		/* Convert sr_count to host-endian */
> +		count = le32_to_cpu((__force __le32)sr->sr_count);
> +		sr->sr_count = count;
> +		dout("[%d] got %u extents\n", o->o_osd, count);
> +
> +		if (count > 0) {
> +			if (!sr->sr_extent || count > sr->sr_ext_len) {
> +				/* no extent array provided, or too short */
> +				kfree(sr->sr_extent);
> +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> +							   GFP_NOIO);
> +				if (!sr->sr_extent)
> +					return -ENOMEM;
> +				sr->sr_ext_len = count;
> +			}
> +			*plen = count * sizeof(*sr->sr_extent);
> +			*pbuf = (char *)sr->sr_extent;
> +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> +			break;
> +		}
> +		/* No extents? Fall through to reading data len */
> +		fallthrough;
> +	case CEPH_SPARSE_READ_DATA_LEN:
> +		convert_extent_map(sr);
> +		*plen = sizeof(sr->sr_datalen);
> +		*pbuf = (char *)&sr->sr_datalen;
> +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> +		break;
> +	case CEPH_SPARSE_READ_DATA:
> +		if (sr->sr_index >= count) {
> +			sr->sr_state = CEPH_SPARSE_READ_HDR;
> +			goto next_op;
> +		}
> +
> +		eoff = sr->sr_extent[sr->sr_index].off;
> +		elen = sr->sr_extent[sr->sr_index].len;
> +
> +		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
> +		     o->o_osd, sr->sr_index, eoff, elen);
> +
> +		/* zero out anything from sr_pos to start of extent */
> +		if (sr->sr_pos < eoff)
> +			advance_cursor(cursor, eoff - sr->sr_pos, true);
> +
> +		/* Set position to end of extent */
> +		sr->sr_pos = eoff + elen;
> +
> +		/* send back the new length */
> +		*plen = elen;
> +
> +		/* Bump the array index */
> +		++sr->sr_index;
> +		break;
> +	}
> +	return 1;
> +}
> +
>   static const struct ceph_connection_operations osd_con_ops = {
>   	.get = osd_get_con,
>   	.put = osd_put_con,
> +	.sparse_read = osd_sparse_read,
>   	.alloc_msg = osd_alloc_msg,
>   	.dispatch = osd_dispatch,
>   	.fault = osd_fault,


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-21  8:41   ` Xiubo Li
@ 2022-03-21 10:01     ` Jeff Layton
  0 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-21 10:01 UTC (permalink / raw)
  To: Xiubo Li, idryomov; +Cc: ceph-devel

On Mon, 2022-03-21 at 16:41 +0800, Xiubo Li wrote:
> On 3/18/22 9:50 PM, Jeff Layton wrote:
> > Add a new sparse_read operation for the OSD client, driven by its own
> > state machine. The messenger can repeatedly call the sparse_read
> > operation, and it will pass back the necessary info to set up to read
> > the next extent of data, while zero-filling the sparse regions.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >   include/linux/ceph/osd_client.h |  32 +++++
> >   net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
> >   2 files changed, 266 insertions(+), 4 deletions(-)
> > 
> > diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> > index 00a5b53a6763..2c5f9eb7d888 100644
> > --- a/include/linux/ceph/osd_client.h
> > +++ b/include/linux/ceph/osd_client.h
> > @@ -40,6 +40,36 @@ struct ceph_sparse_extent {
> >   	u64	len;
> >   } __attribute__((packed));
> >   
> > +/* Sparse read state machine state values */
> > +enum ceph_sparse_read_state {
> > +	CEPH_SPARSE_READ_HDR	= 0,
> > +	CEPH_SPARSE_READ_EXTENTS,
> > +	CEPH_SPARSE_READ_DATA_LEN,
> > +	CEPH_SPARSE_READ_DATA,
> > +};
> > +
> > +/*
> > + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> > + * 64-bit offset/length pairs, and then all of the actual file data
> > + * concatenated after it (sans holes).
> > + *
> > + * Unfortunately, we don't know how long the extent array is until we've
> > + * started reading the data section of the reply. The caller should send down
> > + * a destination buffer for the array, but we'll alloc one if it's too small
> > + * or if the caller doesn't.
> > + */
> > +struct ceph_sparse_read {
> > +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> > +	u64				sr_req_off;	/* orig request offset */
> > +	u64				sr_req_len;	/* orig request length */
> > +	u64				sr_pos;		/* current pos in buffer */
> > +	int				sr_index;	/* current extent index */
> > +	__le32				sr_datalen;	/* length of actual data */
> > +	u32				sr_count;	/* extent count in reply */
> > +	int				sr_ext_len;	/* length of extent array */
> > +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> > +};
> > +
> >   /*
> >    * A given osd we're communicating with.
> >    *
> > @@ -48,6 +78,7 @@ struct ceph_sparse_extent {
> >    */
> >   struct ceph_osd {
> >   	refcount_t o_ref;
> > +	int o_sparse_op_idx;
> >   	struct ceph_osd_client *o_osdc;
> >   	int o_osd;
> >   	int o_incarnation;
> > @@ -63,6 +94,7 @@ struct ceph_osd {
> >   	unsigned long lru_ttl;
> >   	struct list_head o_keepalive_item;
> >   	struct mutex lock;
> > +	struct ceph_sparse_read	o_sparse_read;
> >   };
> >   
> >   #define CEPH_OSD_SLAB_OPS	2
> > diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> > index 9fec258e1f8d..3694696c8a31 100644
> > --- a/net/ceph/osd_client.c
> > +++ b/net/ceph/osd_client.c
> > @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
> >   
> >   	switch (op->op) {
> >   	case CEPH_OSD_OP_READ:
> > +	case CEPH_OSD_OP_SPARSE_READ:
> >   	case CEPH_OSD_OP_WRITE:
> >   	case CEPH_OSD_OP_WRITEFULL:
> >   		kfree(op->extent.sparse_ext);
> > @@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
> >   		/* reply */
> >   		case CEPH_OSD_OP_STAT:
> >   		case CEPH_OSD_OP_READ:
> > +		case CEPH_OSD_OP_SPARSE_READ:
> >   		case CEPH_OSD_OP_LIST_WATCHERS:
> >   			*num_reply_data_items += 1;
> >   			break;
> > @@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
> >   
> >   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> >   	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> > -	       opcode != CEPH_OSD_OP_TRUNCATE);
> > +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
> >   
> >   	op->extent.offset = offset;
> >   	op->extent.length = length;
> > @@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
> >   	case CEPH_OSD_OP_STAT:
> >   		break;
> >   	case CEPH_OSD_OP_READ:
> > +	case CEPH_OSD_OP_SPARSE_READ:
> >   	case CEPH_OSD_OP_WRITE:
> >   	case CEPH_OSD_OP_WRITEFULL:
> >   	case CEPH_OSD_OP_ZERO:
> > @@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
> >   
> >   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> >   	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> > -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> > +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> > +	       opcode != CEPH_OSD_OP_SPARSE_READ);
> >   
> >   	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
> >   					GFP_NOFS);
> > @@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
> >   	mutex_init(&osd->lock);
> >   }
> >   
> > +static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
> > +{
> > +	kfree(sr->sr_extent);
> > +	memset(sr, '\0', sizeof(*sr));
> > +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> > +}
> > +
> >   static void osd_cleanup(struct ceph_osd *osd)
> >   {
> >   	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
> > @@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
> >   	WARN_ON(!list_empty(&osd->o_osd_lru));
> >   	WARN_ON(!list_empty(&osd->o_keepalive_item));
> >   
> > +	ceph_init_sparse_read(&osd->o_sparse_read);
> > +
> >   	if (osd->o_auth.authorizer) {
> >   		WARN_ON(osd_homeless(osd));
> >   		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
> > @@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
> >   	osd_init(osd);
> >   	osd->o_osdc = osdc;
> >   	osd->o_osd = onum;
> > +	osd->o_sparse_op_idx = -1;
> > +
> > +	ceph_init_sparse_read(&osd->o_sparse_read);
> >   
> >   	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
> >   
> > @@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
> >   					       &op->raw_data_in);
> >   			break;
> >   		case CEPH_OSD_OP_READ:
> > +		case CEPH_OSD_OP_SPARSE_READ:
> >   			ceph_osdc_msg_data_add(reply_msg,
> >   					       &op->extent.osd_data);
> >   			break;
> > @@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
> >   
> >   	req->r_end_latency = ktime_get();
> >   
> > -	if (req->r_osd)
> > +	if (req->r_osd) {
> > +		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
> >   		unlink_request(req->r_osd, req);
> > +	}
> >   	atomic_dec(&osdc->num_requests);
> >   
> >   	/*
> > @@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
> >   	ceph_msg_put(msg);
> >   }
> >   
> > +/* How much sparse data was requested? */
> > +static u64 sparse_data_requested(struct ceph_osd_request *req)
> > +{
> > +	u64 len = 0;
> > +
> > +	if (req->r_flags & CEPH_OSD_FLAG_READ) {
> > +		int i;
> > +
> > +		for (i = 0; i < req->r_num_ops; ++i) {
> > +			struct ceph_osd_req_op *op = &req->r_ops[i];
> > +
> > +			if (op->op == CEPH_OSD_OP_SPARSE_READ)
> > +				len += op->extent.length;
> > +		}
> > +	}
> > +	return len;
> > +}
> > +
> >   /*
> >    * Lookup and return message for incoming reply.  Don't try to do
> >    * anything about a larger than preallocated data portion of the
> > @@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   	int front_len = le32_to_cpu(hdr->front_len);
> >   	int data_len = le32_to_cpu(hdr->data_len);
> >   	u64 tid = le64_to_cpu(hdr->tid);
> > +	u64 srlen;
> >   
> >   	down_read(&osdc->lock);
> >   	if (!osd_registered(osd)) {
> > @@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   		req->r_reply = m;
> >   	}
> >   
> > -	if (data_len > req->r_reply->data_length) {
> > +	srlen = sparse_data_requested(req);
> > +	if (!srlen && (data_len > req->r_reply->data_length)) {
> >   		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
> >   			__func__, osd->o_osd, req->r_tid, data_len,
> >   			req->r_reply->data_length);
> > @@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   	}
> >   
> >   	m = ceph_msg_get(req->r_reply);
> > +	m->sparse_read = (bool)srlen;
> > +
> >   	dout("get_reply tid %lld %p\n", tid, m);
> >   
> >   out_unlock_session:
> > @@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
> >   	return ceph_auth_check_message_signature(auth, msg);
> >   }
> >   
> > +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
> > +{
> > +	while (len) {
> > +		struct page *page;
> > +		size_t poff, plen;
> > +		bool last = false;
> > +
> > +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> > +		if (plen > len)
> > +			plen = len;
> > +		if (zero)
> > +			zero_user_segment(page, poff, poff + plen);
> > +		len -= plen;
> > +		ceph_msg_data_advance(cursor, plen);
> > +	}
> > +}
> > +
> > +static int prep_next_sparse_read(struct ceph_connection *con,
> > +				 struct ceph_msg_data_cursor *cursor)
> > +{
> > +	struct ceph_osd *o = con->private;
> > +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> > +	struct ceph_osd_request *req;
> > +	struct ceph_osd_req_op *op;
> > +
> > +	spin_lock(&o->o_requests_lock);
> > +	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
> 
> For the 'o_requests_lock', shouldn't it be enough to only protect the
> 'lookup_request()' call, and just unlock it here?
> 
> -- Xiubo
> 

No. We're not taking a reference to the req at this point, so once we
drop the lock the request could end up being freed. Note that get_reply
does something similar: it keeps holding the mutex after finding the
request rather than taking a reference to it.
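
Making an early unlock safe would mean taking a reference under the
spinlock instead, along the lines of the below (untested sketch), and
then pairing it with a put on every return path in this function.
Holding the spinlock across this (short) function seemed simpler:

	spin_lock(&o->o_requests_lock);
	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
	if (req)
		ceph_osdc_get_request(req);
	spin_unlock(&o->o_requests_lock);
	if (!req)
		return -EBADR;

	/* ... pick the next sparse read op as before ... */

	ceph_osdc_put_request(req);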

> > +	if (!req) {
> > +		spin_unlock(&o->o_requests_lock);
> > +		return -EBADR;
> > +	}
> > +
> > +	if (o->o_sparse_op_idx < 0) {
> > +		u64 srlen = sparse_data_requested(req);
> > +
> > +		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
> > +			__func__, o->o_osd, srlen);
> > +		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
> > +	} else {
> > +		op = &req->r_ops[o->o_sparse_op_idx];
> > +
> > +		WARN_ON_ONCE(op->extent.sparse_ext);
> > +
> > +		/* hand back buffer we took earlier */
> > +		op->extent.sparse_ext = sr->sr_extent;
> > +		sr->sr_extent = NULL;
> > +		op->extent.sparse_ext_len = sr->sr_count;
> > +		sr->sr_ext_len = 0;
> > +		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
> > +			__func__, o->o_osd, op->extent.sparse_ext_len,
> > +			cursor->resid);
> > +		/*
> > +		 * FIXME: Need to advance to the next data item here, in the
> > +		 * event that there are multiple sparse read requests. Is this
> > +		 * the right way to do that?
> > +		 */
> > +		if (cursor->resid)
> > +			advance_cursor(cursor, cursor->resid, false);
> > +	}
> > +
> > +	ceph_init_sparse_read(sr);
> > +
> > +	/* find next op in this request (if any) */
> > +	while (++o->o_sparse_op_idx < req->r_num_ops) {
> > +		op = &req->r_ops[o->o_sparse_op_idx];
> > +		if (op->op == CEPH_OSD_OP_SPARSE_READ)
> > +			goto found;
> > +	}
> > +
> > +	/* reset for next sparse read request */
> > +	spin_unlock(&o->o_requests_lock);
> > +	o->o_sparse_op_idx = -1;
> > +	return 0;
> > +found:
> > +	sr->sr_req_off = op->extent.offset;
> > +	sr->sr_req_len = op->extent.length;
> > +	sr->sr_pos = sr->sr_req_off;
> > +	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
> > +		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
> > +
> > +	/* hand off request's sparse extent map buffer */
> > +	sr->sr_ext_len = op->extent.sparse_ext_len;
> > +	op->extent.sparse_ext_len = 0;
> > +	sr->sr_extent = op->extent.sparse_ext;
> > +	op->extent.sparse_ext = NULL;
> > +
> > +	spin_unlock(&o->o_requests_lock);
> > +	return 1;
> > +}
> > +
> > +#ifdef __BIG_ENDIAN
> > +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> > +{
> > +	int i;
> > +
> > +	for (i = 0; i < sr->sr_count; i++) {
> > +		struct ceph_sparse_extent *ext = sr->sr_extent[i];
> > +
> > +		ext->off = le64_to_cpu((__force __le32)ext->off);
> > +		ext->len = le64_to_cpu((__force __le32)ext->len);
> > +	}
> > +}
> > +#else
> > +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> > +{
> > +}
> > +#endif
> > +
> > +static int osd_sparse_read(struct ceph_connection *con,
> > +			   struct ceph_msg_data_cursor *cursor,
> > +			   u64 *plen, char **pbuf)
> > +{
> > +	struct ceph_osd *o = con->private;
> > +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> > +	u32 count = sr->sr_count;
> > +	u64 eoff, elen;
> > +	int ret;
> > +
> > +	switch (sr->sr_state) {
> > +	case CEPH_SPARSE_READ_HDR:
> > +next_op:
> > +		ret = prep_next_sparse_read(con, cursor);
> > +		if (ret <= 0)
> > +			return ret;
> > +
> > +		/* number of extents */
> > +		*plen = sizeof(sr->sr_count);
> > +		*pbuf = (char *)&sr->sr_count;
> > +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> > +		break;
> > +	case CEPH_SPARSE_READ_EXTENTS:
> > +		/* Convert sr_count to host-endian */
> > +		count = le32_to_cpu((__force __le32)sr->sr_count);
> > +		sr->sr_count = count;
> > +		dout("[%d] got %u extents\n", o->o_osd, count);
> > +
> > +		if (count > 0) {
> > +			if (!sr->sr_extent || count > sr->sr_ext_len) {
> > +				/* no extent array provided, or too short */
> > +				kfree(sr->sr_extent);
> > +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> > +							   GFP_NOIO);
> > +				if (!sr->sr_extent)
> > +					return -ENOMEM;
> > +				sr->sr_ext_len = count;
> > +			}
> > +			*plen = count * sizeof(*sr->sr_extent);
> > +			*pbuf = (char *)sr->sr_extent;
> > +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> > +			break;
> > +		}
> > +		/* No extents? Fall through to reading data len */
> > +		fallthrough;
> > +	case CEPH_SPARSE_READ_DATA_LEN:
> > +		convert_extent_map(sr);
> > +		*plen = sizeof(sr->sr_datalen);
> > +		*pbuf = (char *)&sr->sr_datalen;
> > +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> > +		break;
> > +	case CEPH_SPARSE_READ_DATA:
> > +		if (sr->sr_index >= count) {
> > +			sr->sr_state = CEPH_SPARSE_READ_HDR;
> > +			goto next_op;
> > +		}
> > +
> > +		eoff = sr->sr_extent[sr->sr_index].off;
> > +		elen = sr->sr_extent[sr->sr_index].len;
> > +
> > +		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
> > +		     o->o_osd, sr->sr_index, eoff, elen);
> > +
> > +		/* zero out anything from sr_pos to start of extent */
> > +		if (sr->sr_pos < eoff)
> > +			advance_cursor(cursor, eoff - sr->sr_pos, true);
> > +
> > +		/* Set position to end of extent */
> > +		sr->sr_pos = eoff + elen;
> > +
> > +		/* send back the new length */
> > +		*plen = elen;
> > +
> > +		/* Bump the array index */
> > +		++sr->sr_index;
> > +		break;
> > +	}
> > +	return 1;
> > +}
> > +
> >   static const struct ceph_connection_operations osd_con_ops = {
> >   	.get = osd_get_con,
> >   	.put = osd_put_con,
> > +	.sparse_read = osd_sparse_read,
> >   	.alloc_msg = osd_alloc_msg,
> >   	.dispatch = osd_dispatch,
> >   	.fault = osd_fault,
> 

-- 
Jeff Layton <jlayton@kernel.org>

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers
  2022-03-21  7:57   ` Xiubo Li
@ 2022-03-21 10:02     ` Jeff Layton
  0 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-21 10:02 UTC (permalink / raw)
  To: Xiubo Li, idryomov; +Cc: ceph-devel

On Mon, 2022-03-21 at 15:57 +0800, Xiubo Li wrote:
> On 3/18/22 9:50 PM, Jeff Layton wrote:
> > When the OSD sends back a sparse read reply, it contains an array of
> > these structures. Define the structure and add a couple of helpers for
> > dealing with them.
> > 
> > Also add a place in struct ceph_osd_req_op to store the extent buffer,
> > and code to free it if it's populated when the req is torn down.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >   include/linux/ceph/osd_client.h | 31 ++++++++++++++++++++++++++++++-
> >   net/ceph/osd_client.c           | 13 +++++++++++++
> >   2 files changed, 43 insertions(+), 1 deletion(-)
> > 
> > diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> > index 3122c1a3205f..00a5b53a6763 100644
> > --- a/include/linux/ceph/osd_client.h
> > +++ b/include/linux/ceph/osd_client.h
> > @@ -29,6 +29,17 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
> >   
> >   #define CEPH_HOMELESS_OSD	-1
> >   
> > +/*
> > + * A single extent in a SPARSE_READ reply.
> > + *
> > + * Note that these come from the OSD as little-endian values. On BE arches,
> > + * we convert them in-place after receipt.
> > + */
> > +struct ceph_sparse_extent {
> > +	u64	off;
> > +	u64	len;
> > +} __attribute__((packed));
> > +
> >   /*
> >    * A given osd we're communicating with.
> >    *
> > @@ -104,6 +115,8 @@ struct ceph_osd_req_op {
> >   			u64 offset, length;
> >   			u64 truncate_size;
> >   			u32 truncate_seq;
> > +			int sparse_ext_len;
> 
> To be more readable, how about
> 
> s/sparse_ext_len/sparse_ext_cnt/ ?
> 
> -- Xiubo
> 

Sure, I'll make that change.
-- 
Jeff Layton <jlayton@kernel.org>

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 5/5] ceph: convert to sparse reads
  2022-03-18 13:50 ` [PATCH v3 5/5] ceph: convert to sparse reads Jeff Layton
@ 2022-03-21 12:30   ` Jeff Layton
  0 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-21 12:30 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

On Fri, 2022-03-18 at 09:50 -0400, Jeff Layton wrote:
> Have ceph issue sparse reads instead of normal ones. The callers now
> preallocate a sparse extent buffer that the libceph receive code can
> populate and hand back after the operation completes.
> 
> After a successful read, we can't use the req->r_result value to
> determine the amount of data "read", so instead we set the received
> length to be from the end of the last extent in the buffer. Any
> interstitial holes will have been filled by the receive code.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/ceph/addr.c  | 13 +++++++++++--
>  fs/ceph/file.c  | 41 ++++++++++++++++++++++++++++++++++-------
>  fs/ceph/super.h |  7 +++++++
>  3 files changed, 52 insertions(+), 9 deletions(-)
> 
> diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> index 752c421c9922..6d4f9fbf22ce 100644
> --- a/fs/ceph/addr.c
> +++ b/fs/ceph/addr.c
> @@ -220,6 +220,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
>  	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
>  	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
>  	struct netfs_read_subrequest *subreq = req->r_priv;
> +	struct ceph_osd_req_op *op = &req->r_ops[0];
>  	int num_pages;
>  	int err = req->r_result;
>  
> @@ -230,7 +231,9 @@ static void finish_netfs_read(struct ceph_osd_request *req)
>  	     subreq->len, i_size_read(req->r_inode));
>  
>  	/* no object means success but no data */
> -	if (err == -ENOENT)
> +	if (err >= 0)
> +		err = ceph_sparse_ext_map_end(op);
> +	else if (err == -ENOENT)
>  		err = 0;
>  	else if (err == -EBLOCKLISTED)
>  		fsc->blocklisted = true;
> @@ -317,7 +320,7 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
>  		return;
>  
>  	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
> -			0, 1, CEPH_OSD_OP_READ,
> +			0, 1, CEPH_OSD_OP_SPARSE_READ,
>  			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
>  			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
>  	if (IS_ERR(req)) {
> @@ -326,6 +329,12 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
>  		goto out;
>  	}
>  
> +	err = ceph_alloc_sparse_ext_map(&req->r_ops[0], CEPH_SPARSE_EXT_ARRAY_INITIAL);
> +	if (err) {
> +		ceph_osdc_put_request(req);
> +		goto out;
> +	}
> +
>  	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
>  	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
>  	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index feb75eb1cd82..deba39989a07 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -931,10 +931,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  		bool more;
>  		int idx;
>  		size_t left;
> +		struct ceph_osd_req_op *op;
>  
>  		req = ceph_osdc_new_request(osdc, &ci->i_layout,
>  					ci->i_vino, off, &len, 0, 1,
> -					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
> +					CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
>  					NULL, ci->i_truncate_seq,
>  					ci->i_truncate_size, false);
>  		if (IS_ERR(req)) {
> @@ -955,6 +956,14 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  
>  		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
>  						 false, false);
> +
> +		op = &req->r_ops[0];
> +		ret = ceph_alloc_sparse_ext_map(op, CEPH_SPARSE_EXT_ARRAY_INITIAL);
> +		if (ret) {
> +			ceph_osdc_put_request(req);
> +			break;
> +		}
> +
>  		ret = ceph_osdc_start_request(osdc, req, false);
>  		if (!ret)
>  			ret = ceph_osdc_wait_request(osdc, req);
> @@ -964,23 +973,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  					 req->r_end_latency,
>  					 len, ret);
>  
> -		ceph_osdc_put_request(req);
> -
>  		i_size = i_size_read(inode);
>  		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
>  		     off, len, ret, i_size, (more ? " MORE" : ""));
>  
> -		if (ret == -ENOENT)
> +		/* Fix it to go to end of extent map */
> +		if (ret >= 0)
> +			ret = ceph_sparse_ext_map_end(op);
> +		else if (ret == -ENOENT)
>  			ret = 0;
> +
>  		if (ret >= 0 && ret < len && (off + ret < i_size)) {
>  			int zlen = min(len - ret, i_size - off - ret);
>  			int zoff = page_off + ret;
> +
>  			dout("sync_read zero gap %llu~%llu\n",
> -                             off + ret, off + ret + zlen);
> +				off + ret, off + ret + zlen);
>  			ceph_zero_page_vector_range(zoff, zlen, pages);
>  			ret += zlen;
>  		}
>  
> +		ceph_osdc_put_request(req);
> +
>  		idx = 0;
>  		left = ret > 0 ? ret : 0;
>  		while (left > 0) {
> @@ -1095,6 +1109,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  	struct inode *inode = req->r_inode;
>  	struct ceph_aio_request *aio_req = req->r_priv;
>  	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
> +	struct ceph_osd_req_op *op = &req->r_ops[0];
>  	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
>  	unsigned int len = osd_data->bvec_pos.iter.bi_size;
>  
> @@ -1117,6 +1132,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  		}
>  		rc = -ENOMEM;
>  	} else if (!aio_req->write) {
> +		if (rc >= 0)
> +			rc = ceph_sparse_ext_map_end(op);
>  		if (rc == -ENOENT)
>  			rc = 0;
>  		if (rc >= 0 && len > rc) {
> @@ -1280,6 +1297,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  	while (iov_iter_count(iter) > 0) {
>  		u64 size = iov_iter_count(iter);
>  		ssize_t len;
> +		struct ceph_osd_req_op *op;
>  
>  		if (write)
>  			size = min_t(u64, size, fsc->mount_options->wsize);
> @@ -1291,7 +1309,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  					    vino, pos, &size, 0,
>  					    1,
>  					    write ? CEPH_OSD_OP_WRITE :
> -						    CEPH_OSD_OP_READ,
> +						    CEPH_OSD_OP_SPARSE_READ,
>  					    flags, snapc,
>  					    ci->i_truncate_seq,
>  					    ci->i_truncate_size,
> @@ -1342,6 +1360,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  		}
>  
>  		osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
> +		op = &req->r_ops[0];
> +		ret = ceph_alloc_sparse_ext_map(op, CEPH_SPARSE_EXT_ARRAY_INITIAL);
> +		if (ret) {
> +			ceph_osdc_put_request(req);
> +			break;
> +		}
>  
>  		if (aio_req) {
>  			aio_req->total_len += len;
> @@ -1370,8 +1394,11 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  
>  		size = i_size_read(inode);
>  		if (!write) {
> -			if (ret == -ENOENT)
> +			if (ret >= 0)
> +				ret = ceph_sparse_ext_map_end(op);
> +			else if (ret == -ENOENT)
>  				ret = 0;
> +
>  			if (ret >= 0 && ret < len && pos + ret < size) {
>  				struct iov_iter i;
>  				int zlen = min_t(size_t, len - ret,
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index 250aefecd628..ad09c26afac6 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -75,6 +75,13 @@
>  #define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT      5  /* cap release delay */
>  #define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT     60  /* cap release delay */
>  
> +/*
> + * How big an extent array should we preallocate for a sparse read? This is
> + * just a starting value.  If we get more than this back from the OSD, the
> + * receiver will reallocate.
> + */
> +#define CEPH_SPARSE_EXT_ARRAY_INITIAL	16
> +
>  struct ceph_mount_options {
>  	unsigned int flags;
>  

For the record, I don't see us merging this patch as-is. This is just
what I was using for testing, but in practice, we may want to just use
sparse reads when necessary (i.e. only with fscrypt enabled).
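
Roughly something like this in the read paths (sketch only; assumes the
usual "inode" is in scope and that fscrypt is the only user, hence the
generic IS_ENCRYPTED() test):

	int opcode = IS_ENCRYPTED(inode) ? CEPH_OSD_OP_SPARSE_READ :
					   CEPH_OSD_OP_READ;

	req = ceph_osdc_new_request(osdc, &ci->i_layout,
				ci->i_vino, off, &len, 0, 1,
				opcode, CEPH_OSD_FLAG_READ,
				NULL, ci->i_truncate_seq,
				ci->i_truncate_size, false);

...with the ceph_alloc_sparse_ext_map() call made conditional on the
same test.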

-- 
Jeff Layton <jlayton@kernel.org>

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
  2022-03-21  8:41   ` Xiubo Li
@ 2022-03-21 14:17   ` Jeff Layton
  2022-03-22  1:58   ` Xiubo Li
  2 siblings, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-21 14:17 UTC (permalink / raw)
  To: idryomov, xiubli; +Cc: ceph-devel

On Fri, 2022-03-18 at 09:50 -0400, Jeff Layton wrote:
> Add a new sparse_read operation for the OSD client, driven by its own
> state machine. The messenger can repeatedly call the sparse_read
> operation, and it will pass back the necessary info to set up to read
> the next extent of data, while zero-filling the sparse regions.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  include/linux/ceph/osd_client.h |  32 +++++
>  net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
>  2 files changed, 266 insertions(+), 4 deletions(-)
> 
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 00a5b53a6763..2c5f9eb7d888 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -40,6 +40,36 @@ struct ceph_sparse_extent {
>  	u64	len;
>  } __attribute__((packed));
>  
> +/* Sparse read state machine state values */
> +enum ceph_sparse_read_state {
> +	CEPH_SPARSE_READ_HDR	= 0,
> +	CEPH_SPARSE_READ_EXTENTS,
> +	CEPH_SPARSE_READ_DATA_LEN,
> +	CEPH_SPARSE_READ_DATA,
> +};
> +
> +/*
> + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> + * 64-bit offset/length pairs, and then all of the actual file data
> + * concatenated after it (sans holes).
> + *
> + * Unfortunately, we don't know how long the extent array is until we've
> + * started reading the data section of the reply. The caller should send down
> + * a destination buffer for the array, but we'll alloc one if it's too small
> + * or if the caller doesn't.
> + */
> +struct ceph_sparse_read {
> +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> +	u64				sr_req_off;	/* orig request offset */
> +	u64				sr_req_len;	/* orig request length */
> +	u64				sr_pos;		/* current pos in buffer */
> +	int				sr_index;	/* current extent index */
> +	__le32				sr_datalen;	/* length of actual data */
> +	u32				sr_count;	/* extent count in reply */
> +	int				sr_ext_len;	/* length of extent array */
> +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> +};
> +
>  /*
>   * A given osd we're communicating with.
>   *
> @@ -48,6 +78,7 @@ struct ceph_sparse_extent {
>   */
>  struct ceph_osd {
>  	refcount_t o_ref;
> +	int o_sparse_op_idx;
>  	struct ceph_osd_client *o_osdc;
>  	int o_osd;
>  	int o_incarnation;
> @@ -63,6 +94,7 @@ struct ceph_osd {
>  	unsigned long lru_ttl;
>  	struct list_head o_keepalive_item;
>  	struct mutex lock;
> +	struct ceph_sparse_read	o_sparse_read;
>  };
>  
>  #define CEPH_OSD_SLAB_OPS	2
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 9fec258e1f8d..3694696c8a31 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>  
>  	switch (op->op) {
>  	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>  	case CEPH_OSD_OP_WRITE:
>  	case CEPH_OSD_OP_WRITEFULL:
>  		kfree(op->extent.sparse_ext);
> @@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
>  		/* reply */
>  		case CEPH_OSD_OP_STAT:
>  		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>  		case CEPH_OSD_OP_LIST_WATCHERS:
>  			*num_reply_data_items += 1;
>  			break;
> @@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
>  
>  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>  	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> -	       opcode != CEPH_OSD_OP_TRUNCATE);
> +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
>  
>  	op->extent.offset = offset;
>  	op->extent.length = length;
> @@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>  	case CEPH_OSD_OP_STAT:
>  		break;
>  	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>  	case CEPH_OSD_OP_WRITE:
>  	case CEPH_OSD_OP_WRITEFULL:
>  	case CEPH_OSD_OP_ZERO:
> @@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
>  
>  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>  	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> +	       opcode != CEPH_OSD_OP_SPARSE_READ);
>  
>  	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
>  					GFP_NOFS);
> @@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
>  	mutex_init(&osd->lock);
>  }
>  
> +static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
> +{
> +	kfree(sr->sr_extent);
> +	memset(sr, '\0', sizeof(*sr));
> +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> +}
> +
>  static void osd_cleanup(struct ceph_osd *osd)
>  {
>  	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
> @@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
>  	WARN_ON(!list_empty(&osd->o_osd_lru));
>  	WARN_ON(!list_empty(&osd->o_keepalive_item));
>  
> +	ceph_init_sparse_read(&osd->o_sparse_read);
> +
>  	if (osd->o_auth.authorizer) {
>  		WARN_ON(osd_homeless(osd));
>  		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
> @@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
>  	osd_init(osd);
>  	osd->o_osdc = osdc;
>  	osd->o_osd = onum;
> +	osd->o_sparse_op_idx = -1;
> +
> +	ceph_init_sparse_read(&osd->o_sparse_read);
>  
>  	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
>  
> @@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
>  					       &op->raw_data_in);
>  			break;
>  		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>  			ceph_osdc_msg_data_add(reply_msg,
>  					       &op->extent.osd_data);
>  			break;
> @@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
>  
>  	req->r_end_latency = ktime_get();
>  
> -	if (req->r_osd)
> +	if (req->r_osd) {
> +		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
>  		unlink_request(req->r_osd, req);
> +	}
>  	atomic_dec(&osdc->num_requests);
>  
>  	/*
> @@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
>  	ceph_msg_put(msg);
>  }
>  
> +/* How much sparse data was requested? */
> +static u64 sparse_data_requested(struct ceph_osd_request *req)
> +{
> +	u64 len = 0;
> +
> +	if (req->r_flags & CEPH_OSD_FLAG_READ) {
> +		int i;
> +
> +		for (i = 0; i < req->r_num_ops; ++i) {
> +			struct ceph_osd_req_op *op = &req->r_ops[i];
> +
> +			if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +				len += op->extent.length;
> +		}
> +	}
> +	return len;
> +}
> +
>  /*
>   * Lookup and return message for incoming reply.  Don't try to do
>   * anything about a larger than preallocated data portion of the
> @@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  	int front_len = le32_to_cpu(hdr->front_len);
>  	int data_len = le32_to_cpu(hdr->data_len);
>  	u64 tid = le64_to_cpu(hdr->tid);
> +	u64 srlen;
>  
>  	down_read(&osdc->lock);
>  	if (!osd_registered(osd)) {
> @@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  		req->r_reply = m;
>  	}
>  
> -	if (data_len > req->r_reply->data_length) {
> +	srlen = sparse_data_requested(req);
> +	if (!srlen && (data_len > req->r_reply->data_length)) {
>  		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
>  			__func__, osd->o_osd, req->r_tid, data_len,
>  			req->r_reply->data_length);
> @@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>  	}
>  
>  	m = ceph_msg_get(req->r_reply);
> +	m->sparse_read = (bool)srlen;
> +
>  	dout("get_reply tid %lld %p\n", tid, m);
>  
>  out_unlock_session:
> @@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
>  	return ceph_auth_check_message_signature(auth, msg);
>  }
>  
> +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
> +{
> +	while (len) {
> +		struct page *page;
> +		size_t poff, plen;
> +		bool last = false;
> +
> +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> +		if (plen > len)
> +			plen = len;
> +		if (zero)
> +			zero_user_segment(page, poff, poff + plen);
> +		len -= plen;
> +		ceph_msg_data_advance(cursor, plen);
> +	}
> +}
> +
> +static int prep_next_sparse_read(struct ceph_connection *con,
> +				 struct ceph_msg_data_cursor *cursor)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	struct ceph_osd_request *req;
> +	struct ceph_osd_req_op *op;
> +
> +	spin_lock(&o->o_requests_lock);
> +	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
> +	if (!req) {
> +		spin_unlock(&o->o_requests_lock);
> +		return -EBADR;
> +	}
> +
> +	if (o->o_sparse_op_idx < 0) {
> +		u64 srlen = sparse_data_requested(req);
> +
> +		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
> +			__func__, o->o_osd, srlen);
> +		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
> +	} else {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +
> +		WARN_ON_ONCE(op->extent.sparse_ext);
> +
> +		/* hand back buffer we took earlier */
> +		op->extent.sparse_ext = sr->sr_extent;
> +		sr->sr_extent = NULL;
> +		op->extent.sparse_ext_len = sr->sr_count;
> +		sr->sr_ext_len = 0;
> +		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
> +			__func__, o->o_osd, op->extent.sparse_ext_len,
> +			cursor->resid);
> +		/*
> +		 * FIXME: Need to advance to the next data item here, in the
> +		 * event that there are multiple sparse read requests. Is this
> +		 * the right way to do that?
> +		 */
> +		if (cursor->resid)
> +			advance_cursor(cursor, cursor->resid, false);

As I suspected, the above is wrong. I was hoping that cursor->resid
would hold the residual length of the current data item, and it does
when the read comes back shorter than requested. When it doesn't,
though, this ends up advancing the cursor too far.

It's not a problem with this patchset alone, since nothing in here issues
requests with multiple sparse reads, but the current fscrypt set does,
and there this falls over.

I have a fix in my tree I'm testing now, and it seems to be OK. I'll
post a revised patchset in the near future (along with the var name
change that Xiubo requested).
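
One possible approach (rough sketch only, not necessarily what the real
fix will look like) is to skip over just what remains of the current
op's data item rather than draining cursor->resid, which can span the
data items of later ops:

		/*
		 * The reply data item for this op was sized to the original
		 * request length, and (sr_pos - sr_req_off) bytes of it have
		 * been consumed so far, so only advance past the rest of it.
		 * (Whether that tail should also be zero-filled is a
		 * separate question.)
		 */
		u64 tail = sr->sr_req_off + sr->sr_req_len - sr->sr_pos;

		if (tail)
			advance_cursor(cursor, tail, false);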

> +	}
> +
> +	ceph_init_sparse_read(sr);
> +
> +	/* find next op in this request (if any) */
> +	while (++o->o_sparse_op_idx < req->r_num_ops) {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +		if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +			goto found;
> +	}
> +
> +	/* reset for next sparse read request */
> +	spin_unlock(&o->o_requests_lock);
> +	o->o_sparse_op_idx = -1;
> +	return 0;
> +found:
> +	sr->sr_req_off = op->extent.offset;
> +	sr->sr_req_len = op->extent.length;
> +	sr->sr_pos = sr->sr_req_off;
> +	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
> +		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
> +
> +	/* hand off request's sparse extent map buffer */
> +	sr->sr_ext_len = op->extent.sparse_ext_len;
> +	op->extent.sparse_ext_len = 0;
> +	sr->sr_extent = op->extent.sparse_ext;
> +	op->extent.sparse_ext = NULL;
> +
> +	spin_unlock(&o->o_requests_lock);
> +	return 1;
> +}
> +
> +#ifdef __BIG_ENDIAN
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +	int i;
> +
> +	for (i = 0; i < sr->sr_count; i++) {
> +		struct ceph_sparse_extent *ext = sr->sr_extent[i];
> +
> +		ext->off = le64_to_cpu((__force __le32)ext->off);
> +		ext->len = le64_to_cpu((__force __le32)ext->len);
> +	}
> +}
> +#else
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +}
> +#endif
> +
> +static int osd_sparse_read(struct ceph_connection *con,
> +			   struct ceph_msg_data_cursor *cursor,
> +			   u64 *plen, char **pbuf)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	u32 count = sr->sr_count;
> +	u64 eoff, elen;
> +	int ret;
> +
> +	switch (sr->sr_state) {
> +	case CEPH_SPARSE_READ_HDR:
> +next_op:
> +		ret = prep_next_sparse_read(con, cursor);
> +		if (ret <= 0)
> +			return ret;
> +
> +		/* number of extents */
> +		*plen = sizeof(sr->sr_count);
> +		*pbuf = (char *)&sr->sr_count;
> +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> +		break;
> +	case CEPH_SPARSE_READ_EXTENTS:
> +		/* Convert sr_count to host-endian */
> +		count = le32_to_cpu((__force __le32)sr->sr_count);
> +		sr->sr_count = count;
> +		dout("[%d] got %u extents\n", o->o_osd, count);
> +
> +		if (count > 0) {
> +			if (!sr->sr_extent || count > sr->sr_ext_len) {
> +				/* no extent array provided, or too short */
> +				kfree(sr->sr_extent);
> +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> +							   GFP_NOIO);
> +				if (!sr->sr_extent)
> +					return -ENOMEM;
> +				sr->sr_ext_len = count;
> +			}
> +			*plen = count * sizeof(*sr->sr_extent);
> +			*pbuf = (char *)sr->sr_extent;
> +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> +			break;
> +		}
> +		/* No extents? Fall through to reading data len */
> +		fallthrough;
> +	case CEPH_SPARSE_READ_DATA_LEN:
> +		convert_extent_map(sr);
> +		*plen = sizeof(sr->sr_datalen);
> +		*pbuf = (char *)&sr->sr_datalen;
> +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> +		break;
> +	case CEPH_SPARSE_READ_DATA:
> +		if (sr->sr_index >= count) {
> +			sr->sr_state = CEPH_SPARSE_READ_HDR;
> +			goto next_op;
> +		}
> +
> +		eoff = sr->sr_extent[sr->sr_index].off;
> +		elen = sr->sr_extent[sr->sr_index].len;
> +
> +		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
> +		     o->o_osd, sr->sr_index, eoff, elen);
> +
> +		/* zero out anything from sr_pos to start of extent */
> +		if (sr->sr_pos < eoff)
> +			advance_cursor(cursor, eoff - sr->sr_pos, true);
> +
> +		/* Set position to end of extent */
> +		sr->sr_pos = eoff + elen;
> +
> +		/* send back the new length */
> +		*plen = elen;
> +
> +		/* Bump the array index */
> +		++sr->sr_index;
> +		break;
> +	}
> +	return 1;
> +}
> +
>  static const struct ceph_connection_operations osd_con_ops = {
>  	.get = osd_get_con,
>  	.put = osd_put_con,
> +	.sparse_read = osd_sparse_read,
>  	.alloc_msg = osd_alloc_msg,
>  	.dispatch = osd_dispatch,
>  	.fault = osd_fault,

-- 
Jeff Layton <jlayton@kernel.org>

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
  2022-03-21  8:41   ` Xiubo Li
  2022-03-21 14:17   ` Jeff Layton
@ 2022-03-22  1:58   ` Xiubo Li
  2022-03-22  2:09     ` Xiubo Li
  2022-03-22 10:03     ` Jeff Layton
  2 siblings, 2 replies; 15+ messages in thread
From: Xiubo Li @ 2022-03-22  1:58 UTC (permalink / raw)
  To: Jeff Layton, idryomov; +Cc: ceph-devel


On 3/18/22 9:50 PM, Jeff Layton wrote:
> Add a new sparse_read operation for the OSD client, driven by its own
> state machine. The messenger can repeatedly call the sparse_read
> operation, and it will pass back the necessary info to set up to read
> the next extent of data, while zero-filling the sparse regions.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>   include/linux/ceph/osd_client.h |  32 +++++
>   net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
>   2 files changed, 266 insertions(+), 4 deletions(-)
>
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 00a5b53a6763..2c5f9eb7d888 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -40,6 +40,36 @@ struct ceph_sparse_extent {
>   	u64	len;
>   } __attribute__((packed));
>   
> +/* Sparse read state machine state values */
> +enum ceph_sparse_read_state {
> +	CEPH_SPARSE_READ_HDR	= 0,
> +	CEPH_SPARSE_READ_EXTENTS,
> +	CEPH_SPARSE_READ_DATA_LEN,
> +	CEPH_SPARSE_READ_DATA,
> +};
> +
> +/*
> + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> + * 64-bit offset/length pairs, and then all of the actual file data
> + * concatenated after it (sans holes).
> + *
> + * Unfortunately, we don't know how long the extent array is until we've
> + * started reading the data section of the reply. The caller should send down
> + * a destination buffer for the array, but we'll alloc one if it's too small
> + * or if the caller doesn't.
> + */
> +struct ceph_sparse_read {
> +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> +	u64				sr_req_off;	/* orig request offset */
> +	u64				sr_req_len;	/* orig request length */
> +	u64				sr_pos;		/* current pos in buffer */
> +	int				sr_index;	/* current extent index */
> +	__le32				sr_datalen;	/* length of actual data */
> +	u32				sr_count;	/* extent count in reply */
> +	int				sr_ext_len;	/* length of extent array */
> +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> +};
> +
>   /*
>    * A given osd we're communicating with.
>    *
> @@ -48,6 +78,7 @@ struct ceph_sparse_extent {
>    */
>   struct ceph_osd {
>   	refcount_t o_ref;
> +	int o_sparse_op_idx;
>   	struct ceph_osd_client *o_osdc;
>   	int o_osd;
>   	int o_incarnation;
> @@ -63,6 +94,7 @@ struct ceph_osd {
>   	unsigned long lru_ttl;
>   	struct list_head o_keepalive_item;
>   	struct mutex lock;
> +	struct ceph_sparse_read	o_sparse_read;
>   };
>   
>   #define CEPH_OSD_SLAB_OPS	2
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 9fec258e1f8d..3694696c8a31 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>   
>   	switch (op->op) {
>   	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>   	case CEPH_OSD_OP_WRITE:
>   	case CEPH_OSD_OP_WRITEFULL:
>   		kfree(op->extent.sparse_ext);
> @@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
>   		/* reply */
>   		case CEPH_OSD_OP_STAT:
>   		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>   		case CEPH_OSD_OP_LIST_WATCHERS:
>   			*num_reply_data_items += 1;
>   			break;
> @@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
>   
>   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>   	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> -	       opcode != CEPH_OSD_OP_TRUNCATE);
> +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
>   
>   	op->extent.offset = offset;
>   	op->extent.length = length;
> @@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>   	case CEPH_OSD_OP_STAT:
>   		break;
>   	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_SPARSE_READ:
>   	case CEPH_OSD_OP_WRITE:
>   	case CEPH_OSD_OP_WRITEFULL:
>   	case CEPH_OSD_OP_ZERO:
> @@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
>   
>   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
>   	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> +	       opcode != CEPH_OSD_OP_SPARSE_READ);
>   
>   	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
>   					GFP_NOFS);
> @@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
>   	mutex_init(&osd->lock);
>   }
>   
> +static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
> +{
> +	kfree(sr->sr_extent);
> +	memset(sr, '\0', sizeof(*sr));
> +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> +}
> +
>   static void osd_cleanup(struct ceph_osd *osd)
>   {
>   	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
> @@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
>   	WARN_ON(!list_empty(&osd->o_osd_lru));
>   	WARN_ON(!list_empty(&osd->o_keepalive_item));
>   
> +	ceph_init_sparse_read(&osd->o_sparse_read);
> +
>   	if (osd->o_auth.authorizer) {
>   		WARN_ON(osd_homeless(osd));
>   		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
> @@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
>   	osd_init(osd);
>   	osd->o_osdc = osdc;
>   	osd->o_osd = onum;
> +	osd->o_sparse_op_idx = -1;
> +
> +	ceph_init_sparse_read(&osd->o_sparse_read);
>   
>   	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
>   
> @@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
>   					       &op->raw_data_in);
>   			break;
>   		case CEPH_OSD_OP_READ:
> +		case CEPH_OSD_OP_SPARSE_READ:
>   			ceph_osdc_msg_data_add(reply_msg,
>   					       &op->extent.osd_data);
>   			break;
> @@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
>   
>   	req->r_end_latency = ktime_get();
>   
> -	if (req->r_osd)
> +	if (req->r_osd) {
> +		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
>   		unlink_request(req->r_osd, req);
> +	}
>   	atomic_dec(&osdc->num_requests);
>   
>   	/*
> @@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
>   	ceph_msg_put(msg);
>   }
>   
> +/* How much sparse data was requested? */
> +static u64 sparse_data_requested(struct ceph_osd_request *req)
> +{
> +	u64 len = 0;
> +
> +	if (req->r_flags & CEPH_OSD_FLAG_READ) {
> +		int i;
> +
> +		for (i = 0; i < req->r_num_ops; ++i) {
> +			struct ceph_osd_req_op *op = &req->r_ops[i];
> +
> +			if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +				len += op->extent.length;
> +		}
> +	}
> +	return len;
> +}
> +
>   /*
>    * Lookup and return message for incoming reply.  Don't try to do
>    * anything about a larger than preallocated data portion of the
> @@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   	int front_len = le32_to_cpu(hdr->front_len);
>   	int data_len = le32_to_cpu(hdr->data_len);
>   	u64 tid = le64_to_cpu(hdr->tid);
> +	u64 srlen;
>   
>   	down_read(&osdc->lock);
>   	if (!osd_registered(osd)) {
> @@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   		req->r_reply = m;
>   	}
>   
> -	if (data_len > req->r_reply->data_length) {
> +	srlen = sparse_data_requested(req);
> +	if (!srlen && (data_len > req->r_reply->data_length)) {
>   		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
>   			__func__, osd->o_osd, req->r_tid, data_len,
>   			req->r_reply->data_length);
> @@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   	}
>   
>   	m = ceph_msg_get(req->r_reply);
> +	m->sparse_read = (bool)srlen;
> +
>   	dout("get_reply tid %lld %p\n", tid, m);
>   
>   out_unlock_session:
> @@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
>   	return ceph_auth_check_message_signature(auth, msg);
>   }
>   
> +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
> +{
> +	while (len) {
> +		struct page *page;
> +		size_t poff, plen;
> +		bool last = false;
> +
> +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> +		if (plen > len)
> +			plen = len;
> +		if (zero)
> +			zero_user_segment(page, poff, poff + plen);
> +		len -= plen;
> +		ceph_msg_data_advance(cursor, plen);
> +	}
> +}
> +
> +static int prep_next_sparse_read(struct ceph_connection *con,
> +				 struct ceph_msg_data_cursor *cursor)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	struct ceph_osd_request *req;
> +	struct ceph_osd_req_op *op;
> +
> +	spin_lock(&o->o_requests_lock);
> +	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
> +	if (!req) {
> +		spin_unlock(&o->o_requests_lock);
> +		return -EBADR;
> +	}
> +
> +	if (o->o_sparse_op_idx < 0) {
> +		u64 srlen = sparse_data_requested(req);
> +
> +		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
> +			__func__, o->o_osd, srlen);
> +		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
> +	} else {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +
> +		WARN_ON_ONCE(op->extent.sparse_ext);
> +
> +		/* hand back buffer we took earlier */
> +		op->extent.sparse_ext = sr->sr_extent;
> +		sr->sr_extent = NULL;
> +		op->extent.sparse_ext_len = sr->sr_count;
> +		sr->sr_ext_len = 0;
> +		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
> +			__func__, o->o_osd, op->extent.sparse_ext_len,
> +			cursor->resid);
> +		/*
> +		 * FIXME: Need to advance to the next data item here, in the
> +		 * event that there are multiple sparse read requests. Is this
> +		 * the right way to do that?
> +		 */
> +		if (cursor->resid)
> +			advance_cursor(cursor, cursor->resid, false);
> +	}
> +
> +	ceph_init_sparse_read(sr);
> +
> +	/* find next op in this request (if any) */
> +	while (++o->o_sparse_op_idx < req->r_num_ops) {
> +		op = &req->r_ops[o->o_sparse_op_idx];
> +		if (op->op == CEPH_OSD_OP_SPARSE_READ)
> +			goto found;
> +	}
> +
> +	/* reset for next sparse read request */
> +	spin_unlock(&o->o_requests_lock);
> +	o->o_sparse_op_idx = -1;
> +	return 0;
> +found:
> +	sr->sr_req_off = op->extent.offset;
> +	sr->sr_req_len = op->extent.length;
> +	sr->sr_pos = sr->sr_req_off;
> +	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
> +		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
> +
> +	/* hand off request's sparse extent map buffer */
> +	sr->sr_ext_len = op->extent.sparse_ext_len;
> +	op->extent.sparse_ext_len = 0;
> +	sr->sr_extent = op->extent.sparse_ext;
> +	op->extent.sparse_ext = NULL;
> +
> +	spin_unlock(&o->o_requests_lock);
> +	return 1;
> +}
> +
> +#ifdef __BIG_ENDIAN
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +	int i;
> +
> +	for (i = 0; i < sr->sr_count; i++) {
> +		struct ceph_sparse_extent *ext = sr->sr_extent[i];
> +
> +		ext->off = le64_to_cpu((__force __le32)ext->off);
> +		ext->len = le64_to_cpu((__force __le32)ext->len);

Why '__le32'? Shouldn't it be '__le64'?
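
I.e. presumably it should be something like this (note also the '&',
since sr_extent is an array of extents rather than an array of
pointers):

	for (i = 0; i < sr->sr_count; i++) {
		struct ceph_sparse_extent *ext = &sr->sr_extent[i];

		ext->off = le64_to_cpu((__force __le64)ext->off);
		ext->len = le64_to_cpu((__force __le64)ext->len);
	}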

-- Xiubo


> +	}
> +}
> +#else
> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> +{
> +}
> +#endif
> +
> +static int osd_sparse_read(struct ceph_connection *con,
> +			   struct ceph_msg_data_cursor *cursor,
> +			   u64 *plen, char **pbuf)
> +{
> +	struct ceph_osd *o = con->private;
> +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> +	u32 count = sr->sr_count;
> +	u64 eoff, elen;
> +	int ret;
> +
> +	switch (sr->sr_state) {
> +	case CEPH_SPARSE_READ_HDR:
> +next_op:
> +		ret = prep_next_sparse_read(con, cursor);
> +		if (ret <= 0)
> +			return ret;
> +
> +		/* number of extents */
> +		*plen = sizeof(sr->sr_count);
> +		*pbuf = (char *)&sr->sr_count;
> +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> +		break;
> +	case CEPH_SPARSE_READ_EXTENTS:
> +		/* Convert sr_count to host-endian */
> +		count = le32_to_cpu((__force __le32)sr->sr_count);
> +		sr->sr_count = count;
> +		dout("[%d] got %u extents\n", o->o_osd, count);
> +
> +		if (count > 0) {
> +			if (!sr->sr_extent || count > sr->sr_ext_len) {
> +				/* no extent array provided, or too short */
> +				kfree(sr->sr_extent);
> +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> +							   GFP_NOIO);
> +				if (!sr->sr_extent)
> +					return -ENOMEM;
> +				sr->sr_ext_len = count;
> +			}
> +			*plen = count * sizeof(*sr->sr_extent);
> +			*pbuf = (char *)sr->sr_extent;
> +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> +			break;
> +		}
> +		/* No extents? Fall through to reading data len */
> +		fallthrough;
> +	case CEPH_SPARSE_READ_DATA_LEN:
> +		convert_extent_map(sr);
> +		*plen = sizeof(sr->sr_datalen);
> +		*pbuf = (char *)&sr->sr_datalen;
> +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> +		break;
> +	case CEPH_SPARSE_READ_DATA:
> +		if (sr->sr_index >= count) {
> +			sr->sr_state = CEPH_SPARSE_READ_HDR;
> +			goto next_op;
> +		}
> +
> +		eoff = sr->sr_extent[sr->sr_index].off;
> +		elen = sr->sr_extent[sr->sr_index].len;
> +
> +		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
> +		     o->o_osd, sr->sr_index, eoff, elen);
> +
> +		/* zero out anything from sr_pos to start of extent */
> +		if (sr->sr_pos < eoff)
> +			advance_cursor(cursor, eoff - sr->sr_pos, true);
> +
> +		/* Set position to end of extent */
> +		sr->sr_pos = eoff + elen;
> +
> +		/* send back the new length */
> +		*plen = elen;
> +
> +		/* Bump the array index */
> +		++sr->sr_index;
> +		break;
> +	}
> +	return 1;
> +}
> +
>   static const struct ceph_connection_operations osd_con_ops = {
>   	.get = osd_get_con,
>   	.put = osd_put_con,
> +	.sparse_read = osd_sparse_read,
>   	.alloc_msg = osd_alloc_msg,
>   	.dispatch = osd_dispatch,
>   	.fault = osd_fault,


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-22  1:58   ` Xiubo Li
@ 2022-03-22  2:09     ` Xiubo Li
  2022-03-22 10:03     ` Jeff Layton
  1 sibling, 0 replies; 15+ messages in thread
From: Xiubo Li @ 2022-03-22  2:09 UTC (permalink / raw)
  To: Jeff Layton, idryomov; +Cc: ceph-devel


On 3/22/22 9:58 AM, Xiubo Li wrote:
>
> On 3/18/22 9:50 PM, Jeff Layton wrote:
>>
...
>> +
>> +#ifdef __BIG_ENDIAN
>> +static inline void convert_extent_map(struct ceph_sparse_read *sr)
>> +{
>> +    int i;
>> +
>> +    for (i = 0; i < sr->sr_count; i++) {
>> +        struct ceph_sparse_extent *ext = sr->sr_extent[i];
>> +
>> +        ext->off = le64_to_cpu((__force __le32)ext->off);
>> +        ext->len = le64_to_cpu((__force __le32)ext->len);
>
> Why '__le32' ? Shouldn't it be '__le64' ?
>
Please ignore this; I just received your new patch series after sending
this and found that you have already fixed it.

I will check the new series today.

Thanks.

-- Xiubo


^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH v3 4/5] libceph: add sparse read support to OSD client
  2022-03-22  1:58   ` Xiubo Li
  2022-03-22  2:09     ` Xiubo Li
@ 2022-03-22 10:03     ` Jeff Layton
  1 sibling, 0 replies; 15+ messages in thread
From: Jeff Layton @ 2022-03-22 10:03 UTC (permalink / raw)
  To: Xiubo Li, idryomov; +Cc: ceph-devel

On Tue, 2022-03-22 at 09:58 +0800, Xiubo Li wrote:
> On 3/18/22 9:50 PM, Jeff Layton wrote:
> > Add a new sparse_read operation for the OSD client, driven by its own
> > state machine. The messenger can repeatedly call the sparse_read
> > operation, and it will pass back the necessary info to set up to read
> > the next extent of data, while zero-filling the sparse regions.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >   include/linux/ceph/osd_client.h |  32 +++++
> >   net/ceph/osd_client.c           | 238 +++++++++++++++++++++++++++++++-
> >   2 files changed, 266 insertions(+), 4 deletions(-)
> > 
> > diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> > index 00a5b53a6763..2c5f9eb7d888 100644
> > --- a/include/linux/ceph/osd_client.h
> > +++ b/include/linux/ceph/osd_client.h
> > @@ -40,6 +40,36 @@ struct ceph_sparse_extent {
> >   	u64	len;
> >   } __attribute__((packed));
> >   
> > +/* Sparse read state machine state values */
> > +enum ceph_sparse_read_state {
> > +	CEPH_SPARSE_READ_HDR	= 0,
> > +	CEPH_SPARSE_READ_EXTENTS,
> > +	CEPH_SPARSE_READ_DATA_LEN,
> > +	CEPH_SPARSE_READ_DATA,
> > +};
> > +
> > +/*
> > + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of
> > + * 64-bit offset/length pairs, and then all of the actual file data
> > + * concatenated after it (sans holes).
> > + *
> > + * Unfortunately, we don't know how long the extent array is until we've
> > + * started reading the data section of the reply. The caller should send down
> > + * a destination buffer for the array, but we'll alloc one if it's too small
> > + * or if the caller doesn't.
> > + */
> > +struct ceph_sparse_read {
> > +	enum ceph_sparse_read_state	sr_state;	/* state machine state */
> > +	u64				sr_req_off;	/* orig request offset */
> > +	u64				sr_req_len;	/* orig request length */
> > +	u64				sr_pos;		/* current pos in buffer */
> > +	int				sr_index;	/* current extent index */
> > +	__le32				sr_datalen;	/* length of actual data */
> > +	u32				sr_count;	/* extent count in reply */
> > +	int				sr_ext_len;	/* length of extent array */
> > +	struct ceph_sparse_extent	*sr_extent;	/* extent array */
> > +};
> > +
> >   /*
> >    * A given osd we're communicating with.
> >    *
> > @@ -48,6 +78,7 @@ struct ceph_sparse_extent {
> >    */
> >   struct ceph_osd {
> >   	refcount_t o_ref;
> > +	int o_sparse_op_idx;
> >   	struct ceph_osd_client *o_osdc;
> >   	int o_osd;
> >   	int o_incarnation;
> > @@ -63,6 +94,7 @@ struct ceph_osd {
> >   	unsigned long lru_ttl;
> >   	struct list_head o_keepalive_item;
> >   	struct mutex lock;
> > +	struct ceph_sparse_read	o_sparse_read;
> >   };
> >   
> >   #define CEPH_OSD_SLAB_OPS	2
> > diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> > index 9fec258e1f8d..3694696c8a31 100644
> > --- a/net/ceph/osd_client.c
> > +++ b/net/ceph/osd_client.c
> > @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
> >   
> >   	switch (op->op) {
> >   	case CEPH_OSD_OP_READ:
> > +	case CEPH_OSD_OP_SPARSE_READ:
> >   	case CEPH_OSD_OP_WRITE:
> >   	case CEPH_OSD_OP_WRITEFULL:
> >   		kfree(op->extent.sparse_ext);
> > @@ -707,6 +708,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
> >   		/* reply */
> >   		case CEPH_OSD_OP_STAT:
> >   		case CEPH_OSD_OP_READ:
> > +		case CEPH_OSD_OP_SPARSE_READ:
> >   		case CEPH_OSD_OP_LIST_WATCHERS:
> >   			*num_reply_data_items += 1;
> >   			break;
> > @@ -776,7 +778,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
> >   
> >   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> >   	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
> > -	       opcode != CEPH_OSD_OP_TRUNCATE);
> > +	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
> >   
> >   	op->extent.offset = offset;
> >   	op->extent.length = length;
> > @@ -985,6 +987,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
> >   	case CEPH_OSD_OP_STAT:
> >   		break;
> >   	case CEPH_OSD_OP_READ:
> > +	case CEPH_OSD_OP_SPARSE_READ:
> >   	case CEPH_OSD_OP_WRITE:
> >   	case CEPH_OSD_OP_WRITEFULL:
> >   	case CEPH_OSD_OP_ZERO:
> > @@ -1081,7 +1084,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
> >   
> >   	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
> >   	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
> > -	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
> > +	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
> > +	       opcode != CEPH_OSD_OP_SPARSE_READ);
> >   
> >   	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
> >   					GFP_NOFS);
> > @@ -1222,6 +1226,13 @@ static void osd_init(struct ceph_osd *osd)
> >   	mutex_init(&osd->lock);
> >   }
> >   
> > +static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
> > +{
> > +	kfree(sr->sr_extent);
> > +	memset(sr, '\0', sizeof(*sr));
> > +	sr->sr_state = CEPH_SPARSE_READ_HDR;
> > +}
> > +
> >   static void osd_cleanup(struct ceph_osd *osd)
> >   {
> >   	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
> > @@ -1232,6 +1243,8 @@ static void osd_cleanup(struct ceph_osd *osd)
> >   	WARN_ON(!list_empty(&osd->o_osd_lru));
> >   	WARN_ON(!list_empty(&osd->o_keepalive_item));
> >   
> > +	ceph_init_sparse_read(&osd->o_sparse_read);
> > +
> >   	if (osd->o_auth.authorizer) {
> >   		WARN_ON(osd_homeless(osd));
> >   		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
> > @@ -1251,6 +1264,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
> >   	osd_init(osd);
> >   	osd->o_osdc = osdc;
> >   	osd->o_osd = onum;
> > +	osd->o_sparse_op_idx = -1;
> > +
> > +	ceph_init_sparse_read(&osd->o_sparse_read);
> >   
> >   	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
> >   
> > @@ -2055,6 +2071,7 @@ static void setup_request_data(struct ceph_osd_request *req)
> >   					       &op->raw_data_in);
> >   			break;
> >   		case CEPH_OSD_OP_READ:
> > +		case CEPH_OSD_OP_SPARSE_READ:
> >   			ceph_osdc_msg_data_add(reply_msg,
> >   					       &op->extent.osd_data);
> >   			break;
> > @@ -2470,8 +2487,10 @@ static void finish_request(struct ceph_osd_request *req)
> >   
> >   	req->r_end_latency = ktime_get();
> >   
> > -	if (req->r_osd)
> > +	if (req->r_osd) {
> > +		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
> >   		unlink_request(req->r_osd, req);
> > +	}
> >   	atomic_dec(&osdc->num_requests);
> >   
> >   	/*
> > @@ -5416,6 +5435,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
> >   	ceph_msg_put(msg);
> >   }
> >   
> > +/* How much sparse data was requested? */
> > +static u64 sparse_data_requested(struct ceph_osd_request *req)
> > +{
> > +	u64 len = 0;
> > +
> > +	if (req->r_flags & CEPH_OSD_FLAG_READ) {
> > +		int i;
> > +
> > +		for (i = 0; i < req->r_num_ops; ++i) {
> > +			struct ceph_osd_req_op *op = &req->r_ops[i];
> > +
> > +			if (op->op == CEPH_OSD_OP_SPARSE_READ)
> > +				len += op->extent.length;
> > +		}
> > +	}
> > +	return len;
> > +}
> > +
> >   /*
> >    * Lookup and return message for incoming reply.  Don't try to do
> >    * anything about a larger than preallocated data portion of the
> > @@ -5432,6 +5469,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   	int front_len = le32_to_cpu(hdr->front_len);
> >   	int data_len = le32_to_cpu(hdr->data_len);
> >   	u64 tid = le64_to_cpu(hdr->tid);
> > +	u64 srlen;
> >   
> >   	down_read(&osdc->lock);
> >   	if (!osd_registered(osd)) {
> > @@ -5464,7 +5502,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   		req->r_reply = m;
> >   	}
> >   
> > -	if (data_len > req->r_reply->data_length) {
> > +	srlen = sparse_data_requested(req);
> > +	if (!srlen && (data_len > req->r_reply->data_length)) {
> >   		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
> >   			__func__, osd->o_osd, req->r_tid, data_len,
> >   			req->r_reply->data_length);
> > @@ -5474,6 +5513,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
> >   	}
> >   
> >   	m = ceph_msg_get(req->r_reply);
> > +	m->sparse_read = (bool)srlen;
> > +
> >   	dout("get_reply tid %lld %p\n", tid, m);
> >   
> >   out_unlock_session:
> > @@ -5706,9 +5747,198 @@ static int osd_check_message_signature(struct ceph_msg *msg)
> >   	return ceph_auth_check_message_signature(auth, msg);
> >   }
> >   
> > +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, bool zero)
> > +{
> > +	while (len) {
> > +		struct page *page;
> > +		size_t poff, plen;
> > +		bool last = false;
> > +
> > +		page = ceph_msg_data_next(cursor, &poff, &plen, &last);
> > +		if (plen > len)
> > +			plen = len;
> > +		if (zero)
> > +			zero_user_segment(page, poff, poff + plen);
> > +		len -= plen;
> > +		ceph_msg_data_advance(cursor, plen);
> > +	}
> > +}
> > +
> > +static int prep_next_sparse_read(struct ceph_connection *con,
> > +				 struct ceph_msg_data_cursor *cursor)
> > +{
> > +	struct ceph_osd *o = con->private;
> > +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> > +	struct ceph_osd_request *req;
> > +	struct ceph_osd_req_op *op;
> > +
> > +	spin_lock(&o->o_requests_lock);
> > +	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
> > +	if (!req) {
> > +		spin_unlock(&o->o_requests_lock);
> > +		return -EBADR;
> > +	}
> > +
> > +	if (o->o_sparse_op_idx < 0) {
> > +		u64 srlen = sparse_data_requested(req);
> > +
> > +		dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
> > +			__func__, o->o_osd, srlen);
> > +		ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
> > +	} else {
> > +		op = &req->r_ops[o->o_sparse_op_idx];
> > +
> > +		WARN_ON_ONCE(op->extent.sparse_ext);
> > +
> > +		/* hand back buffer we took earlier */
> > +		op->extent.sparse_ext = sr->sr_extent;
> > +		sr->sr_extent = NULL;
> > +		op->extent.sparse_ext_len = sr->sr_count;
> > +		sr->sr_ext_len = 0;
> > +		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
> > +			__func__, o->o_osd, op->extent.sparse_ext_len,
> > +			cursor->resid);
> > +		/*
> > +		 * FIXME: Need to advance to the next data item here, in the
> > +		 * event that there are multiple sparse read requests. Is this
> > +		 * the right way to do that?
> > +		 */
> > +		if (cursor->resid)
> > +			advance_cursor(cursor, cursor->resid, false);
> > +	}
> > +
> > +	ceph_init_sparse_read(sr);
> > +
> > +	/* find next op in this request (if any) */
> > +	while (++o->o_sparse_op_idx < req->r_num_ops) {
> > +		op = &req->r_ops[o->o_sparse_op_idx];
> > +		if (op->op == CEPH_OSD_OP_SPARSE_READ)
> > +			goto found;
> > +	}
> > +
> > +	/* reset for next sparse read request */
> > +	spin_unlock(&o->o_requests_lock);
> > +	o->o_sparse_op_idx = -1;
> > +	return 0;
> > +found:
> > +	sr->sr_req_off = op->extent.offset;
> > +	sr->sr_req_len = op->extent.length;
> > +	sr->sr_pos = sr->sr_req_off;
> > +	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
> > +		o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
> > +
> > +	/* hand off request's sparse extent map buffer */
> > +	sr->sr_ext_len = op->extent.sparse_ext_len;
> > +	op->extent.sparse_ext_len = 0;
> > +	sr->sr_extent = op->extent.sparse_ext;
> > +	op->extent.sparse_ext = NULL;
> > +
> > +	spin_unlock(&o->o_requests_lock);
> > +	return 1;
> > +}
> > +
> > +#ifdef __BIG_ENDIAN
> > +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> > +{
> > +	int i;
> > +
> > +	for (i = 0; i < sr->sr_count; i++) {
> > +		struct ceph_sparse_extent *ext = sr->sr_extent[i];
> > +
> > +		ext->off = le64_to_cpu((__force __le32)ext->off);
> > +		ext->len = le64_to_cpu((__force __le32)ext->len);
> 
> Why '__le32' ? Shouldn't it be '__le64' ?
> 
> 

Cut and paste error. I fixed that before I sent out v4.
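
FWIW, the corrected helper should end up looking roughly like the sketch
below (just a sketch: note that it also takes the address of the array
element, which the hunk above appears to be missing; the actual v4 code
may differ):

#ifdef __BIG_ENDIAN
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
	int i;

	for (i = 0; i < sr->sr_count; i++) {
		struct ceph_sparse_extent *ext = &sr->sr_extent[i];

		/* extents arrive as little-endian 64-bit off/len pairs */
		ext->off = le64_to_cpu((__force __le64)ext->off);
		ext->len = le64_to_cpu((__force __le64)ext->len);
	}
}
#else
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
}
#endif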

> 
> > +	}
> > +}
> > +#else
> > +static inline void convert_extent_map(struct ceph_sparse_read *sr)
> > +{
> > +}
> > +#endif
> > +
> > +static int osd_sparse_read(struct ceph_connection *con,
> > +			   struct ceph_msg_data_cursor *cursor,
> > +			   u64 *plen, char **pbuf)
> > +{
> > +	struct ceph_osd *o = con->private;
> > +	struct ceph_sparse_read *sr = &o->o_sparse_read;
> > +	u32 count = sr->sr_count;
> > +	u64 eoff, elen;
> > +	int ret;
> > +
> > +	switch (sr->sr_state) {
> > +	case CEPH_SPARSE_READ_HDR:
> > +next_op:
> > +		ret = prep_next_sparse_read(con, cursor);
> > +		if (ret <= 0)
> > +			return ret;
> > +
> > +		/* number of extents */
> > +		*plen = sizeof(sr->sr_count);
> > +		*pbuf = (char *)&sr->sr_count;
> > +		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
> > +		break;
> > +	case CEPH_SPARSE_READ_EXTENTS:
> > +		/* Convert sr_count to host-endian */
> > +		count = le32_to_cpu((__force __le32)sr->sr_count);
> > +		sr->sr_count = count;
> > +		dout("[%d] got %u extents\n", o->o_osd, count);
> > +
> > +		if (count > 0) {
> > +			if (!sr->sr_extent || count > sr->sr_ext_len) {
> > +				/* no extent array provided, or too short */
> > +				kfree(sr->sr_extent);
> > +				sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent),
> > +							   GFP_NOIO);
> > +				if (!sr->sr_extent)
> > +					return -ENOMEM;
> > +				sr->sr_ext_len = count;
> > +			}
> > +			*plen = count * sizeof(*sr->sr_extent);
> > +			*pbuf = (char *)sr->sr_extent;
> > +			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
> > +			break;
> > +		}
> > +		/* No extents? Fall through to reading data len */
> > +		fallthrough;
> > +	case CEPH_SPARSE_READ_DATA_LEN:
> > +		convert_extent_map(sr);
> > +		*plen = sizeof(sr->sr_datalen);
> > +		*pbuf = (char *)&sr->sr_datalen;
> > +		sr->sr_state = CEPH_SPARSE_READ_DATA;
> > +		break;
> > +	case CEPH_SPARSE_READ_DATA:
> > +		if (sr->sr_index >= count) {
> > +			sr->sr_state = CEPH_SPARSE_READ_HDR;
> > +			goto next_op;
> > +		}
> > +
> > +		eoff = sr->sr_extent[sr->sr_index].off;
> > +		elen = sr->sr_extent[sr->sr_index].len;
> > +
> > +		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
> > +		     o->o_osd, sr->sr_index, eoff, elen);
> > +
> > +		/* zero out anything from sr_pos to start of extent */
> > +		if (sr->sr_pos < eoff)
> > +			advance_cursor(cursor, eoff - sr->sr_pos, true);
> > +
> > +		/* Set position to end of extent */
> > +		sr->sr_pos = eoff + elen;
> > +
> > +		/* send back the new length */
> > +		*plen = elen;
> > +
> > +		/* Bump the array index */
> > +		++sr->sr_index;
> > +		break;
> > +	}
> > +	return 1;
> > +}
> > +
> >   static const struct ceph_connection_operations osd_con_ops = {
> >   	.get = osd_get_con,
> >   	.put = osd_put_con,
> > +	.sparse_read = osd_sparse_read,
> >   	.alloc_msg = osd_alloc_msg,
> >   	.dispatch = osd_dispatch,
> >   	.fault = osd_fault,
> 

-- 
Jeff Layton <jlayton@kernel.org>

^ permalink raw reply	[flat|nested] 15+ messages in thread

end of thread

Thread overview: 15+ messages
2022-03-18 13:50 [PATCH v3 0/5] ceph/libceph: add support for sparse reads to msgr2 crc codepath Jeff Layton
2022-03-18 13:50 ` [PATCH v3 1/5] libceph: add spinlock around osd->o_requests Jeff Layton
2022-03-18 13:50 ` [PATCH v3 2/5] libceph: define struct ceph_sparse_extent and add some helpers Jeff Layton
2022-03-21  7:57   ` Xiubo Li
2022-03-21 10:02     ` Jeff Layton
2022-03-18 13:50 ` [PATCH v3 3/5] libceph: add sparse read support to msgr2 crc state machine Jeff Layton
2022-03-18 13:50 ` [PATCH v3 4/5] libceph: add sparse read support to OSD client Jeff Layton
2022-03-21  8:41   ` Xiubo Li
2022-03-21 10:01     ` Jeff Layton
2022-03-21 14:17   ` Jeff Layton
2022-03-22  1:58   ` Xiubo Li
2022-03-22  2:09     ` Xiubo Li
2022-03-22 10:03     ` Jeff Layton
2022-03-18 13:50 ` [PATCH v3 5/5] ceph: convert to sparse reads Jeff Layton
2022-03-21 12:30   ` Jeff Layton
