All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 0/6] libceph: refactor messenger for multiple data sources
@ 2013-03-09 16:41 Alex Elder
  2013-03-09 16:42 ` [PATCH 1/6] libceph: use local variables for message positions Alex Elder
                   ` (6 more replies)
  0 siblings, 7 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:41 UTC (permalink / raw)
  To: ceph-devel

This series factors out blocks of common code and generally
aims to simplify and make more consistent the way the ceph
messenger processes the data portions of both incoming and
outgoing messages.

They can be found in the "review/wip-msgr-refactor" branch of
the ceph-client git repository.  (That branch is based on
branch "review/wip-abstract-2").

					-Alex

[PATCH 1/6] libceph: use local variables for message positions
[PATCH 2/6] libceph: consolidate message prep code
[PATCH 3/6] libceph: small write_partial_msg_pages() refactor
[PATCH 4/6] libceph: encapsulate reading message data
[PATCH 5/6] libceph: define and use ceph_tcp_recvpage()
[PATCH 6/6] libceph: define and use ceph_crc32c_page()

^ permalink raw reply	[flat|nested] 10+ messages in thread

* [PATCH 1/6] libceph: use local variables for message positions
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
@ 2013-03-09 16:42 ` Alex Elder
  2013-03-09 16:42 ` [PATCH 2/6] libceph: consolidate message prep code Alex Elder
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:42 UTC (permalink / raw)
  To: ceph-devel

There are several places where a message's out_msg_pos or in_msg_pos
field is used repeatedly within a function.  Use a local pointer
variable for this purpose to unclutter the code.

This and the upcoming cleanup patches are related to:
    http://tracker.ceph.com/issues/4403

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   85
+++++++++++++++++++++++++++-----------------------
 1 file changed, 46 insertions(+), 39 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 17d9321..7788170 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -725,22 +725,23 @@ static void iter_bio_next(struct bio **bio_iter,
unsigned int *seg)
 static void prepare_write_message_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
+	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;

 	BUG_ON(!msg);
 	BUG_ON(!msg->hdr.data_len);

 	/* initialize page iterator */
-	con->out_msg_pos.page = 0;
+	msg_pos->page = 0;
 	if (msg->pages)
-		con->out_msg_pos.page_pos = msg->page_alignment;
+		msg_pos->page_pos = msg->page_alignment;
 	else
-		con->out_msg_pos.page_pos = 0;
+		msg_pos->page_pos = 0;
 #ifdef CONFIG_BLOCK
 	if (msg->bio)
 		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
 #endif
-	con->out_msg_pos.data_pos = 0;
-	con->out_msg_pos.did_page_crc = false;
+	msg_pos->data_pos = 0;
+	msg_pos->did_page_crc = false;
 	con->out_more = 1;  /* data + footer will follow */
 }

