All of lore.kernel.org
 help / color / mirror / Atom feed
From: Alex Elder <elder@inktank.com>
To: ceph-devel@vger.kernel.org
Subject: [PATCH 7/8] libceph: implement bio message data item cursor
Date: Sun, 10 Mar 2013 14:17:35 -0500	[thread overview]
Message-ID: <513CDC4F.3090109@inktank.com> (raw)
In-Reply-To: <513CD9BE.1070505@inktank.com>

Implement and use cursor routines for bio message data items for
outbound message data.

(See the previous commit for reasoning in support of the changes
in out_msg_pos_next().)

Signed-off-by: Alex Elder <elder@inktank.com>
---
 include/linux/ceph/messenger.h |    7 +++
 net/ceph/messenger.c           |  128 ++++++++++++++++++++++++++++++++++------
 2 files changed, 118 insertions(+), 17 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 716c3fd..76b4645 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -98,6 +98,13 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 struct ceph_msg_data_cursor {
 	bool		last_piece;	/* now at last piece of data item */
 	union {
+#ifdef CONFIG_BLOCK
+		struct {				/* bio */
+			struct bio	*bio;		/* bio from list */
+			unsigned int	vector_index;	/* vector from bio */
+			unsigned int	vector_offset;	/* bytes from vector */
+		};
+#endif /* CONFIG_BLOCK */
 		struct {				/* pagelist */
 			struct page	*page;		/* page from list */
 			size_t		offset;		/* bytes from list */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index c0f89c1..5ddbe5d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -739,6 +739,97 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
 	if (*seg == (*bio_iter)->bi_vcnt)
 		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
 }
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = data->bio;
+	BUG_ON(!bio);
+	BUG_ON(!bio->bi_vcnt);
+	/* resid = bio->bi_size */
+
+	cursor->bio = bio;
+	cursor->vector_index = 0;
+	cursor->vector_offset = 0;
+	cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+						size_t *page_offset,
+						size_t *length,
+						bool *last_piece)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+	struct bio_vec *bio_vec;
+	unsigned int index;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = cursor->bio;
+	BUG_ON(!bio);
+
+	index = cursor->vector_index;
+	BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+	bio_vec = &bio->bi_io_vec[index];
+	BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+	*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+	BUG_ON(*page_offset >= PAGE_SIZE);
+	*length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+	BUG_ON(*length > PAGE_SIZE);
+	*last_piece = cursor->last_piece;
+
+	return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+	struct bio_vec *bio_vec;
+	unsigned int index;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = cursor->bio;
+	BUG_ON(!bio);
+
+	index = cursor->vector_index;
+	BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+	bio_vec = &bio->bi_io_vec[index];
+	BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
+
+	/* Advance the cursor offset */
+
+	cursor->vector_offset += bytes;
+	if (cursor->vector_offset < bio_vec->bv_len)
+		return false;	/* more bytes to process in this segment */
+
+	/* Move on to the next segment, and possibly the next bio */
+
+	if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+		bio = bio->bi_next;
+		cursor->bio = bio;
+		cursor->vector_index = 0;
+	}
+	cursor->vector_offset = 0;
+
+	if (!cursor->last_piece && bio && !bio->bi_next)
+		if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+			cursor->last_piece = true;
+
+	return true;
+}
 #endif

 /*
@@ -850,6 +941,8 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
 		break;
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
+		ceph_msg_data_bio_cursor_init(data);
+		break;
 #endif /* CONFIG_BLOCK */
 	default:
 	    break;
@@ -875,7 +968,8 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
 						last_piece);
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
-		break;
+		return ceph_msg_data_bio_next(data, page_offset, length,
+						last_piece);
 #endif /* CONFIG_BLOCK */
 	}
 	BUG();
@@ -896,7 +990,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
 		return ceph_msg_data_pagelist_advance(data, bytes);
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
-		break;
+		return ceph_msg_data_bio_advance(data, bytes);
 #endif /* CONFIG_BLOCK */
 	}
 	BUG();
@@ -923,6 +1017,10 @@ static void prepare_message_data(struct ceph_msg *msg,

 	/* Initialize data cursors */

+#ifdef CONFIG_BLOCK
+	if (ceph_msg_has_bio(msg))
+		ceph_msg_data_cursor_init(&msg->b);
+#endif /* CONFIG_BLOCK */
 	if (ceph_msg_has_pagelist(msg))
 		ceph_msg_data_cursor_init(&msg->l);
 	if (ceph_msg_has_trail(msg))
@@ -1223,6 +1321,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 		need_crc = ceph_msg_data_advance(&msg->t, sent);
 	else if (ceph_msg_has_pagelist(msg))
 		need_crc = ceph_msg_data_advance(&msg->l, sent);
+#ifdef CONFIG_BLOCK
+	else if (ceph_msg_has_bio(msg))
+		need_crc = ceph_msg_data_advance(&msg->b, sent);
+#endif /* CONFIG_BLOCK */
 	BUG_ON(need_crc && sent != len);

 	if (sent < len)
@@ -1232,10 +1334,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 	msg_pos->page_pos = 0;
 	msg_pos->page++;
 	msg_pos->did_page_crc = false;
-#ifdef CONFIG_BLOCK
-	if (ceph_msg_has_bio(msg))
-		iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
-#endif
 }

 static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1313,8 +1411,6 @@ static int write_partial_message_data(struct ceph_connection *con)
 		struct page *page = NULL;
 		size_t page_offset;
 		size_t length;
-		int max_write = PAGE_SIZE;
-		int bio_offset = 0;
 		bool use_cursor = false;
 		bool last_piece = true;	/* preserve existing behavior */

@@ -1335,21 +1431,19 @@ static int write_partial_message_data(struct ceph_connection *con)
 							&length, &last_piece);
 #ifdef CONFIG_BLOCK
 		} else if (ceph_msg_has_bio(msg)) {
-			struct bio_vec *bv;
-
-			bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
-			page = bv->bv_page;
-			bio_offset = bv->bv_offset;
-			max_write = bv->bv_len;
+			use_cursor = true;
+			page = ceph_msg_data_next(&msg->b, &page_offset,
+							&length, &last_piece);
 #endif
 		} else {
 			page = zero_page;
 		}
-		if (!use_cursor)
-			length = min_t(int, max_write - msg_pos->page_pos,
+		if (!use_cursor) {
+			length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
 					    total_max_write);

-		page_offset = msg_pos->page_pos + bio_offset;
+			page_offset = msg_pos->page_pos;
+		}
 		if (do_datacrc && !msg_pos->did_page_crc) {
 			u32 crc = le32_to_cpu(msg->footer.data_crc);

-- 
1.7.9.5


  parent reply	other threads:[~2013-03-10 19:17 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-03-10 19:06 [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-10 19:12 ` [PATCH 1/8] libceph: define ceph_msg_has_*() data macros Alex Elder
2013-03-10 19:14 ` [PATCH 2/8] libceph: be explicit about message data representation Alex Elder
2013-03-10 19:15 ` [PATCH 3/8] libceph: abstract message data Alex Elder
2013-03-10 19:16 ` [PATCH 4/8] libceph: start defining message data cursor Alex Elder
2013-03-10 19:16 ` [PATCH 5/8] libceph: prepare for other message data item types Alex Elder
2013-03-10 19:17 ` [PATCH 6/8] libceph: use data cursor for message pagelist Alex Elder
2013-03-10 19:17 ` Alex Elder [this message]
2013-03-10 19:17 ` [PATCH 8/8] libceph: implement pages array cursor Alex Elder
2013-03-10 19:19 ` [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-11 22:24 ` Josh Durgin
2013-03-11 22:26   ` Alex Elder

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=513CDC4F.3090109@inktank.com \
    --to=elder@inktank.com \
    --cc=ceph-devel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.