@@ -1022,19 +1023,20 @@ static void out_msg_pos_next(struct
ceph_connection *con, struct page *page,
 			size_t len, size_t sent, bool in_trail)
 {
 	struct ceph_msg *msg = con->out_msg;
+	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;

 	BUG_ON(!msg);
 	BUG_ON(!sent);

-	con->out_msg_pos.data_pos += sent;
-	con->out_msg_pos.page_pos += sent;
+	msg_pos->data_pos += sent;
+	msg_pos->page_pos += sent;
 	if (sent < len)
 		return;

 	BUG_ON(sent != len);
-	con->out_msg_pos.page_pos = 0;
-	con->out_msg_pos.page++;
-	con->out_msg_pos.did_page_crc = false;
+	msg_pos->page_pos = 0;
+	msg_pos->page++;
+	msg_pos->did_page_crc = false;
 	if (in_trail)
 		list_rotate_left(&msg->trail->head);
 	else if (msg->pagelist)
@@ -1049,18 +1051,19 @@ static void in_msg_pos_next(struct
ceph_connection *con, size_t len,
 				size_t received)
 {
 	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;

 	BUG_ON(!msg);
 	BUG_ON(!received);

-	con->in_msg_pos.data_pos += received;
-	con->in_msg_pos.page_pos += received;
+	msg_pos->data_pos += received;
+	msg_pos->page_pos += received;
 	if (received < len)
 		return;

 	BUG_ON(received != len);
-	con->in_msg_pos.page_pos = 0;
-	con->in_msg_pos.page++;
+	msg_pos->page_pos = 0;
+	msg_pos->page++;
 #ifdef CONFIG_BLOCK
 	if (msg->bio)
 		iter_bio_next(&msg->bio_iter, &msg->bio_seg);
@@ -1077,6 +1080,7 @@ static void in_msg_pos_next(struct ceph_connection
*con, size_t len,
 static int write_partial_msg_pages(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
+	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
 	unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
 	size_t len;
 	bool do_datacrc = !con->msgr->nocrc;
@@ -1087,7 +1091,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	const size_t trail_off = data_len - trail_len;

 	dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
-	     con, msg, con->out_msg_pos.page, con->out_msg_pos.page_pos);
+	     con, msg, msg_pos->page, msg_pos->page_pos);

 	/*
 	 * Iterate through each page that contains data to be
@@ -1097,22 +1101,22 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	 * need to map the page.  If we have no pages, they have
 	 * been revoked, so use the zero page.
 	 */
-	while (data_len > con->out_msg_pos.data_pos) {
+	while (data_len > msg_pos->data_pos) {
 		struct page *page = NULL;
 		int max_write = PAGE_SIZE;
 		int bio_offset = 0;

-		in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+		in_trail = in_trail || msg_pos->data_pos >= trail_off;
 		if (!in_trail)
-			total_max_write = trail_off - con->out_msg_pos.data_pos;
+			total_max_write = trail_off - msg_pos->data_pos;

 		if (in_trail) {
-			total_max_write = data_len - con->out_msg_pos.data_pos;
+			total_max_write = data_len - msg_pos->data_pos;

 			page = list_first_entry(&msg->trail->head,
 						struct page, lru);
 		} else if (msg->pages) {
-			page = msg->pages[con->out_msg_pos.page];
+			page = msg->pages[msg_pos->page];
 		} else if (msg->pagelist) {
 			page = list_first_entry(&msg->pagelist->head,
 						struct page, lru);
@@ -1128,24 +1132,24 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 		} else {
 			page = zero_page;
 		}
-		len = min_t(int, max_write - con->out_msg_pos.page_pos,
+		len = min_t(int, max_write - msg_pos->page_pos,
 			    total_max_write);

-		if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+		if (do_datacrc && !msg_pos->did_page_crc) {
 			void *base;
 			u32 crc = le32_to_cpu(msg->footer.data_crc);
 			char *kaddr;

 			kaddr = kmap(page);
 			BUG_ON(kaddr == NULL);
-			base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+			base = kaddr + msg_pos->page_pos + bio_offset;
 			crc = crc32c(crc, base, len);
 			kunmap(page);
 			msg->footer.data_crc = cpu_to_le32(crc);
-			con->out_msg_pos.did_page_crc = true;
+			msg_pos->did_page_crc = true;
 		}
 		ret = ceph_tcp_sendpage(con->sock, page,
-				      con->out_msg_pos.page_pos + bio_offset,
+				      msg_pos->page_pos + bio_offset,
 				      len, true);
 		if (ret <= 0)
 			goto out;
@@ -1803,22 +1807,23 @@ static int read_partial_message_pages(struct
ceph_connection *con,
 				      struct page **pages,
 				      unsigned int data_len, bool do_datacrc)
 {
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	struct page *page;
 	void *p;
 	int ret;
 	int left;

-	left = min((int)(data_len - con->in_msg_pos.data_pos),
-		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+	left = min((int)(data_len - msg_pos->data_pos),
+		   (int)(PAGE_SIZE - msg_pos->page_pos));
 	/* (page) data */
 	BUG_ON(pages == NULL);
-	page = pages[con->in_msg_pos.page];
+	page = pages[msg_pos->page];
 	p = kmap(page);
-	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
+	ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
 	if (ret > 0 && do_datacrc)
 		con->in_data_crc =
 			crc32c(con->in_data_crc,
-				  p + con->in_msg_pos.page_pos, ret);
+				  p + msg_pos->page_pos, ret);
 	kunmap(page);
 	if (ret <= 0)
 		return ret;
@@ -1833,6 +1838,7 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 				    unsigned int data_len, bool do_datacrc)
 {
 	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	struct bio_vec *bv;
 	struct page *page;
 	void *p;
@@ -1842,17 +1848,17 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 	BUG_ON(!msg->bio_iter);
 	bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);

-	left = min((int)(data_len - con->in_msg_pos.data_pos),
-		   (int)(bv->bv_len - con->in_msg_pos.page_pos));
+	left = min((int)(data_len - msg_pos->data_pos),
+		   (int)(bv->bv_len - msg_pos->page_pos));

 	page = bv->bv_page;
 	p = kmap(page) + bv->bv_offset;

-	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
+	ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
 	if (ret > 0 && do_datacrc)
 		con->in_data_crc =
 			crc32c(con->in_data_crc,
-				  p + con->in_msg_pos.page_pos, ret);
+				  p + msg_pos->page_pos, ret);
 	kunmap(page);
 	if (ret <= 0)
 		return ret;
@@ -1869,6 +1875,7 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	int size;
 	int end;
 	int ret;
@@ -1949,12 +1956,12 @@ static int read_partial_message(struct
ceph_connection *con)
 		if (m->middle)
 			m->middle->vec.iov_len = 0;

-		con->in_msg_pos.page = 0;
+		msg_pos->page = 0;
 		if (m->pages)
-			con->in_msg_pos.page_pos = m->page_alignment;
+			msg_pos->page_pos = m->page_alignment;
 		else
-			con->in_msg_pos.page_pos = 0;
-		con->in_msg_pos.data_pos = 0;
+			msg_pos->page_pos = 0;
+		msg_pos->data_pos = 0;

 #ifdef CONFIG_BLOCK
 		if (m->bio)
@@ -1978,7 +1985,7 @@ static int read_partial_message(struct
ceph_connection *con)
 	}

 	/* (page) data */
-	while (con->in_msg_pos.data_pos < data_len) {
+	while (msg_pos->data_pos < data_len) {
 		if (m->pages) {
 			ret = read_partial_message_pages(con, m->pages,
 						 data_len, do_datacrc);
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 2/6] libceph: consolidate message prep code
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
  2013-03-09 16:42 ` [PATCH 1/6] libceph: use local variables for message positions Alex Elder
@ 2013-03-09 16:42 ` Alex Elder
  2013-03-09 16:43 ` [PATCH 3/6] libceph: small write_partial_msg_pages() refactor Alex Elder
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:42 UTC (permalink / raw)
  To: ceph-devel

In prepare_write_message_data(), various fields are initialized in
preparation for writing message data out.  Meanwhile, in
read_partial_message(), there is essentially the same block of code,
operating on message variables associated with an incoming message.

Generalize prepare_write_message_data() so it works for both
incoming and outcoming messages, and use it in both spots.  The
did_page_crc is not used for input (so it's harmless to initialize
it).

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   28 ++++++++++------------------
 1 file changed, 10 insertions(+), 18 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 7788170..e8fa449 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -722,11 +722,9 @@ static void iter_bio_next(struct bio **bio_iter,
unsigned int *seg)
 }
 #endif

-static void prepare_write_message_data(struct ceph_connection *con)
+static void prepare_message_data(struct ceph_msg *msg,
+				struct ceph_msg_pos *msg_pos)
 {
-	struct ceph_msg *msg = con->out_msg;
-	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
-
 	BUG_ON(!msg);
 	BUG_ON(!msg->hdr.data_len);

@@ -742,7 +740,6 @@ static void prepare_write_message_data(struct
ceph_connection *con)
 #endif
 	msg_pos->data_pos = 0;
 	msg_pos->did_page_crc = false;
-	con->out_more = 1;  /* data + footer will follow */
 }

 /*
@@ -840,11 +837,13 @@ static void prepare_write_message(struct
ceph_connection *con)

 	/* is there a data payload? */
 	con->out_msg->footer.data_crc = 0;
-	if (m->hdr.data_len)
-		prepare_write_message_data(con);
-	else
+	if (m->hdr.data_len) {
+		prepare_message_data(con->out_msg, &con->out_msg_pos);
+		con->out_more = 1;  /* data + footer will follow */
+	} else {
 		/* no, queue up footer too and be done */
 		prepare_write_message_footer(con);
+	}

 	con_flag_set(con, CON_FLAG_WRITE_PENDING);
 }
@@ -1956,17 +1955,10 @@ static int read_partial_message(struct
ceph_connection *con)
 		if (m->middle)
 			m->middle->vec.iov_len = 0;

-		msg_pos->page = 0;
-		if (m->pages)
-			msg_pos->page_pos = m->page_alignment;
-		else
-			msg_pos->page_pos = 0;
-		msg_pos->data_pos = 0;
+		/* prepare for data payload, if any */

-#ifdef CONFIG_BLOCK
-		if (m->bio)
-			init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
-#endif
+		if (data_len)
+			prepare_message_data(con->in_msg, &con->in_msg_pos);
 	}

 	/* front */
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 3/6] libceph: small write_partial_msg_pages() refactor
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
  2013-03-09 16:42 ` [PATCH 1/6] libceph: use local variables for message positions Alex Elder
  2013-03-09 16:42 ` [PATCH 2/6] libceph: consolidate message prep code Alex Elder
@ 2013-03-09 16:43 ` Alex Elder
  2013-03-09 16:43 ` [PATCH 4/6] libceph: encapsulate reading message data Alex Elder
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:43 UTC (permalink / raw)
  To: ceph-devel

Define local variables page_offset and length to represent the range
of bytes within a page that will be sent by ceph_tcp_sendpage() in
write_partial_msg_pages().

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e8fa449..813c299 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1081,7 +1081,6 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
 	unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
-	size_t len;
 	bool do_datacrc = !con->msgr->nocrc;
 	int ret;
 	int total_max_write;
@@ -1102,6 +1101,8 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	 */
 	while (data_len > msg_pos->data_pos) {
 		struct page *page = NULL;
+		size_t page_offset;
+		size_t length;
 		int max_write = PAGE_SIZE;
 		int bio_offset = 0;

@@ -1131,9 +1132,10 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 		} else {
 			page = zero_page;
 		}
-		len = min_t(int, max_write - msg_pos->page_pos,
+		length = min_t(int, max_write - msg_pos->page_pos,
 			    total_max_write);

+		page_offset = msg_pos->page_pos + bio_offset;
 		if (do_datacrc && !msg_pos->did_page_crc) {
 			void *base;
 			u32 crc = le32_to_cpu(msg->footer.data_crc);
@@ -1141,19 +1143,18 @@ static int write_partial_msg_pages(struct
ceph_connection *con)

 			kaddr = kmap(page);
 			BUG_ON(kaddr == NULL);
-			base = kaddr + msg_pos->page_pos + bio_offset;
-			crc = crc32c(crc, base, len);
+			base = kaddr + page_offset;
+			crc = crc32c(crc, base, length);
 			kunmap(page);
 			msg->footer.data_crc = cpu_to_le32(crc);
 			msg_pos->did_page_crc = true;
 		}
-		ret = ceph_tcp_sendpage(con->sock, page,
-				      msg_pos->page_pos + bio_offset,
-				      len, true);
+		ret = ceph_tcp_sendpage(con->sock, page, page_offset,
+				      length, true);
 		if (ret <= 0)
 			goto out;

-		out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
+		out_msg_pos_next(con, page, length, (size_t) ret, in_trail);
 	}

 	dout("write_partial_msg_pages %p msg %p done\n", con, msg);
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 4/6] libceph: encapsulate reading message data
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
                   ` (2 preceding siblings ...)
  2013-03-09 16:43 ` [PATCH 3/6] libceph: small write_partial_msg_pages() refactor Alex Elder
@ 2013-03-09 16:43 ` Alex Elder
  2013-03-10  0:38   ` [PATCH 4/6, v2] " Alex Elder
  2013-03-10 16:33   ` [PATCH 4/6, v3] " Alex Elder
  2013-03-09 16:43 ` [PATCH 5/6] libceph: define and use ceph_tcp_recvpage() Alex Elder
                   ` (2 subsequent siblings)
  6 siblings, 2 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:43 UTC (permalink / raw)
  To: ceph-devel

Pull the code that reads the data portion into a message into
a separate function read_partial_msg_data().

Rename write_partial_msg_pages() to be write_partial_message_data()
to match its read counterpart, and to reflect its more generic
purpose.

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   68
+++++++++++++++++++++++++++++++-------------------
 1 file changed, 42 insertions(+), 26 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 813c299..91f577a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1076,7 +1076,7 @@ static void in_msg_pos_next(struct ceph_connection
*con, size_t len,
  *  0 -> socket full, but more to do
  * <0 -> error
  */
-static int write_partial_msg_pages(struct ceph_connection *con)
+static int write_partial_message_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
@@ -1088,7 +1088,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	const size_t trail_len = (msg->trail ? msg->trail->length : 0);
 	const size_t trail_off = data_len - trail_len;

-	dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
+	dout("%s %p msg %p page %d offset %d\n", __func__,
 	     con, msg, msg_pos->page, msg_pos->page_pos);

 	/*
@@ -1157,7 +1157,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 		out_msg_pos_next(con, page, length, (size_t) ret, in_trail);
 	}

-	dout("write_partial_msg_pages %p msg %p done\n", con, msg);
+	dout("%s %p msg %p done\n", __func__, con, msg);

 	/* prepare and queue up footer, too */
 	if (!do_datacrc)
@@ -1869,13 +1869,45 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 }
 #endif

+static int read_partial_msg_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+	const bool do_datacrc = !con->msgr->nocrc;
+	unsigned int data_len;
+	int ret = 0;
+
+	BUG_ON(!msg);
+
+	data_len = le32_to_cpu(con->in_hdr.data_len);
+	prepare_message_data(msg, msg_pos);
+	while (msg_pos->data_pos < data_len) {
+		if (msg->pages) {
+			ret = read_partial_message_pages(con, msg->pages,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#ifdef CONFIG_BLOCK
+		} else if (msg->bio) {
+			ret = read_partial_message_bio(con,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#endif
+		} else {
+			BUG_ON(1);
+		}
+	}
+
+	return ret;
+}
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
-	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	int size;
 	int end;
 	int ret;
@@ -1956,10 +1988,6 @@ static int read_partial_message(struct
ceph_connection *con)
 		if (m->middle)
 			m->middle->vec.iov_len = 0;

-		/* prepare for data payload, if any */
-
-		if (data_len)
-			prepare_message_data(con->in_msg, &con->in_msg_pos);
 	}

 	/* front */
@@ -1978,22 +2006,10 @@ static int read_partial_message(struct
ceph_connection *con)
 	}

 	/* (page) data */
-	while (msg_pos->data_pos < data_len) {
-		if (m->pages) {
-			ret = read_partial_message_pages(con, m->pages,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#ifdef CONFIG_BLOCK
-		} else if (m->bio) {
-			ret = read_partial_message_bio(con,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#endif
-		} else {
-			BUG_ON(1);
-		}
+	if (m->hdr.data_len) {
+		ret = read_partial_msg_data(con);
+		if (ret <= 0)
+			return ret;
 	}

 	/* footer */
@@ -2119,13 +2135,13 @@ more_kvec:
 			goto do_next;
 		}

-		ret = write_partial_msg_pages(con);
+		ret = write_partial_message_data(con);
 		if (ret == 1)
 			goto more_kvec;  /* we need to send the footer, too! */
 		if (ret == 0)
 			goto out;
 		if (ret < 0) {
-			dout("try_write write_partial_msg_pages err %d\n",
+			dout("try_write write_partial_message_data err %d\n",
 			     ret);
 			goto out;
 		}
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 5/6] libceph: define and use ceph_tcp_recvpage()
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
                   ` (3 preceding siblings ...)
  2013-03-09 16:43 ` [PATCH 4/6] libceph: encapsulate reading message data Alex Elder
@ 2013-03-09 16:43 ` Alex Elder
  2013-03-09 16:43 ` [PATCH 6/6] libceph: define and use ceph_crc32c_page() Alex Elder
  2013-03-11 21:01 ` [PATCH 0/6] libceph: refactor messenger for multiple data sources Josh Durgin
  6 siblings, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:43 UTC (permalink / raw)
  To: ceph-devel

Define a new function ceph_tcp_recvpage() that behaves in a way
comparable to ceph_tcp_sendpage().

Rearrange the code in both read_partial_message_pages() and
read_partial_message_bio() so they have matching structure,
(similar to what's in write_partial_msg_pages()), and use
this new function.

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   86
+++++++++++++++++++++++++++++++++++---------------
 1 file changed, 60 insertions(+), 26 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 91f577a..7df7941 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -471,6 +471,22 @@ static int ceph_tcp_recvmsg(struct socket *sock,
void *buf, size_t len)
 	return r;
 }

+static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
+		     int page_offset, size_t length)
+{
+	void *kaddr;
+	int ret;
+
+	BUG_ON(page_offset + length > PAGE_SIZE);
+
+	kaddr = kmap(page);
+	BUG_ON(!kaddr);
+	ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
+	kunmap(page);
+
+	return ret;
+}
+
 /*
  * write something.  @more is true if caller will be sending more data
  * shortly.
@@ -1809,26 +1825,36 @@ static int read_partial_message_pages(struct
ceph_connection *con,
 {
 	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	struct page *page;
-	void *p;
+	size_t page_offset;
+	size_t length;
+	unsigned int left;
 	int ret;
-	int left;

-	left = min((int)(data_len - msg_pos->data_pos),
-		   (int)(PAGE_SIZE - msg_pos->page_pos));
 	/* (page) data */
 	BUG_ON(pages == NULL);
 	page = pages[msg_pos->page];
-	p = kmap(page);
-	ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
-	if (ret > 0 && do_datacrc)
-		con->in_data_crc =
-			crc32c(con->in_data_crc,
-				  p + msg_pos->page_pos, ret);
-	kunmap(page);
+	page_offset = msg_pos->page_pos;
+	BUG_ON(msg_pos->data_pos >= data_len);
+	left = data_len - msg_pos->data_pos;
+	BUG_ON(page_offset >= PAGE_SIZE);
+	length = min_t(unsigned int, PAGE_SIZE - page_offset, left);
+
+	ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 	if (ret <= 0)
 		return ret;

-	in_msg_pos_next(con, left, ret);
+	if (do_datacrc) {
+		void *kaddr;
+		void *base;
+
+		kaddr = kmap(page);
+		BUG_ON(!kaddr);
+		base = kaddr + page_offset;
+		con->in_data_crc = crc32c(con->in_data_crc, base, ret);
+		kunmap(page);
+	}
+
+	in_msg_pos_next(con, length, ret);

 	return ret;
 }
@@ -1841,29 +1867,37 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	struct bio_vec *bv;
 	struct page *page;
-	void *p;
-	int ret, left;
+	size_t page_offset;
+	size_t length;
+	unsigned int left;
+	int ret;

 	BUG_ON(!msg);
 	BUG_ON(!msg->bio_iter);
 	bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
-
-	left = min((int)(data_len - msg_pos->data_pos),
-		   (int)(bv->bv_len - msg_pos->page_pos));
-
 	page = bv->bv_page;
-	p = kmap(page) + bv->bv_offset;
+	page_offset = bv->bv_offset + msg_pos->page_pos;
+	BUG_ON(msg_pos->data_pos >= data_len);
+	left = data_len - msg_pos->data_pos;
+	BUG_ON(msg_pos->page_pos >= bv->bv_len);
+	length = min_t(unsigned int, bv->bv_len - msg_pos->page_pos, left);

-	ret = ceph_tcp_recvmsg(con->sock, p + msg_pos->page_pos, left);
-	if (ret > 0 && do_datacrc)
-		con->in_data_crc =
-			crc32c(con->in_data_crc,
-				  p + msg_pos->page_pos, ret);
-	kunmap(page);
+	ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 	if (ret <= 0)
 		return ret;

-	in_msg_pos_next(con, left, ret);
+	if (do_datacrc) {
+		void *kaddr;
+		void *base;
+
+		kaddr = kmap(page);
+		BUG_ON(!kaddr);
+		base = kaddr + page_offset;
+		con->in_data_crc = crc32c(con->in_data_crc, base, ret);
+		kunmap(page);
+	}
+
+	in_msg_pos_next(con, length, ret);

 	return ret;
 }
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 6/6] libceph: define and use ceph_crc32c_page()
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
                   ` (4 preceding siblings ...)
  2013-03-09 16:43 ` [PATCH 5/6] libceph: define and use ceph_tcp_recvpage() Alex Elder
@ 2013-03-09 16:43 ` Alex Elder
  2013-03-11 21:01 ` [PATCH 0/6] libceph: refactor messenger for multiple data sources Josh Durgin
  6 siblings, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-09 16:43 UTC (permalink / raw)
  To: ceph-devel

Factor out a common block of code that updates a CRC calculation
over a range of data in a page.

This and the preceding patches are related to:
    http://tracker.ceph.com/issues/4403

Signed-off-by: Alex Elder <elder@inktank.com>
---
 net/ceph/messenger.c |   47 ++++++++++++++++++++---------------------------
 1 file changed, 20 insertions(+), 27 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 7df7941..e677fd3 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1085,6 +1085,19 @@ static void in_msg_pos_next(struct
ceph_connection *con, size_t len,
 #endif /* CONFIG_BLOCK */
 }

+static u32 ceph_crc32c_page(u32 crc, struct page *page,
+				unsigned int page_offset,
+				unsigned int length)
+{
+	char *kaddr;
+
+	kaddr = kmap(page);
+	BUG_ON(kaddr == NULL);
+	crc = crc32c(crc, kaddr + page_offset, length);
+	kunmap(page);
+
+	return crc;
+}
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -1153,15 +1166,9 @@ static int write_partial_message_data(struct
ceph_connection *con)

 		page_offset = msg_pos->page_pos + bio_offset;
 		if (do_datacrc && !msg_pos->did_page_crc) {
-			void *base;
 			u32 crc = le32_to_cpu(msg->footer.data_crc);
-			char *kaddr;

-			kaddr = kmap(page);
-			BUG_ON(kaddr == NULL);
-			base = kaddr + page_offset;
-			crc = crc32c(crc, base, length);
-			kunmap(page);
+			crc = ceph_crc32c_page(crc, page, page_offset, length);
 			msg->footer.data_crc = cpu_to_le32(crc);
 			msg_pos->did_page_crc = true;
 		}
@@ -1843,16 +1850,9 @@ static int read_partial_message_pages(struct
ceph_connection *con,
 	if (ret <= 0)
 		return ret;

-	if (do_datacrc) {
-		void *kaddr;
-		void *base;
-
-		kaddr = kmap(page);
-		BUG_ON(!kaddr);
-		base = kaddr + page_offset;
-		con->in_data_crc = crc32c(con->in_data_crc, base, ret);
-		kunmap(page);
-	}
+	if (do_datacrc)
+		con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
+							page_offset, ret);

 	in_msg_pos_next(con, length, ret);

@@ -1886,16 +1886,9 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 	if (ret <= 0)
 		return ret;

-	if (do_datacrc) {
-		void *kaddr;
-		void *base;
-
-		kaddr = kmap(page);
-		BUG_ON(!kaddr);
-		base = kaddr + page_offset;
-		con->in_data_crc = crc32c(con->in_data_crc, base, ret);
-		kunmap(page);
-	}
+	if (do_datacrc)
+		con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
+							page_offset, ret);

 	in_msg_pos_next(con, length, ret);

-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 4/6, v2] libceph: encapsulate reading message data
  2013-03-09 16:43 ` [PATCH 4/6] libceph: encapsulate reading message data Alex Elder
@ 2013-03-10  0:38   ` Alex Elder
  2013-03-10 16:33   ` [PATCH 4/6, v3] " Alex Elder
  1 sibling, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-10  0:38 UTC (permalink / raw)
  To: ceph-devel

The previously posted edition of this patch erroneously moved the
initialization of the message position information so it happened
every time the message was read rather than just when the message
was first initialized.  This one no longer does that.

Pull the code that reads the data portion into a message into
a separate function read_partial_msg_data().

Rename write_partial_msg_pages() to be write_partial_message_data()
to match its read counterpart, and to reflect its more generic
purpose.

Signed-off-by: Alex Elder <elder@inktank.com>
---
v2: do not move the call to prepare_message_data()

 net/ceph/messenger.c |   63
++++++++++++++++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 22 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 813c299..8415896 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1076,7 +1076,7 @@ static void in_msg_pos_next(struct ceph_connection
*con, size_t len,
  *  0 -> socket full, but more to do
  * <0 -> error
  */
-static int write_partial_msg_pages(struct ceph_connection *con)
+static int write_partial_message_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
@@ -1088,7 +1088,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	const size_t trail_len = (msg->trail ? msg->trail->length : 0);
 	const size_t trail_off = data_len - trail_len;

-	dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
+	dout("%s %p msg %p page %d offset %d\n", __func__,
 	     con, msg, msg_pos->page, msg_pos->page_pos);

 	/*
@@ -1157,7 +1157,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 		out_msg_pos_next(con, page, length, (size_t) ret, in_trail);
 	}

-	dout("write_partial_msg_pages %p msg %p done\n", con, msg);
+	dout("%s %p msg %p done\n", __func__, con, msg);

 	/* prepare and queue up footer, too */
 	if (!do_datacrc)
@@ -1869,13 +1869,44 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 }
 #endif

+static int read_partial_msg_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+	const bool do_datacrc = !con->msgr->nocrc;
+	unsigned int data_len;
+	int ret = 0;
+
+	BUG_ON(!msg);
+
+	data_len = le32_to_cpu(con->in_hdr.data_len);
+	while (msg_pos->data_pos < data_len) {
+		if (msg->pages) {
+			ret = read_partial_message_pages(con, msg->pages,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#ifdef CONFIG_BLOCK
+		} else if (msg->bio) {
+			ret = read_partial_message_bio(con,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#endif
+		} else {
+			BUG_ON(1);
+		}
+	}
+
+	return ret;
+}
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
-	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	int size;
 	int end;
 	int ret;
@@ -1978,22 +2009,10 @@ static int read_partial_message(struct
ceph_connection *con)
 	}

 	/* (page) data */
-	while (msg_pos->data_pos < data_len) {
-		if (m->pages) {
-			ret = read_partial_message_pages(con, m->pages,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#ifdef CONFIG_BLOCK
-		} else if (m->bio) {
-			ret = read_partial_message_bio(con,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#endif
-		} else {
-			BUG_ON(1);
-		}
+	if (data_len) {
+		ret = read_partial_msg_data(con);
+		if (ret <= 0)
+			return ret;
 	}

 	/* footer */
@@ -2119,13 +2138,13 @@ more_kvec:
 			goto do_next;
 		}

-		ret = write_partial_msg_pages(con);
+		ret = write_partial_message_data(con);
 		if (ret == 1)
 			goto more_kvec;  /* we need to send the footer, too! */
 		if (ret == 0)
 			goto out;
 		if (ret < 0) {
-			dout("try_write write_partial_msg_pages err %d\n",
+			dout("try_write write_partial_message_data err %d\n",
 			     ret);
 			goto out;
 		}
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 4/6, v3] libceph: encapsulate reading message data
  2013-03-09 16:43 ` [PATCH 4/6] libceph: encapsulate reading message data Alex Elder
  2013-03-10  0:38   ` [PATCH 4/6, v2] " Alex Elder
@ 2013-03-10 16:33   ` Alex Elder
  1 sibling, 0 replies; 10+ messages in thread
From: Alex Elder @ 2013-03-10 16:33 UTC (permalink / raw)
  To: ceph-devel

Found another problem with this one.  In order to indicate
that the it is successful, read_partial_msg_data() needs to
return a positive value (even if no bytes were read because
the entire data portion of a message has been read).  So
this latest version has it return 1 in that case rather than
zero.  The last update--initializing the message position
information--is still in place here.  I think this will be
the last one (though I probably just jinxed that.)


Pull the code that reads the data portion into a message into
a separate function read_partial_msg_data().

Rename write_partial_msg_pages() to be write_partial_message_data()
to match its read counterpart, and to reflect its more generic
purpose.

Signed-off-by: Alex Elder <elder@inktank.com>
---
v3: read_partial_msg_data() returns positive for success
v2: do not move the call to prepare_message_data()

 net/ceph/messenger.c |   63
++++++++++++++++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 22 deletions(-)

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 813c299..6e0bd36 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1076,7 +1076,7 @@ static void in_msg_pos_next(struct ceph_connection
*con, size_t len,
  *  0 -> socket full, but more to do
  * <0 -> error
  */
-static int write_partial_msg_pages(struct ceph_connection *con)
+static int write_partial_message_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
@@ -1088,7 +1088,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 	const size_t trail_len = (msg->trail ? msg->trail->length : 0);
 	const size_t trail_off = data_len - trail_len;

-	dout("write_partial_msg_pages %p msg %p page %d offset %d\n",
+	dout("%s %p msg %p page %d offset %d\n", __func__,
 	     con, msg, msg_pos->page, msg_pos->page_pos);

 	/*
@@ -1157,7 +1157,7 @@ static int write_partial_msg_pages(struct
ceph_connection *con)
 		out_msg_pos_next(con, page, length, (size_t) ret, in_trail);
 	}

-	dout("write_partial_msg_pages %p msg %p done\n", con, msg);
+	dout("%s %p msg %p done\n", __func__, con, msg);

 	/* prepare and queue up footer, too */
 	if (!do_datacrc)
@@ -1869,13 +1869,44 @@ static int read_partial_message_bio(struct
ceph_connection *con,
 }
 #endif

+static int read_partial_msg_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
+	const bool do_datacrc = !con->msgr->nocrc;
+	unsigned int data_len;
+	int ret;
+
+	BUG_ON(!msg);
+
+	data_len = le32_to_cpu(con->in_hdr.data_len);
+	while (msg_pos->data_pos < data_len) {
+		if (msg->pages) {
+			ret = read_partial_message_pages(con, msg->pages,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#ifdef CONFIG_BLOCK
+		} else if (msg->bio) {
+			ret = read_partial_message_bio(con,
+						 data_len, do_datacrc);
+			if (ret <= 0)
+				return ret;
+#endif
+		} else {
+			BUG_ON(1);
+		}
+	}
+
+	return 1;	/* must return > 0 to indicate success */
+}
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
-	struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
 	int size;
 	int end;
 	int ret;
@@ -1978,22 +2009,10 @@ static int read_partial_message(struct
ceph_connection *con)
 	}

 	/* (page) data */
-	while (msg_pos->data_pos < data_len) {
-		if (m->pages) {
-			ret = read_partial_message_pages(con, m->pages,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#ifdef CONFIG_BLOCK
-		} else if (m->bio) {
-			ret = read_partial_message_bio(con,
-						 data_len, do_datacrc);
-			if (ret <= 0)
-				return ret;
-#endif
-		} else {
-			BUG_ON(1);
-		}
+	if (data_len) {
+		ret = read_partial_msg_data(con);
+		if (ret <= 0)
+			return ret;
 	}

 	/* footer */
@@ -2119,13 +2138,13 @@ more_kvec:
 			goto do_next;
 		}

-		ret = write_partial_msg_pages(con);
+		ret = write_partial_message_data(con);
 		if (ret == 1)
 			goto more_kvec;  /* we need to send the footer, too! */
 		if (ret == 0)
 			goto out;
 		if (ret < 0) {
-			dout("try_write write_partial_msg_pages err %d\n",
+			dout("try_write write_partial_message_data err %d\n",
 			     ret);
 			goto out;
 		}
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* Re: [PATCH 0/6] libceph: refactor messenger for multiple data sources
  2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
                   ` (5 preceding siblings ...)
  2013-03-09 16:43 ` [PATCH 6/6] libceph: define and use ceph_crc32c_page() Alex Elder
@ 2013-03-11 21:01 ` Josh Durgin
  6 siblings, 0 replies; 10+ messages in thread
From: Josh Durgin @ 2013-03-11 21:01 UTC (permalink / raw)
  To: Alex Elder; +Cc: ceph-devel

On 03/09/2013 08:41 AM, Alex Elder wrote:
> This series factors out blocks of common code and generally
> aims to simplify and make more consistent the way the ceph
> messenger processes the data portions of both incoming and
> outgoing messages.
>
> They can be found in the "review/wip-msgr-refactor" branch of
> the ceph-client git repository.  (That branch is based on
> branch "review/wip-abstract-2").
>
> 					-Alex
>
> [PATCH 1/6] libceph: use local variables for message positions
> [PATCH 2/6] libceph: consolidate message prep code
> [PATCH 3/6] libceph: small write_partial_msg_pages() refactor
> [PATCH 4/6] libceph: encapsulate reading message data
> [PATCH 5/6] libceph: define and use ceph_tcp_recvpage()
> [PATCH 6/6] libceph: define and use ceph_crc32c_page()

These all look good.
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>


^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2013-03-11 21:02 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-03-09 16:41 [PATCH 0/6] libceph: refactor messenger for multiple data sources Alex Elder
2013-03-09 16:42 ` [PATCH 1/6] libceph: use local variables for message positions Alex Elder
2013-03-09 16:42 ` [PATCH 2/6] libceph: consolidate message prep code Alex Elder
2013-03-09 16:43 ` [PATCH 3/6] libceph: small write_partial_msg_pages() refactor Alex Elder
2013-03-09 16:43 ` [PATCH 4/6] libceph: encapsulate reading message data Alex Elder
2013-03-10  0:38   ` [PATCH 4/6, v2] " Alex Elder
2013-03-10 16:33   ` [PATCH 4/6, v3] " Alex Elder
2013-03-09 16:43 ` [PATCH 5/6] libceph: define and use ceph_tcp_recvpage() Alex Elder
2013-03-09 16:43 ` [PATCH 6/6] libceph: define and use ceph_crc32c_page() Alex Elder
2013-03-11 21:01 ` [PATCH 0/6] libceph: refactor messenger for multiple data sources Josh Durgin

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.