CEPH-Devel Archive on lore.kernel.org
 help / color / Atom feed
* [PATCH v3 00/10] ceph-rbd: ceph RADOS block device
@ 2010-07-07 22:11 Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 01/10] ceph-rbd: lookup pool in osdmap by name Yehuda Sadeh
                   ` (9 more replies)
  0 siblings, 10 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

The following is our latest version of the rados block device. Any comments,
questions, suggestions are more than welcome. We would like to have that ready
for inclusion for the next window and it'd be nice having it reviewed by
someone more familiar with the block/storage layer. Note that the actual block
device implementation that we'd happy to get review for is located in the last
patch of the series. All the preceding patches just build up the groundwork,
and implement the infrastructure for it.

RBD allows striping of data across rados, the ceph distributed block layer, and
is binary compatible with Christian Brunner's kvm-rbd implementation. Similarly
to the Sheepdog project, it stripes the block device across 4MB (or other
configurable size) objects stored by the distributed object store, and enjoys
all the rados features (high availability, scalability, snapshots, etc.)

A use case for this device would be to have some kind of a local file system
created on it, and use it with conjuction of kvm to do migration and other
related stuff.  Another option would be to use it as data devices for other
distributed file systems (e.g., ocfs2, gfs2).

The device driver is is based on osdblk. Currently, it resides under the
fs/ceph tree and does not exist as a separate module of its own, such that the
ceph.ko module contains both the ceph file system and the rbd block device.
Another option would be to have it as a separate module that resides under
drivers/block, however, it would still depend on the ceph.ko module.

This version of the patch series includes many bug fixes. The major change is
that it moves the snapshot administration functionality (e.g., snapshot
creation, listing) to the osd side, and takes advantage of the osd extensible
functionality. Also, snapshots can now be rolled back to a specific version.


Yehuda
---

Yehuda Sadeh (10):
  ceph-rbd: lookup pool in osdmap by name
  ceph-rbd: refactor osdc requests creation functions
  ceph-rbd: messenger and osdc changes for rbd
  ceph-rbd: enable creation of clients that don't need mds
  ceph-rbd: refactor mount related functions, add helpers
  ceph-rbd: generalize mon requests, add pool op support
  ceph-rbd: some super.c changes for rbd
  ceph-rbd: osdc support for osd call and rollback operations
  ceph-rbd: sync common header and source files with server version
  ceph-rbd: rados block device

 fs/ceph/Kconfig        |   10 +
 fs/ceph/Makefile       |    2 +-
 fs/ceph/README         |    1 +
 fs/ceph/ceph_fs.c      |   50 +-
 fs/ceph/ceph_fs.h      |    8 +-
 fs/ceph/ceph_strings.c |    1 +
 fs/ceph/debugfs.c      |   11 +-
 fs/ceph/decode.h       |    5 +
 fs/ceph/file.c         |   46 ++
 fs/ceph/messenger.c    |  215 +++++-
 fs/ceph/messenger.h    |    4 +
 fs/ceph/mon_client.c   |  173 ++++-
 fs/ceph/mon_client.h   |    5 +
 fs/ceph/osd_client.c   |  353 ++++++++--
 fs/ceph/osd_client.h   |   66 ++
 fs/ceph/osdmap.c       |   13 +
 fs/ceph/osdmap.h       |    2 +
 fs/ceph/pagelist.c     |    2 +-
 fs/ceph/pagelist.h     |    2 +-
 fs/ceph/rados.h        |    8 +
 fs/ceph/rbd.c          | 1835 ++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/rbd.h          |    8 +
 fs/ceph/rbd_types.h    |   71 ++
 fs/ceph/super.c        |  194 ++++--
 fs/ceph/super.h        |   36 +-
 25 files changed, 2918 insertions(+), 203 deletions(-)
 create mode 100644 fs/ceph/rbd.c
 create mode 100644 fs/ceph/rbd.h
 create mode 100644 fs/ceph/rbd_types.h


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 01/10] ceph-rbd: lookup pool in osdmap by name
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 02/10] ceph-rbd: refactor osdc requests creation functions Yehuda Sadeh
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/osdmap.c |   13 +++++++++++++
 fs/ceph/osdmap.h |    2 ++
 2 files changed, 15 insertions(+), 0 deletions(-)

diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index ddc656f..11aac1c 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -417,6 +417,19 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
 	return NULL;
 }
 
+int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
+{
+	struct rb_node *rbp;
+
+	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
+		struct ceph_pg_pool_info *pi =
+			rb_entry(rbp, struct ceph_pg_pool_info, node);
+		if (pi->name && strcmp(pi->name, name) == 0)
+			return pi->id;
+	}
+	return -ENOENT;
+}
+
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
 	rb_erase(&pi->node, root);
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
index 970b547..a592b21 100644
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -125,4 +125,6 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
 				struct ceph_pg pgid);
 
+extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
+
 #endif
-- 
1.5.6.5

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 02/10] ceph-rbd: refactor osdc requests creation functions
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 01/10] ceph-rbd: lookup pool in osdmap by name Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 03/10] ceph-rbd: messenger and osdc changes for rbd Yehuda Sadeh
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

The osd requests creation are being decoupled from the
vino parameter, allowing clients using the osd to use
other arbitrary object names that are not necessarily
vino based. Also, calc_raw_layout now takes a snap id.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/osd_client.c |  187 ++++++++++++++++++++++++++++++++++---------------
 fs/ceph/osd_client.h |   25 +++++++
 2 files changed, 155 insertions(+), 57 deletions(-)

diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index d25b4ad..2fdd181 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -22,6 +22,35 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
+void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+			struct ceph_file_layout *layout,
+			u64 snapid,
+			u64 off, u64 len, u64 *bno,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+	struct ceph_osd_op *op = (void *)(reqhead + 1);
+	u64 orig_len = len;
+	u64 objoff, objlen;    /* extent in object */
+
+	reqhead->snapid = cpu_to_le64(snapid);
+
+	/* object extent? */
+	ceph_calc_file_object_mapping(layout, off, &len, bno,
+				      &objoff, &objlen);
+	if (len < orig_len)
+		dout(" skipping last %llu, final file extent %llu~%llu\n",
+		     orig_len - len, off, len);
+
+	op->extent.offset = cpu_to_le64(objoff);
+	op->extent.length = cpu_to_le64(objlen);
+	req->r_num_pages = calc_pages_for(off, len);
+
+	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+	     *bno, objoff, objlen, req->r_num_pages);
+
+}
+
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -48,34 +77,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  * fill osd op in request message.
  */
 static void calc_layout(struct ceph_osd_client *osdc,
-			struct ceph_vino vino, struct ceph_file_layout *layout,
+			struct ceph_vino vino,
+			struct ceph_file_layout *layout,
 			u64 off, u64 *plen,
 			struct ceph_osd_request *req)
 {
-	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-	struct ceph_osd_op *op = (void *)(reqhead + 1);
-	u64 orig_len = *plen;
-	u64 objoff, objlen;    /* extent in object */
 	u64 bno;
 
-	reqhead->snapid = cpu_to_le64(vino.snap);
-
-	/* object extent? */
-	ceph_calc_file_object_mapping(layout, off, plen, &bno,
-				      &objoff, &objlen);
-	if (*plen < orig_len)
-		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - *plen, off, *plen);
+	ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
 
 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
-
-	op->extent.offset = cpu_to_le64(objoff);
-	op->extent.length = cpu_to_le64(objlen);
-	req->r_num_pages = calc_pages_for(off, *plen);
-
-	dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
-	     req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
 /*
@@ -108,43 +120,34 @@ void ceph_osdc_release_request(struct kref *kref)
 		kfree(req);
 }
 
-/*
- * build new request AND message, calculate layout, and adjust file
- * extent as needed.
- *
- * if the file was recently truncated, we include information about its
- * old and new size so that the object can be updated appropriately.  (we
- * avoid synchronously deleting truncated objects because it's slow.)
- *
- * if @do_sync, include a 'startsync' command so that the osd will flush
- * data quickly.
- */
-struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
-					       struct ceph_file_layout *layout,
-					       struct ceph_vino vino,
-					       u64 off, u64 *plen,
-					       int opcode, int flags,
+struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+					       int flags,
 					       struct ceph_snap_context *snapc,
 					       int do_sync,
-					       u32 truncate_seq,
-					       u64 truncate_size,
-					       struct timespec *mtime,
-					       bool use_mempool, int num_reply)
+					       bool use_mempool,
+					       gfp_t gfp_flags,
+					       struct page **pages)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	struct ceph_osd_request_head *head;
-	struct ceph_osd_op *op;
-	void *p;
 	int num_op = 1 + do_sync;
-	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-	int i;
+	size_t msg_size = sizeof(struct ceph_osd_request_head) +
+			  num_op*sizeof(struct ceph_osd_op);
 
 	if (use_mempool) {
-		req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
+		req = mempool_alloc(osdc->req_mempool, gfp_flags);
 		memset(req, 0, sizeof(*req));
 	} else {
-		req = kzalloc(sizeof(*req), GFP_NOFS);
+		req = kzalloc(sizeof(*req), gfp_flags);
+	}
+	if (!req)
+		return NULL;
+
+	if (use_mempool) {
+		req = mempool_alloc(osdc->req_mempool, gfp_flags);
+		memset(req, 0, sizeof(*req));
+	} else {
+		req = kzalloc(sizeof(*req), gfp_flags);
 	}
 	if (req == NULL)
 		return NULL;
@@ -164,7 +167,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
+				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
@@ -178,18 +181,48 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
 	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
+
+	req->r_request = msg;
+	req->r_pages = pages;
+
+	return req;
+}
+
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req,
+			    u64 off, u64 *plen,
+			    int opcode,
+			    struct ceph_snap_context *snapc,
+			    int do_sync,
+			    u32 truncate_seq,
+			    u64 truncate_size,
+			    struct timespec *mtime,
+			    const char *oid,
+			    int oid_len)
+{
+	struct ceph_msg *msg = req->r_request;
+	struct ceph_osd_request_head *head;
+	struct ceph_osd_op *op;
+	void *p;
+	int num_op = 1 + do_sync;
+	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+	int i;
+	int flags = req->r_flags;
+
 	head = msg->front.iov_base;
 	op = (void *)(head + 1);
 	p = (void *)(op + num_op);
 
-	req->r_request = msg;
 	req->r_snapc = ceph_get_snap_context(snapc);
 
 	head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +232,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	head->num_ops = cpu_to_le16(num_op);
 	op->op = cpu_to_le16(opcode);
 
-	/* calculate max write size */
-	calc_layout(osdc, vino, layout, off, plen, req);
-	req->r_file_layout = *layout;  /* keep a copy */
-
 	if (flags & CEPH_OSD_FLAG_WRITE) {
 		req->r_request->hdr.data_off = cpu_to_le16(off);
 		req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +241,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
 	/* fill in oid */
-	head->object_len = cpu_to_le32(req->r_oid_len);
-	memcpy(p, req->r_oid, req->r_oid_len);
-	p += req->r_oid_len;
+	head->object_len = cpu_to_le32(oid_len);
+	memcpy(p, oid, oid_len);
+	p += oid_len;
 
 	if (do_sync) {
 		op++;
@@ -233,6 +262,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
 	msg->hdr.front_len = cpu_to_le32(msg_size);
+	return;
+}
+
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ *
+ * if the file was recently truncated, we include information about its
+ * old and new size so that the object can be updated appropriately.  (we
+ * avoid synchronously deleting truncated objects because it's slow.)
+ *
+ * if @do_sync, include a 'startsync' command so that the osd will flush
+ * data quickly.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+					       struct ceph_file_layout *layout,
+					       struct ceph_vino vino,
+					       u64 off, u64 *plen,
+					       int opcode, int flags,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       u32 truncate_seq,
+					       u64 truncate_size,
+					       struct timespec *mtime,
+					       bool use_mempool, int num_reply)
+{
+	struct ceph_osd_request *req =
+		ceph_osdc_alloc_request(osdc, flags,
+					 snapc, do_sync,
+					 use_mempool,
+					 GFP_NOFS, NULL);
+	if (IS_ERR(req))
+		return req;
+
+	/* calculate max write size */
+	calc_layout(osdc, vino, layout, off, plen, req);
+	req->r_file_layout = *layout;  /* keep a copy */
+
+	ceph_osdc_build_request(req, off, plen, opcode,
+				snapc, do_sync,
+				truncate_seq, truncate_size,
+				mtime,
+				req->r_oid, req->r_oid_len);
+
 	return req;
 }
 
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index ce77698..b687c2e 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -119,6 +119,31 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 
+extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+			struct ceph_file_layout *layout,
+			u64 snapid,
+			u64 off, u64 len, u64 *bno,
+			struct ceph_osd_request *req);
+
+extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+					       int flags,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       bool use_mempool,
+					       gfp_t gfp_flags,
+					       struct page **pages);
+
+extern void ceph_osdc_build_request(struct ceph_osd_request *req,
+			    u64 off, u64 *plen,
+			    int opcode,
+			    struct ceph_snap_context *snapc,
+			    int do_sync,
+			    u32 truncate_seq,
+			    u64 truncate_size,
+			    struct timespec *mtime,
+			    const char *oid,
+			    int oid_len);
+
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
 				      struct ceph_vino vino,
-- 
1.5.6.5

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 03/10] ceph-rbd: messenger and osdc changes for rbd
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 01/10] ceph-rbd: lookup pool in osdmap by name Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 02/10] ceph-rbd: refactor osdc requests creation functions Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 04/10] ceph-rbd: enable creation of clients that don't need mds Yehuda Sadeh
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

The messenger can send/receive data in bio.  This was added
so that we wouldn't need to copy the data from the rados
block device.

We can now have trailing variable sized data for osd
ops. Also osd ops encoding is more modular.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/messenger.c  |  215 ++++++++++++++++++++++++++++++++++++++++++--------
 fs/ceph/messenger.h  |    4 +
 fs/ceph/osd_client.c |  198 ++++++++++++++++++++++++++++++++++++---------
 fs/ceph/osd_client.h |   58 +++++++++++---
 fs/ceph/pagelist.c   |    2 +-
 fs/ceph/pagelist.h   |    2 +-
 6 files changed, 393 insertions(+), 86 deletions(-)

diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 64b8b1f..c559784 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
 #include <linux/slab.h>
 #include <linux/socket.h>
 #include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
 #include <net/tcp.h>
 
 #include "super.h"
@@ -539,8 +541,11 @@ static void prepare_write_message(struct ceph_connection *con)
 	if (le32_to_cpu(m->hdr.data_len) > 0) {
 		/* initialize page iterator */
 		con->out_msg_pos.page = 0;
-		con->out_msg_pos.page_pos =
-			le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+		if (m->pages)
+			con->out_msg_pos.page_pos =
+				le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+		else
+			con->out_msg_pos.page_pos = 0;
 		con->out_msg_pos.data_pos = 0;
 		con->out_msg_pos.did_page_crc = 0;
 		con->out_more = 1;  /* data + footer will follow */
@@ -722,6 +727,31 @@ out:
 	return ret;  /* done! */
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+	if (!bio) {
+		*iter = NULL;
+		*seg = 0;
+		return;
+	}
+	*iter = bio;
+	*seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+	if (*bio_iter == NULL)
+		return;
+
+	BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+	(*seg)++;
+	if (*seg == (*bio_iter)->bi_vcnt)
+		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -736,21 +766,45 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 	size_t len;
 	int crc = con->msgr->nocrc;
 	int ret;
+	int total_max_write;
+	int in_trail = 0;
+	size_t trail_len = (msg->trail ? msg->trail->length : 0);
 
 	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
 	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
 	     con->out_msg_pos.page_pos);
 
-	while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+	if (msg->bio && !msg->bio_iter)
+		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+	while (data_len > con->out_msg_pos.data_pos) {
 		struct page *page = NULL;
 		void *kaddr = NULL;
+		int max_write = PAGE_SIZE;
+		int page_shift = 0;
+
+		total_max_write = data_len - trail_len - con->out_msg_pos.data_pos;
 
 		/*
 		 * if we are calculating the data crc (the default), we need
 		 * to map the page.  if our pages[] has been revoked, use the
 		 * zero page.
 		 */
-		if (msg->pages) {
+
+		/* have we reached the trail part of the data? */
+		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
+			in_trail = 1;
+
+			total_max_write = data_len - con->out_msg_pos.data_pos;
+
+			page = list_first_entry(&msg->trail->head,
+						struct page, lru);
+			if (crc)
+				kaddr = kmap(page);
+			max_write = PAGE_SIZE;
+		} else if (msg->pages) {
 			page = msg->pages[con->out_msg_pos.page];
 			if (crc)
 				kaddr = kmap(page);
@@ -759,13 +813,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 						struct page, lru);
 			if (crc)
 				kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+		} else if (msg->bio) {
+			struct bio_vec *bv;
+
+			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+			page = bv->bv_page;
+			page_shift = bv->bv_offset;
+			if (crc)
+				kaddr = kmap(page) + page_shift;
+			max_write = bv->bv_len;
+#endif
 		} else {
 			page = con->msgr->zero_page;
 			if (crc)
 				kaddr = page_address(con->msgr->zero_page);
 		}
-		len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
-			  (int)(data_len - con->out_msg_pos.data_pos));
+		len = min_t(int, max_write - con->out_msg_pos.page_pos,
+			    total_max_write);
+
 		if (crc && !con->out_msg_pos.did_page_crc) {
 			void *base = kaddr + con->out_msg_pos.page_pos;
 			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -775,13 +841,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 				cpu_to_le32(crc32c(tmpcrc, base, len));
 			con->out_msg_pos.did_page_crc = 1;
 		}
-
 		ret = kernel_sendpage(con->sock, page,
-				      con->out_msg_pos.page_pos, len,
+				      con->out_msg_pos.page_pos + page_shift,
+				      len,
 				      MSG_DONTWAIT | MSG_NOSIGNAL |
 				      MSG_MORE);
 
-		if (crc && (msg->pages || msg->pagelist))
+		if (crc &&
+		    (msg->pages || msg->pagelist || msg->bio || in_trail) )
 			kunmap(page);
 
 		if (ret <= 0)
@@ -793,9 +860,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 			con->out_msg_pos.page_pos = 0;
 			con->out_msg_pos.page++;
 			con->out_msg_pos.did_page_crc = 0;
-			if (msg->pagelist)
+			if (in_trail)
+				list_move_tail(&page->lru,
+					       &msg->trail->head);
+			else if (msg->pagelist)
 				list_move_tail(&page->lru,
 					       &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+			else if (msg->bio)
+				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
 		}
 	}
 
@@ -1302,8 +1376,7 @@ static int read_partial_message_section(struct ceph_connection *con,
 					struct kvec *section, unsigned int sec_len,
 					u32 *crc)
 {
-	int left;
-	int ret;
+	int ret, left;
 
 	BUG_ON(!section);
 
@@ -1326,13 +1399,83 @@ static int read_partial_message_section(struct ceph_connection *con,
 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 				struct ceph_msg_header *hdr,
 				int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+				      struct page **pages,
+				      unsigned data_len, int datacrc)
+{
+	void *p;
+	int ret;
+	int left;
+
+	left = min((int)(data_len - con->in_msg_pos.data_pos),
+		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+	/* (page) data */
+	BUG_ON(pages == NULL);
+	p = kmap(pages[con->in_msg_pos.page]);
+	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+			       left);
+	if (ret > 0 && datacrc)
+		con->in_data_crc =
+			crc32c(con->in_data_crc,
+				  p + con->in_msg_pos.page_pos, ret);
+	kunmap(pages[con->in_msg_pos.page]);
+	if (ret <= 0)
+		return ret;
+	con->in_msg_pos.data_pos += ret;
+	con->in_msg_pos.page_pos += ret;
+	if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+		con->in_msg_pos.page_pos = 0;
+		con->in_msg_pos.page++;
+	}
+
+	return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+				    struct bio **bio_iter, int *bio_seg,
+				    unsigned data_len, int datacrc)
+{
+	struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+	void *p;
+	int ret, left;
+
+	if (IS_ERR(bv))
+		return PTR_ERR(bv);
+
+	left = min((int)(data_len - con->in_msg_pos.data_pos),
+		   (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+	p = kmap(bv->bv_page) + bv->bv_offset;
+
+	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+			       left);
+	if (ret > 0 && datacrc)
+		con->in_data_crc =
+			crc32c(con->in_data_crc,
+				  p + con->in_msg_pos.page_pos, ret);
+	kunmap(bv->bv_page);
+	if (ret <= 0)
+		return ret;
+	con->in_msg_pos.data_pos += ret;
+	con->in_msg_pos.page_pos += ret;
+	if (con->in_msg_pos.page_pos == bv->bv_len) {
+		con->in_msg_pos.page_pos = 0;
+		iter_bio_next(bio_iter, bio_seg);
+	}
+
+	return ret;
+}
+#endif
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
-	void *p;
 	int ret;
 	int to, left;
 	unsigned front_len, middle_len, data_len, data_off;
@@ -1417,7 +1560,10 @@ static int read_partial_message(struct ceph_connection *con)
 			m->middle->vec.iov_len = 0;
 
 		con->in_msg_pos.page = 0;
-		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		if (m->pages)
+			con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		else
+			con->in_msg_pos.page_pos = 0;
 		con->in_msg_pos.data_pos = 0;
 	}
 
@@ -1434,27 +1580,29 @@ static int read_partial_message(struct ceph_connection *con)
 		if (ret <= 0)
 			return ret;
 	}
+#ifdef CONFIG_BLOCK
+	if (m->bio && !m->bio_iter)
+		init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
 
 	/* (page) data */
 	while (con->in_msg_pos.data_pos < data_len) {
-		left = min((int)(data_len - con->in_msg_pos.data_pos),
-			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-		BUG_ON(m->pages == NULL);
-		p = kmap(m->pages[con->in_msg_pos.page]);
-		ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-				       left);
-		if (ret > 0 && datacrc)
-			con->in_data_crc =
-				crc32c(con->in_data_crc,
-					  p + con->in_msg_pos.page_pos, ret);
-		kunmap(m->pages[con->in_msg_pos.page]);
-		if (ret <= 0)
-			return ret;
-		con->in_msg_pos.data_pos += ret;
-		con->in_msg_pos.page_pos += ret;
-		if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-			con->in_msg_pos.page_pos = 0;
-			con->in_msg_pos.page++;
+		if (m->pages) {
+			ret = read_partial_message_pages(con, m->pages,
+						 data_len, datacrc);
+			if (ret <= 0)
+				return ret;
+#ifdef CONFIG_BLOCK
+		} else if (m->bio) {
+
+			ret = read_partial_message_bio(con,
+						 &m->bio_iter, &m->bio_seg,
+						 data_len, datacrc);
+			if (ret <= 0)
+				return ret;
+#endif
+		} else {
+			BUG_ON(1);
 		}
 	}
 
@@ -2130,6 +2278,9 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 	m->nr_pages = 0;
 	m->pages = NULL;
 	m->pagelist = NULL;
+	m->bio = NULL;
+	m->bio_iter = NULL;
+	m->bio_seg = 0;
 
 	dout("ceph_msg_new %p front %d\n", m, front_len);
 	return m;
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc95..5a79450 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,10 @@ struct ceph_msg {
 	struct ceph_pagelist *pagelist; /* instead of pages */
 	struct list_head list_head;
 	struct kref kref;
+	struct bio  *bio;		/* instead of pages/pagelist */
+	struct bio  *bio_iter;		/* bio iterator */
+	int bio_seg;			/* current bio segment */
+	struct ceph_pagelist *trail;	/* the trailing part of the data */
 	bool front_is_vmalloc;
 	bool more_to_follow;
 	bool needs_out_seq;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2fdd181..1cb9533 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
 #include <linux/pagemap.h>
 #include <linux/slab.h>
 #include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
 
 #include "super.h"
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
 #include "auth.h"
+#include "pagelist.h"
 
 #define OSD_OP_FRONT_LEN	4096
 #define OSD_OPREPLY_FRONT_LEN	512
@@ -25,26 +29,26 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 			struct ceph_file_layout *layout,
 			u64 snapid,
-			u64 off, u64 len, u64 *bno,
+			u64 off, u64 *plen, u64 *bno,
 			struct ceph_osd_request *req)
 {
 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 	struct ceph_osd_op *op = (void *)(reqhead + 1);
-	u64 orig_len = len;
+	u64 orig_len = *plen;
 	u64 objoff, objlen;    /* extent in object */
 
 	reqhead->snapid = cpu_to_le64(snapid);
 
 	/* object extent? */
-	ceph_calc_file_object_mapping(layout, off, &len, bno,
+	ceph_calc_file_object_mapping(layout, off, plen, bno,
 				      &objoff, &objlen);
-	if (len < orig_len)
+	if (*plen < orig_len)
 		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - len, off, len);
+		     orig_len - *plen, off, *plen);
 
 	op->extent.offset = cpu_to_le64(objoff);
 	op->extent.length = cpu_to_le64(objlen);
-	req->r_num_pages = calc_pages_for(off, len);
+	req->r_num_pages = calc_pages_for(off, *plen);
 
 	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
 	     *bno, objoff, objlen, req->r_num_pages);
@@ -84,7 +88,7 @@ static void calc_layout(struct ceph_osd_client *osdc,
 {
 	u64 bno;
 
-	ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
+	ceph_calc_raw_layout(osdc, layout, vino.snap, off, plen, &bno, req);
 
 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
@@ -113,26 +117,62 @@ void ceph_osdc_release_request(struct kref *kref)
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
 					 req->r_num_pages);
+#ifdef CONFIG_BLOCK
+	if (req->r_bio)
+		bio_put(req->r_bio);
+#endif
 	ceph_put_snap_context(req->r_snapc);
+	if (req->r_trail)
+		ceph_pagelist_release(req->r_trail);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
 	else
 		kfree(req);
 }
 
+static int op_needs_trail(int op)
+{
+	switch (op) {
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+{
+	int i = 0;
+
+	if (needs_trail)
+		*needs_trail = 0;
+	while (ops[i].op) {
+		if (needs_trail && op_needs_trail(ops[i].op))
+			*needs_trail = 1;
+		i++;
+	}
+
+	return i;
+}
+
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       int flags,
 					       struct ceph_snap_context *snapc,
-					       int do_sync,
+					       struct ceph_osd_req_op *ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags,
-					       struct page **pages)
+					       struct page **pages,
+					       struct bio *bio)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	int num_op = 1 + do_sync;
-	size_t msg_size = sizeof(struct ceph_osd_request_head) +
-			  num_op*sizeof(struct ceph_osd_op);
+	int needs_trail;
+	int num_op = get_num_ops(ops, &needs_trail);
+	size_t msg_size = sizeof(struct ceph_osd_request_head);
+
+	msg_size += num_op*sizeof(struct ceph_osd_op);
 
 	if (use_mempool) {
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +194,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
+
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
@@ -174,6 +215,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	}
 	req->r_reply = msg;
 
+	/* allocate space for the trailing data */
+	if (needs_trail) {
+		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
+		if (!req->r_trail) {
+			ceph_osdc_put_request(req);
+			return NULL;
+		}
+		ceph_pagelist_init(req->r_trail);
+	}
 	/* create request message; allow space for oid */
 	msg_size += 40;
 	if (snapc)
@@ -191,33 +241,77 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 
 	req->r_request = msg;
 	req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+	if (bio) {
+		req->r_bio = bio;
+		bio_get(req->r_bio);
+	}
+#endif
 
 	return req;
 }
 
+static void osd_req_encode_op(struct ceph_osd_request *req,
+			      struct ceph_osd_op *dst,
+			      struct ceph_osd_req_op *src)
+{
+	dst->op = cpu_to_le16(src->op);
+
+	switch (dst->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		dst->extent.offset =
+			cpu_to_le64(src->extent.offset);
+		dst->extent.length =
+			cpu_to_le64(src->extent.length);
+		dst->extent.truncate_size =
+			cpu_to_le64(src->extent.truncate_size);
+		dst->extent.truncate_seq =
+			cpu_to_le32(src->extent.truncate_seq);
+		break;
+
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		BUG_ON(!req->r_trail);
+
+		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+		dst->xattr.cmp_op = src->xattr.cmp_op;
+		dst->xattr.cmp_mode = src->xattr.cmp_mode;
+		ceph_pagelist_append(req->r_trail, src->xattr.name,
+				     src->xattr.name_len);
+		ceph_pagelist_append(req->r_trail, src->xattr.val,
+				     src->xattr.value_len);
+		dst->payload_len = cpu_to_le32(src->xattr.name_len +
+					       src->xattr.value_len);
+		break;
+	}
+	dst->payload_len = cpu_to_le32(src->payload_len);
+}
+
 /*
  * build new request AND message
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-			    u64 off, u64 *plen,
-			    int opcode,
-			    struct ceph_snap_context *snapc,
-			    int do_sync,
-			    u32 truncate_seq,
-			    u64 truncate_size,
-			    struct timespec *mtime,
-			    const char *oid,
-			    int oid_len)
+			     u64 off, u64 *plen,
+			     struct ceph_osd_req_op *src_ops,
+			     struct ceph_snap_context *snapc,
+			     struct timespec *mtime,
+			     const char *oid,
+			     int oid_len)
 {
 	struct ceph_msg *msg = req->r_request;
 	struct ceph_osd_request_head *head;
+	struct ceph_osd_req_op *src_op;
 	struct ceph_osd_op *op;
 	void *p;
-	int num_op = 1 + do_sync;
+	int num_op = get_num_ops(src_ops, NULL);
 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-	int i;
 	int flags = req->r_flags;
+	u64 data_len = 0;
+	int i;
 
 	head = msg->front.iov_base;
 	op = (void *)(head + 1);
@@ -230,25 +324,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		ceph_encode_timespec(&head->mtime, mtime);
 	head->num_ops = cpu_to_le16(num_op);
-	op->op = cpu_to_le16(opcode);
 
-	if (flags & CEPH_OSD_FLAG_WRITE) {
-		req->r_request->hdr.data_off = cpu_to_le16(off);
-		req->r_request->hdr.data_len = cpu_to_le32(*plen);
-		op->payload_len = cpu_to_le32(*plen);
-	}
-	op->extent.truncate_size = cpu_to_le64(truncate_size);
-	op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
 	/* fill in oid */
 	head->object_len = cpu_to_le32(oid_len);
 	memcpy(p, oid, oid_len);
 	p += oid_len;
 
-	if (do_sync) {
+	src_op = src_ops;
+	while (src_op->op) {
+		osd_req_encode_op(req, op, src_op);
+		src_op++;
 		op++;
-		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
 	}
+
+	if (req->r_trail)
+		data_len += req->r_trail->length;
+
 	if (snapc) {
 		head->snap_seq = cpu_to_le64(snapc->seq);
 		head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +350,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 		}
 	}
 
+	if (flags & CEPH_OSD_FLAG_WRITE) {
+		req->r_request->hdr.data_off = cpu_to_le16(off);
+		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+	} else if (data_len) {
+		req->r_request->hdr.data_off = 0;
+		req->r_request->hdr.data_len = cpu_to_le32(data_len);
+	}
+
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
@@ -288,11 +388,23 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       struct timespec *mtime,
 					       bool use_mempool, int num_reply)
 {
-	struct ceph_osd_request *req =
-		ceph_osdc_alloc_request(osdc, flags,
-					 snapc, do_sync,
+	struct ceph_osd_req_op ops[3];
+	struct ceph_osd_request *req;
+
+	ops[0].op = opcode;
+	ops[0].extent.truncate_seq = truncate_seq;
+	ops[0].extent.truncate_size = truncate_size;
+
+	if (do_sync) {
+		ops[1].op = CEPH_OSD_OP_STARTSYNC;
+		ops[2].op = 0;
+	} else
+		ops[1].op = 0;
+
+	req = ceph_osdc_alloc_request(osdc, flags,
+					 snapc, ops,
 					 use_mempool,
-					 GFP_NOFS, NULL);
+					 GFP_NOFS, NULL, NULL);
 	if (IS_ERR(req))
 		return req;
 
@@ -300,9 +412,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	calc_layout(osdc, vino, layout, off, plen, req);
 	req->r_file_layout = *layout;  /* keep a copy */
 
-	ceph_osdc_build_request(req, off, plen, opcode,
-				snapc, do_sync,
-				truncate_seq, truncate_size,
+	ceph_osdc_build_request(req, off, plen, ops,
+				snapc,
 				mtime,
 				req->r_oid, req->r_oid_len);
 
@@ -1177,6 +1288,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 
 	req->r_request->pages = req->r_pages;
 	req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+	req->r_request->bio = req->r_bio;
+#endif
+	req->r_request->trail = req->r_trail;
 
 	register_request(osdc, req);
 
@@ -1495,6 +1610,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		}
 		m->pages = req->r_pages;
 		m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+		m->bio = req->r_bio;
+#endif
 	}
 	*skip = 0;
 	req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b687c2e..51c2b79 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -15,6 +15,7 @@ struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
 struct ceph_authorizer;
+struct ceph_pagelist;
 
 /*
  * completion callback for async writepages
@@ -80,6 +81,11 @@ struct ceph_osd_request {
 	struct page     **r_pages;            /* pages for data payload */
 	int               r_pages_from_pool;
 	int               r_own_pages;        /* if true, i own page list */
+#ifdef CONFIG_BLOCK
+	struct bio       *r_bio;	      /* instead of pages */
+#endif
+
+	struct ceph_pagelist *r_trail;	      /* trailing part of the data */
 };
 
 struct ceph_osd_client {
@@ -110,6 +116,36 @@ struct ceph_osd_client {
 	struct ceph_msgpool	msgpool_op_reply;
 };
 
+struct ceph_osd_req_op {
+	u16 op;           /* CEPH_OSD_OP_* */
+	u32 flags;        /* CEPH_OSD_FLAG_* */
+	union {
+		struct {
+			u64 offset, length;
+			u64 truncate_size;
+			u32 truncate_seq;
+		} extent;
+		struct {
+			const char *name;
+			u32 name_len;
+			const char  *val;
+			u32 value_len;
+			__u8 cmp_op;       /* CEPH_OSD_CMPXATTR_OP_* */
+			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
+		} xattr;
+		struct {
+			__u8 class_len;
+			__u8 method_len;
+			__u8 argc;
+			u32 indata_len;
+		} cls;
+		struct {
+			u64 cookie, count;
+		} pgls;
+	};
+	u32 payload_len;
+};
+
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,
 			  struct ceph_client *client);
 extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -122,27 +158,25 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 			struct ceph_file_layout *layout,
 			u64 snapid,
-			u64 off, u64 len, u64 *bno,
+			u64 off, u64 *plen, u64 *bno,
 			struct ceph_osd_request *req);
 
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       int flags,
 					       struct ceph_snap_context *snapc,
-					       int do_sync,
+					       struct ceph_osd_req_op *ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags,
-					       struct page **pages);
+					       struct page **pages,
+					       struct bio *bio);
 
 extern void ceph_osdc_build_request(struct ceph_osd_request *req,
-			    u64 off, u64 *plen,
-			    int opcode,
-			    struct ceph_snap_context *snapc,
-			    int do_sync,
-			    u32 truncate_seq,
-			    u64 truncate_size,
-			    struct timespec *mtime,
-			    const char *oid,
-			    int oid_len);
+				    u64 off, u64 *plen,
+				    struct ceph_osd_req_op *src_ops,
+				    struct ceph_snap_context *snapc,
+				    struct timespec *mtime,
+				    const char *oid,
+				    int oid_len);
 
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index b6859f4..8170365 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -31,7 +31,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
 	return 0;
 }
 
-int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
+int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
 {
 	while (pl->room < len) {
 		size_t bit = pl->room;
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
index e8a4187..cc9327a 100644
--- a/fs/ceph/pagelist.h
+++ b/fs/ceph/pagelist.h
@@ -19,7 +19,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
 }
 extern int ceph_pagelist_release(struct ceph_pagelist *pl);
 
-extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);
+extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
 
 static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
 {
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 04/10] ceph-rbd: enable creation of clients that don't need mds
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (2 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 03/10] ceph-rbd: messenger and osdc changes for rbd Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 05/10] ceph-rbd: refactor mount related functions, add helpers Yehuda Sadeh
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

Preparing grounds for rbd that doesn't need mds client.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/debugfs.c    |   11 ++++++++---
 fs/ceph/mon_client.c |    3 ++-
 fs/ceph/super.c      |   18 ++++++++++++------
 fs/ceph/super.h      |    2 ++
 4 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 3be33fb..c5d6a01 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -440,9 +440,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
 	if (!client->debugfs_congestion_kb)
 		goto out;
 
-	sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev));
-	client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir,
-						     name);
+	if (client->backing_dev_info.dev) {
+		sprintf(name, "../../bdi/%s",
+			dev_name(client->backing_dev_info.dev));
+		client->debugfs_bdi =
+			debugfs_create_symlink("bdi",
+					       client->debugfs_dir,
+					       name);
+	}
 
 	return 0;
 
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 07a5399..4e022ed 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -787,7 +787,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		break;
 
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+		if (monc->client->have_mdsc)
+			ceph_mdsc_handle_map(&monc->client->mdsc, msg);
 		break;
 
 	case CEPH_MSG_OSD_MAP:
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index fa87f51..14c8f5a 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -584,7 +584,8 @@ static void destroy_mount_args(struct ceph_mount_args *args)
 /*
  * create a fresh client instance
  */
-static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
+struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+				       int need_mdsc)
 {
 	struct ceph_client *client;
 	int err = -ENOMEM;
@@ -639,9 +640,13 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
 	err = ceph_osdc_init(&client->osdc, client);
 	if (err < 0)
 		goto fail_monc;
-	err = ceph_mdsc_init(&client->mdsc, client);
-	if (err < 0)
-		goto fail_osdc;
+	if (need_mdsc) {
+		err = ceph_mdsc_init(&client->mdsc, client);
+		if (err < 0)
+			goto fail_osdc;
+		client->have_mdsc = 1;
+	}
+
 	return client;
 
 fail_osdc:
@@ -668,7 +673,8 @@ static void ceph_destroy_client(struct ceph_client *client)
 	dout("destroy_client %p\n", client);
 
 	/* unmount */
-	ceph_mdsc_stop(&client->mdsc);
+	if (client->have_mdsc)
+		ceph_mdsc_stop(&client->mdsc);
 	ceph_osdc_stop(&client->osdc);
 
 	/*
@@ -963,7 +969,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
 	}
 
 	/* create client (which we may/may not use) */
-	client = ceph_create_client(args);
+	client = ceph_create_client(args, 1);
 	if (IS_ERR(client)) {
 		err = PTR_ERR(client);
 		goto out_final;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 10a4a40..2602248 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -139,6 +139,8 @@ struct ceph_client {
 
 	int min_caps;                  /* min caps i added */
 
+	int have_mdsc;
+
 	struct ceph_messenger *msgr;   /* messenger instance */
 	struct ceph_mon_client monc;
 	struct ceph_mds_client mdsc;
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 05/10] ceph-rbd: refactor mount related functions, add helpers
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (3 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 04/10] ceph-rbd: enable creation of clients that don't need mds Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 06/10] ceph-rbd: generalize mon requests, add pool op support Yehuda Sadeh
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

Removed some functions' static declarations, separated mount
operation to __open_session and open_root_dentry for clients
that don't need the latter (rbd). Added other helper functions
that will be used later in the rbd.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/file.c       |   46 +++++++++++++++
 fs/ceph/osd_client.h |    1 +
 fs/ceph/super.c      |  149 +++++++++++++++++++++++++++++++++++++------------
 fs/ceph/super.h      |   27 ++++++++-
 4 files changed, 183 insertions(+), 40 deletions(-)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 6251a15..4c21b51 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -363,6 +363,52 @@ static int copy_user_to_page_vector(struct page **pages,
 	return len;
 }
 
+int ceph_copy_to_page_vector(struct page **pages,
+				    const char *data,
+				    loff_t off, size_t len)
+{
+	int i = 0;
+	size_t po = off & ~PAGE_CACHE_MASK;
+	size_t left = len;
+	size_t l;
+
+	while (left > 0) {
+		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		memcpy(page_address(pages[i]) + po, data, l);
+		data += l;
+		left -= l;
+		po += l;
+		if (po == PAGE_CACHE_SIZE) {
+			po = 0;
+			i++;
+		}
+	}
+	return len;
+}
+
+int ceph_copy_from_page_vector(struct page **pages,
+				    char *data,
+				    loff_t off, size_t len)
+{
+	int i = 0;
+	size_t po = off & ~PAGE_CACHE_MASK;
+	size_t left = len;
+	size_t l;
+
+	while (left > 0) {
+		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		memcpy(data, page_address(pages[i]) + po, l);
+		data += l;
+		left -= l;
+		po += l;
+		if (po == PAGE_CACHE_SIZE) {
+			po = 0;
+			i++;
+		}
+	}
+	return len;
+}
+
 /*
  * copy user data from a page vector into a user pointer
  */
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 51c2b79..38c4615 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -69,6 +69,7 @@ struct ceph_osd_request {
 	struct list_head  r_unsafe_item;
 
 	struct inode *r_inode;         	      /* for use by callbacks */
+	void *r_priv;			      /* ditto */
 
 	char              r_oid[40];          /* object name */
 	int               r_oid_len;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 14c8f5a..43b1589 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -387,14 +387,15 @@ static match_table_t arg_tokens = {
 };
 
 
-static struct ceph_mount_args *parse_mount_args(int flags, char *options,
-						const char *dev_name,
-						const char **path)
+struct ceph_mount_args *parse_mount_args(int flags, char *options,
+					 const char *dev_name,
+					 const char **path)
 {
 	struct ceph_mount_args *args;
 	const char *c;
 	int err = -ENOMEM;
 	substring_t argstr[MAX_OPT_ARGS];
+	const char *end_path;
 
 	args = kzalloc(sizeof(*args), GFP_KERNEL);
 	if (!args)
@@ -426,23 +427,29 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
 	err = -EINVAL;
 	if (!dev_name)
 		goto out;
-	*path = strstr(dev_name, ":/");
-	if (*path == NULL) {
-		pr_err("device name is missing path (no :/ in %s)\n",
-		       dev_name);
-		goto out;
+
+	if (path) {
+		*path = strstr(dev_name, ":/");
+		if (*path == NULL) {
+			pr_err("device name is missing path (no :/ in %s)\n",
+			       dev_name);
+			goto out;
+		}
+		end_path = *path;
+
+		/* path on server */
+		*path += 2;
+		dout("server path '%s'\n", *path);
+	} else {
+		end_path = dev_name + strlen(dev_name);
 	}
 
 	/* get mon ip(s) */
-	err = ceph_parse_ips(dev_name, *path, args->mon_addr,
+	err = ceph_parse_ips(dev_name, end_path, args->mon_addr,
 			     CEPH_MAX_MON, &args->num_mon);
 	if (err < 0)
 		goto out;
 
-	/* path on server */
-	*path += 2;
-	dout("server path '%s'\n", *path);
-
 	/* parse mount options */
 	while ((c = strsep(&options, ",")) != NULL) {
 		int token, intval, ret;
@@ -569,18 +576,60 @@ out:
 	return ERR_PTR(err);
 }
 
-static void destroy_mount_args(struct ceph_mount_args *args)
+void ceph_destroy_mount_args(struct ceph_mount_args *args)
 {
 	dout("destroy_mount_args %p\n", args);
 	kfree(args->snapdir_name);
-	args->snapdir_name = NULL;
 	kfree(args->name);
-	args->name = NULL;
 	kfree(args->secret);
-	args->secret = NULL;
 	kfree(args);
 }
 
+static int strcmp_null(const char *s1, const char *s2)
+{
+	if (!s1 && !s2)
+		return 0;
+	if (s1 && !s2)
+		return -1;
+	if (!s1 && s2)
+		return 1;
+	return strcmp(s1, s2);
+}
+
+int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+			    struct ceph_client *client)
+{
+	struct ceph_mount_args *args1 = new_args;
+	struct ceph_mount_args *args2 = client->mount_args;
+	int ofs = offsetof(struct ceph_mount_args, mon_addr);
+	int i;
+	int ret;
+
+	ret = memcmp(args1, args2, ofs);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->snapdir_name, args2->snapdir_name);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->name, args2->name);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->secret, args2->secret);
+	if (ret)
+		return ret;
+
+	for (i = 0; i < args1->num_mon; i++) {
+		if (ceph_monmap_contains(client->monc.monmap,
+				 &args1->mon_addr[i]))
+			return 0;
+	}
+
+	return -1;
+}
+
 /*
  * create a fresh client instance
  */
@@ -668,7 +717,7 @@ fail:
 	return ERR_PTR(err);
 }
 
-static void ceph_destroy_client(struct ceph_client *client)
+void ceph_destroy_client(struct ceph_client *client)
 {
 	dout("destroy_client %p\n", client);
 
@@ -699,7 +748,7 @@ static void ceph_destroy_client(struct ceph_client *client)
 		ceph_messenger_destroy(client->msgr);
 	mempool_destroy(client->wb_pagevec_pool);
 
-	destroy_mount_args(client->mount_args);
+	ceph_destroy_mount_args(client->mount_args);
 
 	kfree(client);
 	dout("destroy_client %p done\n", client);
@@ -780,17 +829,12 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
 /*
  * mount: join the ceph cluster, and open root directory.
  */
-static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
-		      const char *path)
+static int __ceph_open_session(struct ceph_client *client,
+			       unsigned long started)
 {
 	struct ceph_entity_addr *myaddr = NULL;
 	int err;
 	unsigned long timeout = client->mount_args->mount_timeout * HZ;
-	unsigned long started = jiffies;  /* note the start time */
-	struct dentry *root;
-
-	dout("mount start\n");
-	mutex_lock(&client->mount_mutex);
 
 	/* initialize the messenger */
 	if (client->msgr == NULL) {
@@ -798,9 +842,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 			myaddr = &client->mount_args->my_addr;
 		client->msgr = ceph_messenger_create(myaddr);
 		if (IS_ERR(client->msgr)) {
-			err = PTR_ERR(client->msgr);
 			client->msgr = NULL;
-			goto out;
+			return PTR_ERR(client->msgr);
 		}
 		client->msgr->nocrc = ceph_test_opt(client, NOCRC);
 	}
@@ -808,26 +851,58 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 	/* open session, and wait for mon, mds, and osd maps */
 	err = ceph_monc_open_session(&client->monc);
 	if (err < 0)
-		goto out;
+		return err;
 
 	while (!have_mon_and_osd_map(client)) {
 		err = -EIO;
 		if (timeout && time_after_eq(jiffies, started + timeout))
-			goto out;
+			return err;
 
 		/* wait */
 		dout("mount waiting for mon_map\n");
 		err = wait_event_interruptible_timeout(client->auth_wq,
-		       have_mon_and_osd_map(client) || (client->auth_err < 0),
-		       timeout);
+			have_mon_and_osd_map(client) || (client->auth_err < 0),
+			timeout);
 		if (err == -EINTR || err == -ERESTARTSYS)
-			goto out;
-		if (client->auth_err < 0) {
-			err = client->auth_err;
-			goto out;
-		}
+			return err;
+		if (client->auth_err < 0)
+			return client->auth_err;
 	}
 
+	return 0;
+}
+
+int ceph_open_session(struct ceph_client *client)
+{
+	int ret;
+	unsigned long started = jiffies;  /* note the start time */
+
+	dout("open_session start\n");
+	mutex_lock(&client->mount_mutex);
+
+	ret = __ceph_open_session(client, started);
+
+	mutex_unlock(&client->mount_mutex);
+	return ret;
+}
+
+/*
+ * mount: join the ceph cluster, and open root directory.
+ */
+static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
+		      const char *path)
+{
+	int err;
+	unsigned long started = jiffies;  /* note the start time */
+	struct dentry *root;
+
+	dout("mount start\n");
+	mutex_lock(&client->mount_mutex);
+
+	err = __ceph_open_session(client, started);
+	if (err < 0)
+		goto out;
+
 	dout("mount opening root\n");
 	root = open_root_dentry(client, "", started);
 	if (IS_ERR(root)) {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2602248..ee67b81 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -48,14 +48,11 @@
 #define ceph_test_opt(client, opt) \
 	(!!((client)->mount_args->flags & CEPH_OPT_##opt))
 
-
 struct ceph_mount_args {
 	int sb_flags;
 	int flags;
 	struct ceph_fsid fsid;
 	struct ceph_entity_addr my_addr;
-	int num_mon;
-	struct ceph_entity_addr *mon_addr;
 	int mount_timeout;
 	int osd_idle_ttl;
 	int osd_timeout;
@@ -67,6 +64,13 @@ struct ceph_mount_args {
 	int cap_release_safety;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
+
+	/* any type that can't be simply compared or doesn't need
+	   need to be compared should go beyond this point,
+	   ceph_compare_mount_args() should be updated accordingly */
+	struct ceph_entity_addr *mon_addr; /* should be the first
+					      pointer type of args */
+	int num_mon;
 	char *snapdir_name;   /* default ".snap" */
 	char *name;
 	char *secret;
@@ -739,6 +743,16 @@ extern struct kmem_cache *ceph_file_cachep;
 
 extern const char *ceph_msg_type_name(int type);
 extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
+extern struct ceph_mount_args *parse_mount_args(int flags, char *options,
+						const char *dev_name,
+						const char **path);
+extern void ceph_destroy_mount_args(struct ceph_mount_args *args);
+extern int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+			    struct ceph_client *client);
+extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+					      int need_mdsc);
+extern void ceph_destroy_client(struct ceph_client *client);
+extern int ceph_open_session(struct ceph_client *client);
 
 #define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
 	"%02x%02x%02x%02x%02x%02x"
@@ -849,6 +863,13 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 /* file.c */
 extern const struct file_operations ceph_file_fops;
 extern const struct address_space_operations ceph_aops;
+extern int ceph_copy_to_page_vector(struct page **pages,
+				    const char *data,
+				    loff_t off, size_t len);
+extern int ceph_copy_from_page_vector(struct page **pages,
+				    char *data,
+				    loff_t off, size_t len);
+extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
 extern int ceph_open(struct inode *inode, struct file *file);
 extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
 				       struct nameidata *nd, int mode,
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 06/10] ceph-rbd: generalize mon requests, add pool op support
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (4 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 05/10] ceph-rbd: refactor mount related functions, add helpers Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 07/10] ceph-rbd: some super.c changes for rbd Yehuda Sadeh
                   ` (3 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

Generalize the current statfs synchronous requests, and support pool_ops.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/mon_client.c |  170 +++++++++++++++++++++++++++++++++++++++++++++-----
 fs/ceph/mon_client.h |    5 ++
 2 files changed, 158 insertions(+), 17 deletions(-)

diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 4e022ed..c1082e8 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -349,7 +349,7 @@ out:
 }
 
 /*
- * statfs
+ * generic requests (e.g., statfs, poolop)
  */
 static struct ceph_mon_generic_request *__lookup_generic_req(
 	struct ceph_mon_client *monc, u64 tid)
@@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 	return m;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+			      struct ceph_mon_generic_request *req)
+{
+	int err;
+
+	/* register request */
+	mutex_lock(&monc->mutex);
+	req->tid = ++monc->last_tid;
+	req->request->hdr.tid = cpu_to_le64(req->tid);
+	__insert_generic_request(monc, req);
+	monc->num_generic_requests++;
+	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	mutex_unlock(&monc->mutex);
+
+	err = wait_for_completion_interruptible(&req->completion);
+
+	mutex_lock(&monc->mutex);
+	rb_erase(&req->node, &monc->generic_request_tree);
+	monc->num_generic_requests--;
+	mutex_unlock(&monc->mutex);
+
+	if (!err)
+		err = req->result;
+	return err;
+}
+
+/*
+ * statfs
+ */
 static void handle_statfs_reply(struct ceph_mon_client *monc,
 				struct ceph_msg *msg)
 {
@@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
 	return;
 
 bad:
-	pr_err("corrupt generic reply, no tid\n");
+	pr_err("corrupt generic reply, tid %llu\n", tid);
 	ceph_msg_dump(msg);
 }
 
@@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 
 	kref_init(&req->kref);
 	req->buf = buf;
+	req->buf_len = sizeof(*buf);
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
@@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 	h->monhdr.session_mon_tid = 0;
 	h->fsid = monc->monmap->fsid;
 
-	/* register request */
-	mutex_lock(&monc->mutex);
-	req->tid = ++monc->last_tid;
-	req->request->hdr.tid = cpu_to_le64(req->tid);
-	__insert_generic_request(monc, req);
-	monc->num_generic_requests++;
-	mutex_unlock(&monc->mutex);
+	err = do_generic_request(monc, req);
 
-	/* send request and wait */
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
-	err = wait_for_completion_interruptible(&req->completion);
+out:
+	kref_put(&req->kref, release_generic_request);
+	return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+				char *dst, size_t dst_len)
+{
+	u32 buf_len;
+
+	if (src_len != sizeof(u32) + dst_len)
+		return -EINVAL;
+
+	buf_len = le32_to_cpu(*(u32 *)src);
+	if (buf_len != dst_len)
+		return -EINVAL;
+
+	memcpy(dst, src + sizeof(u32), dst_len);
+	return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+				struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	if (msg->front.iov_len < sizeof(*reply))
+		goto bad;
+	dout("handle_poolop_reply %p tid %llu\n", msg, tid);
 
 	mutex_lock(&monc->mutex);
-	rb_erase(&req->node, &monc->generic_request_tree);
-	monc->num_generic_requests--;
+	req = __lookup_generic_req(monc, tid);
+	if (req) {
+		if (req->buf_len &&
+		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+				     msg->front.iov_len - sizeof(*reply),
+				     req->buf, req->buf_len) < 0) {
+			mutex_unlock(&monc->mutex);
+			goto bad;
+		}
+		req->result = le32_to_cpu(reply->reply_code);
+		get_generic_request(req);
+	}
 	mutex_unlock(&monc->mutex);
+	if (req) {
+		complete(&req->completion);
+		put_generic_request(req);
+	}
+	return;
 
-	if (!err)
-		err = req->result;
+bad:
+	pr_err("corrupt generic reply, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+			u32 pool, u64 snapid,
+			char *buf, int len)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop *h;
+	int err;
+
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (!req)
+		return -ENOMEM;
+
+	kref_init(&req->kref);
+	req->buf = buf;
+	req->buf_len = len;
+	init_completion(&req->completion);
+
+	err = -ENOMEM;
+	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+	if (!req->request)
+		goto out;
+	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+	if (!req->reply)
+		goto out;
+
+	/* fill out request */
+	req->request->hdr.version = cpu_to_le16(2);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->pool = cpu_to_le32(pool);
+	h->op = cpu_to_le32(op);
+	h->auid = 0;
+	h->snapid = cpu_to_le64(snapid);
+	h->name_len = 0;
+
+	err = do_generic_request(monc, req);
 
 out:
 	kref_put(&req->kref, release_generic_request);
 	return err;
 }
 
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 *snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, snapid, 0, 0);
+
+}
+
 /*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
  */
 static void __resend_generic_request(struct ceph_mon_client *monc)
 {
@@ -782,6 +913,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_POOLOP_REPLY:
+		handle_poolop_reply(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 	case CEPH_MSG_MON_SUBSCRIBE_ACK:
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
+	case CEPH_MSG_POOLOP_REPLY:
 	case CEPH_MSG_STATFS_REPLY:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY:
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index 174d794..8e396f2 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
 	struct rb_node node;
 	int result;
 	void *buf;
+	int buf_len;
 	struct completion completion;
 	struct ceph_msg *request;  /* original request */
 	struct ceph_msg *reply;    /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
 
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+				   u32 pool, u64 *snapid);
 
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+				   u32 pool, u64 snapid);
 
 #endif
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 07/10] ceph-rbd: some super.c changes for rbd
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (5 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 06/10] ceph-rbd: generalize mon requests, add pool op support Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 08/10] ceph-rbd: osdc support for osd call and rollback operations Yehuda Sadeh
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

 - add 'snap' mount option
 - add ceph_client_id helper

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/super.c |   19 ++++++++++++++++++-
 fs/ceph/super.h |    2 ++
 2 files changed, 20 insertions(+), 1 deletions(-)

diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 43b1589..acff12f 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -342,6 +342,7 @@ enum {
 	Opt_snapdirname,
 	Opt_name,
 	Opt_secret,
+	Opt_snap,
 	Opt_last_string,
 	/* string args above */
 	Opt_ip,
@@ -374,6 +375,7 @@ static match_table_t arg_tokens = {
 	{Opt_snapdirname, "snapdirname=%s"},
 	{Opt_name, "name=%s"},
 	{Opt_secret, "secret=%s"},
+	{Opt_snap, "snap=%s"},
 	/* string args above */
 	{Opt_ip, "ip=%s"},
 	{Opt_noshare, "noshare"},
@@ -508,6 +510,11 @@ struct ceph_mount_args *parse_mount_args(int flags, char *options,
 						argstr[0].to-argstr[0].from,
 						GFP_KERNEL);
 			break;
+		case Opt_snap:
+			args->snap = kstrndup(argstr[0].from,
+					      argstr[0].to-argstr[0].from,
+					      GFP_KERNEL);
+			break;
 
 			/* misc */
 		case Opt_wsize:
@@ -582,6 +589,7 @@ void ceph_destroy_mount_args(struct ceph_mount_args *args)
 	kfree(args->snapdir_name);
 	kfree(args->name);
 	kfree(args->secret);
+	kfree(args->snap);
 	kfree(args);
 }
 
@@ -621,6 +629,10 @@ int ceph_compare_mount_args(struct ceph_mount_args *new_args,
 	if (ret)
 		return ret;
 
+	ret = strcmp_null(args1->snap, args2->snap);
+	if (ret)
+		return ret;
+
 	for (i = 0; i < args1->num_mon; i++) {
 		if (ceph_monmap_contains(client->monc.monmap,
 				 &args1->mon_addr[i]))
@@ -717,6 +729,11 @@ fail:
 	return ERR_PTR(err);
 }
 
+u64 ceph_client_id(struct ceph_client *client)
+{
+	return client->monc.auth->global_id;
+}
+
 void ceph_destroy_client(struct ceph_client *client)
 {
 	dout("destroy_client %p\n", client);
@@ -767,7 +784,7 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
 		}
 	} else {
 		pr_info("client%lld fsid " FSID_FORMAT "\n",
-			client->monc.auth->global_id, PR_FSID(fsid));
+			ceph_client_id(client), PR_FSID(fsid));
 		memcpy(&client->fsid, fsid, sizeof(*fsid));
 		ceph_debugfs_client_init(client);
 		client->have_fsid = true;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ee67b81..67d5b75 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -74,6 +74,7 @@ struct ceph_mount_args {
 	char *snapdir_name;   /* default ".snap" */
 	char *name;
 	char *secret;
+	char *snap;	/* rbd snapshot */
 };
 
 /*
@@ -751,6 +752,7 @@ extern int ceph_compare_mount_args(struct ceph_mount_args *new_args,
 			    struct ceph_client *client);
 extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
 					      int need_mdsc);
+extern u64 ceph_client_id(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern int ceph_open_session(struct ceph_client *client);
 
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 08/10] ceph-rbd: osdc support for osd call and rollback operations
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (6 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 07/10] ceph-rbd: some super.c changes for rbd Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 09/10] ceph-rbd: sync common header and source files with server version Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 10/10] ceph-rbd: rados block device Yehuda Sadeh
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

This will be used for rbd snapshots administration.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
---
 fs/ceph/decode.h     |    5 +++++
 fs/ceph/osd_client.c |   22 ++++++++++++++++++++++
 fs/ceph/osd_client.h |    6 ++++++
 3 files changed, 33 insertions(+), 0 deletions(-)

diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
index 65b3e02..5f2d803 100644
--- a/fs/ceph/decode.h
+++ b/fs/ceph/decode.h
@@ -189,6 +189,11 @@ static inline void ceph_encode_string(void **p, void *end,
 		ceph_encode_need(p, end, n, bad);		\
 		ceph_encode_copy(p, pv, n);			\
 	} while (0)
+#define ceph_encode_string_safe(p, end, s, n, bad)		\
+	do {							\
+		ceph_encode_need(p, end, n, bad);		\
+		ceph_encode_string(p, end, s, n);		\
+	} while (0)
 
 
 #endif
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 1cb9533..8b718cb 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -136,6 +136,7 @@ static int op_needs_trail(int op)
 	case CEPH_OSD_OP_GETXATTR:
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
+	case CEPH_OSD_OP_CALL:
 		return 1;
 	default:
 		return 0;
@@ -286,6 +287,27 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 		dst->payload_len = cpu_to_le32(src->xattr.name_len +
 					       src->xattr.value_len);
 		break;
+	case CEPH_OSD_OP_CALL:
+		BUG_ON(!req->r_trail);
+
+		dst->cls.class_len = src->cls.class_len;
+		dst->cls.method_len = src->cls.method_len;
+		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
+
+		ceph_pagelist_append(req->r_trail, src->cls.class_name,
+				     src->cls.class_len);
+		ceph_pagelist_append(req->r_trail, src->cls.method_name,
+				     src->cls.method_len);
+		ceph_pagelist_append(req->r_trail, src->cls.indata,
+				     src->cls.indata_len);
+		dst->payload_len = cpu_to_le32(src->cls.class_len + 
+					       src->cls.method_len +
+					       src->cls.indata_len);
+
+		break;
+	case CEPH_OSD_OP_ROLLBACK:
+		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
+		break;
 	}
 	dst->payload_len = cpu_to_le32(src->payload_len);
 }
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 38c4615..979efce 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -135,14 +135,20 @@ struct ceph_osd_req_op {
 			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
 		} xattr;
 		struct {
+			const char *class_name;
 			__u8 class_len;
+			const char *method_name;
 			__u8 method_len;
 			__u8 argc;
+			const char *indata;
 			u32 indata_len;
 		} cls;
 		struct {
 			u64 cookie, count;
 		} pgls;
+	        struct {
+		        u64 snapid;
+	        } snap;
 	};
 	u32 payload_len;
 };
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 09/10] ceph-rbd: sync common header and source files with server version
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (7 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 08/10] ceph-rbd: osdc support for osd call and rollback operations Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  2010-07-07 22:11 ` [PATCH v3 10/10] ceph-rbd: rados block device Yehuda Sadeh
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
---
 fs/ceph/ceph_fs.c      |   50 +++++++++++++++++++++++------------------------
 fs/ceph/ceph_fs.h      |    8 +++---
 fs/ceph/ceph_strings.c |    1 +
 fs/ceph/rados.h        |    8 +++++++
 4 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/fs/ceph/ceph_fs.c b/fs/ceph/ceph_fs.c
index 79d76bc..3ac6cc7 100644
--- a/fs/ceph/ceph_fs.c
+++ b/fs/ceph/ceph_fs.c
@@ -29,46 +29,44 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
 
 int ceph_flags_to_mode(int flags)
 {
+	int mode;
+
 #ifdef O_DIRECTORY  /* fixme */
 	if ((flags & O_DIRECTORY) == O_DIRECTORY)
 		return CEPH_FILE_MODE_PIN;
 #endif
+	if ((flags & O_APPEND) == O_APPEND)
+		flags |= O_WRONLY;
+
+	if ((flags & O_ACCMODE) == O_RDWR)
+		mode = CEPH_FILE_MODE_RDWR;
+	else if ((flags & O_ACCMODE) == O_WRONLY)
+		mode = CEPH_FILE_MODE_WR;
+	else
+		mode = CEPH_FILE_MODE_RD;
+
 #ifdef O_LAZY
 	if (flags & O_LAZY)
-		return CEPH_FILE_MODE_LAZY;
+		mode |= CEPH_FILE_MODE_LAZY;
 #endif
-	if ((flags & O_APPEND) == O_APPEND)
-		flags |= O_WRONLY;
 
-	flags &= O_ACCMODE;
-	if ((flags & O_RDWR) == O_RDWR)
-		return CEPH_FILE_MODE_RDWR;
-	if ((flags & O_WRONLY) == O_WRONLY)
-		return CEPH_FILE_MODE_WR;
-	return CEPH_FILE_MODE_RD;
+	return mode;
 }
 
 int ceph_caps_for_mode(int mode)
 {
-	switch (mode) {
-	case CEPH_FILE_MODE_PIN:
-		return CEPH_CAP_PIN;
-	case CEPH_FILE_MODE_RD:
-		return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
+	int caps = CEPH_CAP_PIN;
+
+	if (mode & CEPH_FILE_MODE_RD)
+		caps |= CEPH_CAP_FILE_SHARED |
 			CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
-	case CEPH_FILE_MODE_RDWR:
-		return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
-			CEPH_CAP_FILE_EXCL |
-			CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE |
-			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
-			CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
-			CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
-	case CEPH_FILE_MODE_WR:
-		return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
-			CEPH_CAP_FILE_EXCL |
+	if (mode & CEPH_FILE_MODE_WR)
+		caps |= CEPH_CAP_FILE_EXCL |
 			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
 			CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
 			CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
-	}
-	return 0;
+	if (mode & CEPH_FILE_MODE_LAZY)
+		caps |= CEPH_CAP_FILE_LAZYIO;
+
+	return caps;
 }
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index 2fa992e..d8b7e58 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -55,16 +55,15 @@
  */
 #define CEPH_FEATURE_UID        1
 #define CEPH_FEATURE_NOSRCADDR  2
-#define CEPH_FEATURE_FLOCK      4
 
 #define CEPH_FEATURE_SUPPORTED_MON  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
 #define CEPH_FEATURE_REQUIRED_MON   CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_MDS  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_FLOCK
+#define CEPH_FEATURE_SUPPORTED_MDS  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
 #define CEPH_FEATURE_REQUIRED_MDS   CEPH_FEATURE_UID
 #define CEPH_FEATURE_SUPPORTED_OSD  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
 #define CEPH_FEATURE_REQUIRED_OSD   CEPH_FEATURE_UID
 #define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_CLIENT CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_REQUIRED_CLIENT 0
 
 
 /*
@@ -563,7 +562,8 @@ int ceph_flags_to_mode(int flags);
 			      CEPH_CAP_FILE_EXCL)
 #define CEPH_CAP_ANY_WR   (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
 #define CEPH_CAP_ANY      (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \
-			   CEPH_CAP_ANY_FILE_WR | CEPH_CAP_PIN)
+			   CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \
+			   CEPH_CAP_PIN)
 
 #define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
 			CEPH_LOCK_IXATTR)
diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c
index 7503aee..0f943a0 100644
--- a/fs/ceph/ceph_strings.c
+++ b/fs/ceph/ceph_strings.c
@@ -28,6 +28,7 @@ const char *ceph_osd_op_name(int op)
 	case CEPH_OSD_OP_TRUNCATE: return "truncate";
 	case CEPH_OSD_OP_ZERO: return "zero";
 	case CEPH_OSD_OP_WRITEFULL: return "writefull";
+	case CEPH_OSD_OP_ROLLBACK: return "rollback";
 
 	case CEPH_OSD_OP_APPEND: return "append";
 	case CEPH_OSD_OP_STARTSYNC: return "startsync";
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index 8fcc023..ccc27fc 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -203,6 +203,7 @@ enum {
 	CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12,
 
 	CEPH_OSD_OP_CREATE  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
+	CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
 
 	/** attrs **/
 	/* read */
@@ -272,6 +273,10 @@ static inline int ceph_osd_op_mode_modify(int op)
 	return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
 }
 
+/*
+ * note that the following tmap stuff is also defined in the ceph librados.h
+ * any modification here needs to be updated there
+ */
 #define CEPH_OSD_TMAP_HDR 'h'
 #define CEPH_OSD_TMAP_SET 's'
 #define CEPH_OSD_TMAP_RM  'r'
@@ -350,6 +355,9 @@ struct ceph_osd_op {
 		struct {
 			__le64 cookie, count;
 		} __attribute__ ((packed)) pgls;
+	        struct {
+		        __le64 snapid;
+	        } __attribute__ ((packed)) snap;
 	};
 	__le32 payload_len;
 } __attribute__ ((packed));
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH v3 10/10] ceph-rbd: rados block device
  2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
                   ` (8 preceding siblings ...)
  2010-07-07 22:11 ` [PATCH v3 09/10] ceph-rbd: sync common header and source files with server version Yehuda Sadeh
@ 2010-07-07 22:11 ` Yehuda Sadeh
  9 siblings, 0 replies; 11+ messages in thread
From: Yehuda Sadeh @ 2010-07-07 22:11 UTC (permalink / raw)
  To: linux-kernel; +Cc: ceph-devel, linux-fsdevel, sage, Yehuda Sadeh

This adds the rbd under fs/ceph. The rados block device is
based on the drivers/block/osdblk.c. It allows mapping of
rados (ther ceph block layer) objects over a block device.
Each device has a metadata object, and data is being striped
across different objects.

This includes adds snapshots capabilities to the ceph-rbd.
The snapshots can be created per a single rbd-image, and the
relevant snapshot information is being kept in the rbd header.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/Kconfig     |   10 +
 fs/ceph/Makefile    |    2 +-
 fs/ceph/README      |    1 +
 fs/ceph/rbd.c       | 1835 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/rbd.h       |    8 +
 fs/ceph/rbd_types.h |   71 ++
 fs/ceph/super.c     |    8 +
 fs/ceph/super.h     |    5 +
 8 files changed, 1939 insertions(+), 1 deletions(-)

diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 04b8280..4c49d18 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -25,3 +25,13 @@ config CEPH_FS_PRETTYDEBUG
 
 	  If unsure, say N.
 
+config CEPH_RBD
+	bool "Rados block device (RBD)"
+	depends on CEPH_FS
+	select  CONFIG_BLOCK
+	default y
+	help
+	  If you say Y here, ceph will include rbd, the RADOS block
+	  device which stripes a block device over objects stored in
+	  the Ceph distributed object store.
+
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 6a660e6..33eda3a 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -16,7 +16,7 @@ ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
 	auth.o auth_none.o \
 	crypto.o armor.o \
 	auth_x.o \
-	ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o
+	ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o rbd.o
 
 else
 #Otherwise we were called directly from the command
diff --git a/fs/ceph/README b/fs/ceph/README
index 18352fa..e2046ec 100644
--- a/fs/ceph/README
+++ b/fs/ceph/README
@@ -7,6 +7,7 @@ src/include/ceph_fs.h	    fs/ceph/ceph_fs.h
 src/include/ceph_fs.cc	    fs/ceph/ceph_fs.c
 src/include/msgr.h	    fs/ceph/msgr.h
 src/include/rados.h	    fs/ceph/rados.h
+src/include/rbd_types.h	    fs/ceph/rbd_types.h
 src/include/ceph_strings.cc fs/ceph/ceph_strings.c
 src/include/ceph_frag.h	    fs/ceph/ceph_frag.h
 src/include/ceph_frag.cc    fs/ceph/ceph_frag.c
diff --git a/fs/ceph/rbd.c b/fs/ceph/rbd.c
new file mode 100644
index 0000000..092f26b
--- /dev/null
+++ b/fs/ceph/rbd.c
@@ -0,0 +1,1835 @@
+/*
+   rbd.c -- Export ceph rados objects as a Linux block device
+
+
+   based on drivers/block/osdblk.c:
+
+   Copyright 2009 Red Hat, Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; see the file COPYING.  If not, write to
+   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+
+
+
+   Instructions for use
+   --------------------
+
+   1) Map a Linux block device to an existing rbd image.
+
+      Usage: <mon ip addr> <options> <pool name> <rbd image name>
+
+      $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
+
+
+   2) List all active blkdev<->object mappings.
+
+      In this example, we have performed step #1 twice, creating two blkdevs,
+      mapped to two separate rados objects in the rados rbd pool
+
+      $ cat /sys/class/rbd/list
+      0 254 rbd foo
+      1 253 rbd bar
+
+      The columns, in order, are:
+      - blkdev unique id
+      - blkdev assigned major
+      - rados pool name
+      - rados block device name
+
+
+   3) Create a snapshot.
+
+      Usage: <blkdev id> <snapname>
+
+      $ echo "0 mysnap" > /sys/class/rbd/snap_create
+
+
+   4) Listing a snapshot.
+
+      $ cat /sys/class/rbd/snaps_list
+
+
+   5) Rollback to snapshot.
+
+      Usage: <blkdev id> <snapname>
+
+      $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
+
+
+   6) Mapping an image using snapshot.
+
+      A snapshot mapping is read-only. This is being done by passing
+      snap=<snapname> to the options when adding a device.
+
+      $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
+
+
+   7) Remove an active blkdev<->rbd image mapping.
+
+      In this example, we remove the mapping with blkdev unique id 1.
+
+      $ echo 1 > /sys/class/rbd/remove
+
+
+   NOTE:  The actual creation and deletion of rados objects is outside the scope
+   of this driver.
+
+ */
+
+#ifdef CONFIG_CEPH_RBD
+
+#include "super.h"
+#include "osd_client.h"
+#include "rbd_types.h"
+#include "mon_client.h"
+#include "decode.h"
+
+#include <linux/kernel.h>
+#include <linux/device.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/blkdev.h>
+
+#define DRV_NAME "rbd"
+#define DRV_NAME_LONG "rbd (rados block device)"
+
+enum {
+	RBD_MINORS_PER_MAJOR	= 256,		/* max minors per blkdev */
+};
+
+#define RBD_MAX_MD_NAME_SIZE	(96 + sizeof(RBD_SUFFIX))
+#define RBD_MAX_POOL_NAME_SIZE	64
+
+#define RBD_STRIPE_UNIT (1 << 22)
+
+#define RBD_MAX_OPT_LEN		1024
+#define RBD_MAX_SNAP_NAME_LEN	32
+
+#define RBD_SNAP_HEAD_NAME	"head"
+
+struct rbd_obj_header {
+	u64 image_size;
+	char block_name[32];
+	__u8 obj_order;
+	__u8 crypt_type;
+	__u8 comp_type;
+	struct rw_semaphore snap_rwsem;
+	struct ceph_snap_context *snapc;
+	size_t snap_names_len;
+	u64 snap_seq;
+	u32 total_snaps;
+
+	char *snap_names;
+	u64 *snap_sizes;
+};
+
+struct rbd_request {
+	struct request		*rq;		/* blk layer request */
+	struct bio		*bio;		/* cloned bio */
+	struct page		**pages;	/* list of used pages */
+	u64			len;
+};
+
+struct rbd_client_node {
+	struct ceph_client	*client;
+	const char		*opt;
+	struct kref		kref;
+	struct list_head	node;
+};
+
+#define DEV_NAME_LEN		32
+
+struct rbd_device {
+	int			id;		/* blkdev unique id */
+
+	int			major;		/* blkdev assigned major */
+	struct gendisk		*disk;		/* blkdev's gendisk and rq */
+	struct request_queue	*q;
+
+	struct ceph_client	*client;	/* associated OSD */
+
+	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
+
+	spinlock_t		lock;		/* queue lock */
+
+	struct rbd_obj_header	header;
+	char			obj[RBD_MAX_OBJ_NAME_SIZE]; /* rbd image name */
+	int			obj_len;
+	char			obj_md_name[RBD_MAX_MD_NAME_SIZE]; /* rbd image header name */
+	char			pool_name[RBD_MAX_POOL_NAME_SIZE];
+	int			poolid;
+
+	u32 cur_snap;	/* index+1 of current snapshot within snap context
+			   0 - for the head */
+	int read_only;
+
+	struct list_head	node;
+	struct rbd_client_node	*client_node;
+};
+
+static spinlock_t node_lock; /* protects client get/put */
+
+static struct class *class_rbd;		/* /sys/class/rbd */
+static DEFINE_MUTEX(ctl_mutex);	/* Serialize open/close/setup/teardown */
+static LIST_HEAD(rbddev_list);
+static LIST_HEAD(node_list);
+
+
+static int rbd_open(struct block_device *bdev, fmode_t mode)
+{
+	struct gendisk *disk = bdev->bd_disk;
+	struct rbd_device *rbd_dev = disk->private_data;
+
+	set_device_ro(bdev, rbd_dev->read_only);
+
+	if (mode & FMODE_WRITE && rbd_dev->read_only)
+		return -EROFS;
+
+	return 0;
+}
+
+static const struct block_device_operations rbd_bd_ops = {
+	.owner			= THIS_MODULE,
+	.open			= rbd_open,
+};
+
+/*
+ * Initialize ceph client for a specific device.
+ */
+static int rbd_init_client(struct rbd_device *rbd_dev,
+			   struct ceph_mount_args *args)
+{
+	struct ceph_osd_client *osdc;
+	int ret;
+	dout("rbd_init_device\n");
+	rbd_dev->client = ceph_create_client(args, 0);
+	if (IS_ERR(rbd_dev->client))
+		return PTR_ERR(rbd_dev->client);
+
+	ret = ceph_open_session(rbd_dev->client);
+	if (ret < 0)
+		goto done_err;
+
+	osdc = &rbd_dev->client->osdc;
+	ret = ceph_pg_poolid_by_name(osdc->osdmap,
+				     rbd_dev->pool_name);
+	if (ret < 0)
+		goto done_err;
+
+	rbd_dev->poolid = ret;
+	return 0;
+
+done_err:
+	ceph_destroy_client(rbd_dev->client);
+	rbd_dev->client = NULL;
+	return ret;
+}
+
+/*
+ * Find a ceph client with specific addr and configuration.
+ */
+static struct rbd_client_node *__get_client_node(struct ceph_mount_args *args)
+{
+	struct rbd_client_node *client_node;
+
+	if (args->flags & CEPH_OPT_NOSHARE)
+		return NULL;
+
+	list_for_each_entry(client_node, &node_list, node)
+		if (ceph_compare_mount_args(args, client_node->client) == 0)
+			return client_node;
+	return NULL;
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.
+ */
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+			  char *opt)
+{
+	struct rbd_client_node *client_node;
+	struct ceph_mount_args *args;
+	int ret;
+
+
+	args = parse_mount_args(0, opt, mon_addr, NULL);
+	if (IS_ERR(args))
+		return PTR_ERR(args);
+
+	spin_lock(&node_lock);
+
+	client_node = __get_client_node(args);
+	if (client_node) {
+		ceph_destroy_mount_args(args);
+
+		kref_get(&client_node->kref);
+		rbd_dev->client_node = client_node;
+		rbd_dev->client = client_node->client;
+		spin_unlock(&node_lock);
+		return 0;
+	}
+
+	spin_unlock(&node_lock);
+
+	ret = -ENOMEM;
+	client_node = kmalloc(sizeof(struct rbd_client_node), GFP_KERNEL);
+	if (!client_node)
+		goto out_args;
+
+	ret = rbd_init_client(rbd_dev, args);
+	if (ret < 0)
+		goto out_free;
+
+	client_node->client = rbd_dev->client;
+	client_node->opt = kstrdup(opt, GFP_KERNEL);
+	kref_init(&client_node->kref);
+	INIT_LIST_HEAD(&client_node->node);
+
+	rbd_dev->client_node = client_node;
+
+	spin_lock(&node_lock);
+	list_add_tail(&client_node->node, &node_list);
+	spin_unlock(&node_lock);
+
+	return 0;
+
+out_free:
+	kfree(client_node);
+out_args:
+	ceph_destroy_mount_args(args);
+	return ret;
+}
+
+/*
+ * Destroy ceph client
+ */
+static void rbd_release_client(struct kref *kref)
+{
+	struct rbd_client_node *node =
+			container_of(kref, struct rbd_client_node, kref);
+
+	dout("rbd_release_client\n");
+
+	spin_lock(&node_lock);
+	list_del(&node->node);
+	spin_unlock(&node_lock);
+
+	ceph_destroy_client(node->client);
+	kfree(node->opt);
+	kfree(node);
+}
+
+/*
+ * Drop reference to ceph client node. If it's not referenced anymore, release
+ * it.
+ */
+static void rbd_put_client(struct rbd_device *rbd_dev)
+{
+	if (!rbd_dev->client_node)
+		return;
+
+	kref_put(&rbd_dev->client_node->kref, rbd_release_client);
+	rbd_dev->client_node = NULL;
+}
+
+static int snap_index(struct rbd_obj_header *header, int snap_num)
+{
+	return header->total_snaps - snap_num;
+}
+
+static u64 cur_snap_id(struct rbd_device *rbd_dev)
+{
+	struct rbd_obj_header *header = &rbd_dev->header;
+
+	if (!rbd_dev->cur_snap)
+		return 0;
+
+	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
+}
+
+
+/*
+ * Create a new header structure, translate header format from the on-disk
+ * header.
+ */
+static int rbd_header_from_disk(struct rbd_obj_header *header,
+				 struct rbd_obj_header_ondisk *ondisk,
+				 int allocated_snaps,
+				 gfp_t gfp_flags)
+{
+	int i;
+	u32 snap_count = le32_to_cpu(ondisk->snap_count);
+	int ret = -ENOMEM;
+
+	init_rwsem(&header->snap_rwsem);
+
+	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
+				snap_count *
+					sizeof(struct rbd_obj_snap_ondisk),
+				gfp_flags);
+	if (!header->snapc)
+		return -ENOMEM;
+	if (snap_count) {
+		header->snap_names = kmalloc(header->snap_names_len,
+					     GFP_KERNEL);
+		if (!header->snap_names)
+			goto err_snapc;
+		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
+					     GFP_KERNEL);
+		if (!header->snap_sizes)
+			goto err_names;
+	} else {
+		header->snap_names = NULL;
+		header->snap_sizes = NULL;
+	}
+	memcpy(header->block_name, ondisk->block_name, sizeof(ondisk->block_name));
+
+	header->image_size = le64_to_cpu(ondisk->image_size);
+	header->obj_order = ondisk->options.order;
+	header->crypt_type = ondisk->options.crypt_type;
+	header->comp_type = ondisk->options.comp_type;
+
+	atomic_set(&header->snapc->nref, 1);
+	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
+	header->snapc->num_snaps = snap_count;
+	header->total_snaps = snap_count;
+
+	if (snap_count &&
+	    allocated_snaps == snap_count) {
+		for (i = 0; i < snap_count; i++) {
+			header->snapc->snaps[i] =
+				le64_to_cpu(ondisk->snaps[i].id);
+			header->snap_sizes[i] =
+				le64_to_cpu(ondisk->snaps[i].image_size);
+		}
+
+		/* copy snapshot names */
+		memcpy(header->snap_names, &ondisk->snaps[i],
+			header->snap_names_len);
+	}
+
+	return 0;
+
+err_names:
+	kfree(header->snap_names);
+err_snapc:
+	kfree(header->snapc);
+	return ret;
+}
+
+static int snap_by_name(struct rbd_obj_header *header, const char *snap_name,
+			u64 *seq, u64 *size)
+{
+	int i;
+	char *p = header->snap_names;
+
+	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+		if (strcmp(snap_name, p) == 0)
+			break;
+	}
+	if (i == header->total_snaps)
+		return -ENOENT;
+	if (seq)
+		*seq = header->snapc->snaps[i];
+
+	if (size)
+		*size = header->snap_sizes[i];
+
+	return i;
+}
+
+static int rbd_header_set_snap(struct rbd_device *dev,
+			       const char *snap_name,
+			       u64 *size)
+{
+	struct rbd_obj_header *header = &dev->header;
+	struct ceph_snap_context *snapc = header->snapc;
+	int ret = -ENOENT;
+
+	down_write(&header->snap_rwsem);
+
+	if (!snap_name ||
+	    !*snap_name ||
+	    strcmp(snap_name, "-") == 0 ||
+	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
+		if (header->total_snaps)
+			snapc->seq = header->snap_seq;
+		else
+			snapc->seq = 0;
+		dev->cur_snap = 0;
+		dev->read_only = 0;
+		if (size)
+			*size = header->image_size;
+	} else {
+		ret = snap_by_name(header, snap_name, &snapc->seq, size);
+		if (ret < 0)
+			goto done;
+
+		dev->cur_snap = header->total_snaps - ret;
+		dev->read_only = 1;
+	}
+
+	ret = 0;
+done:
+	up_write(&header->snap_rwsem);
+	return ret;
+}
+
+static void rbd_header_free(struct rbd_obj_header *header)
+{
+	kfree(header->snapc);
+	kfree(header->snap_names);
+	kfree(header->snap_sizes);
+}
+
+/*
+ * get the actual striped segment name, offset and length
+ */
+static u64 rbd_get_segment(struct rbd_obj_header *header,
+			   const char *block_name,
+			   u64 ofs, u64 len,
+			   char *seg_name, u64 *segofs)
+{
+	u64 seg = ofs >> header->obj_order;
+
+	if (seg_name)
+		snprintf(seg_name, RBD_MAX_SEG_NAME_SIZE,
+			 "%s.%012llx", block_name, seg);
+
+	ofs = ofs & ((1 << header->obj_order) - 1);
+	len = min_t(u64, len, (1 << header->obj_order) - ofs);
+
+	if (segofs)
+		*segofs = ofs;
+
+	return len;
+}
+
+static void bio_chain_put(struct bio *chain)
+{
+	struct bio *tmp;
+
+	while (chain) {
+		tmp = chain;
+		chain = chain->bi_next;
+
+		bio_put(tmp);
+	}
+}
+
+/*
+ * zeros a bio chain, starting at specific offset
+ */
+static void zero_bio_chain(struct bio *chain, int start_ofs)
+{
+	struct bio_vec *bv;
+	unsigned long flags;
+	void *buf;
+	int i;
+	int pos = 0;
+
+	while (chain) {
+		bio_for_each_segment(bv, chain, i) {
+			if (pos + bv->bv_len > start_ofs) {
+				int remainder = max(start_ofs - pos, 0);
+				buf = bvec_kmap_irq(bv, &flags);
+				memset(buf + remainder, 0,
+				       bv->bv_len - remainder);
+				bvec_kunmap_irq(bv, &flags);
+			}
+			pos += bv->bv_len;
+		}
+
+		chain = chain->bi_next;
+	}
+}
+
+/*
+ * bio_chain_clone - clone a chain of bios up to a certain length.
+ * might return a bio_pair that will need to be released.
+ */
+static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
+				   struct bio_pair **bp,
+				   int len, gfp_t gfpmask)
+{
+	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+	int total = 0;
+
+	if (*bp) {
+		bio_pair_release(*bp);
+		*bp = NULL;
+	}
+
+	while (old_chain && (total < len)) {
+		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
+		if (!tmp)
+			goto err_out;
+
+		if (total + old_chain->bi_size > len) {
+			struct bio_pair *bp;
+
+			/*
+			 * this split can only happen with a single paged bio,
+			 * split_bio will BUG_ON if this is not the case
+			 */
+			dout("bio_chain_clone split! total=%d remaining=%d"
+			     "bi_size=%d\n",
+			     (int)total, (int)len-total,
+			     (int)old_chain->bi_size);
+
+			/* split the bio. We'll release it either in the next
+			   call, or it will have to be released outside */
+			bp = bio_split(old_chain, (len - total) / 512ULL);
+			if (!bp)
+				goto err_out;
+
+			__bio_clone(tmp, &bp->bio1);
+
+			*next = &bp->bio2;
+		} else {
+			__bio_clone(tmp, old_chain);
+			*next = old_chain->bi_next;
+		}
+
+		tmp->bi_bdev = NULL;
+		gfpmask &= ~__GFP_WAIT;
+		tmp->bi_next = NULL;
+
+		if (!new_chain) {
+			new_chain = tail = tmp;
+		} else {
+			tail->bi_next = tmp;
+			tail = tmp;
+		}
+		old_chain = old_chain->bi_next;
+
+		total += tmp->bi_size;
+	}
+
+	BUG_ON(total < len);
+
+	if (tail)
+		tail->bi_next = NULL;
+
+	*old = old_chain;
+
+	return new_chain;
+
+err_out:
+	dout("bio_chain_clone with err\n");
+	bio_chain_put(new_chain);
+	return NULL;
+}
+
+static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
+			    int num_ops,
+			    int opcode,
+			    u64 ofs, u64 len,
+			    u32 payload_len)
+{
+	int i;
+	*ops = kmalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
+		       GFP_NOIO);
+	if (!*ops)
+		return -ENOMEM;
+
+	(*ops)[0].op = opcode;
+	(*ops)[0].extent.offset = ofs;
+	(*ops)[0].extent.length = len;
+	(*ops)[0].extent.truncate_seq = 0;
+	(*ops)[0].extent.truncate_size = 0;
+	(*ops)[0].payload_len = payload_len;
+	for (i = 1; i <= num_ops; i++)
+		(*ops)[i].op = 0;
+
+	return 0;
+}
+
+static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+{
+	kfree(ops);
+}
+
+/*
+ * Send ceph osd request
+ */
+static int rbd_do_request(struct request *rq,
+			  struct rbd_device *dev,
+			  struct ceph_snap_context *snapc,
+			  u64 snapid,
+			  const char *obj, u64 ofs, u64 len,
+			  struct bio *bio,
+			  struct page **pages,
+			  int num_pages,
+			  int flags,
+			  struct ceph_osd_req_op *ops,
+			  int num_reply,
+			  void (*rbd_cb)(struct ceph_osd_request *req,
+					 struct ceph_msg *msg))
+{
+	struct ceph_osd_request *req;
+	struct ceph_file_layout *layout;
+	int ret;
+	u64 bno;
+	struct timespec mtime = CURRENT_TIME;
+	struct rbd_request *req_data;
+	struct ceph_osd_request_head *reqhead;
+	struct rbd_obj_header *header = &dev->header;
+
+	ret = -ENOMEM;
+	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
+	if (!req_data)
+		goto done;
+
+	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
+
+	down_read(&header->snap_rwsem);
+
+	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
+				      snapc,
+				      ops,
+				      false,
+				      GFP_NOIO, pages, bio);
+	if (IS_ERR(req)) {
+		up_read(&header->snap_rwsem);
+		ret = PTR_ERR(req);
+		goto done_pages;
+	}
+
+	req->r_callback = rbd_cb;
+
+	req_data->rq = rq;
+	req_data->bio = bio;
+	req_data->pages = pages;
+	req_data->len = len;
+
+	req->r_priv = req_data;
+
+	reqhead = req->r_request->front.iov_base;
+	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+
+	strncpy(req->r_oid, obj, sizeof(req->r_oid));
+	req->r_oid_len = strlen(req->r_oid);
+
+	layout = &req->r_file_layout;
+	memset(layout, 0, sizeof(*layout));
+	layout->fl_stripe_unit = RBD_STRIPE_UNIT;
+	layout->fl_stripe_count = 1;
+	layout->fl_object_size = RBD_STRIPE_UNIT;
+	layout->fl_pg_preferred = -1;
+	layout->fl_pg_pool = dev->poolid;
+	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
+			     ofs, &len, &bno, req);
+
+	ceph_osdc_build_request(req, ofs, &len,
+				ops,
+				snapc,
+				&mtime,
+				req->r_oid, req->r_oid_len);
+	up_read(&header->snap_rwsem);
+
+	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+	if (ret < 0)
+		goto done_err;
+
+	if (!rbd_cb) {
+		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+		ceph_osdc_put_request(req);
+	}
+	return ret;
+
+done_err:
+	bio_chain_put(req_data->bio);
+	ceph_osdc_put_request(req);
+done_pages:
+	kfree(req_data);
+done:
+	if (rq)
+		blk_end_request(rq, ret, len);
+	return ret;
+}
+
+/*
+ * Ceph osd op callback
+ */
+static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+	struct rbd_request *req_data = req->r_priv;
+	struct ceph_osd_reply_head *replyhead;
+	struct ceph_osd_op *op;
+	__s32 rc;
+	u64 bytes;
+	int read_op;
+
+	/* parse reply */
+	replyhead = msg->front.iov_base;
+	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+	op = (void *)(replyhead + 1);
+	rc = le32_to_cpu(replyhead->result);
+	bytes = le64_to_cpu(op->extent.length);
+
+	dout("rbd_req_cb bytes=%lld rc=%d\n", bytes, rc);
+
+	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
+
+	if (rc == -ENOENT && read_op) {
+		zero_bio_chain(req_data->bio, 0);
+		rc = 0;
+	} else if (rc == 0 && read_op && bytes < req_data->len) {
+		zero_bio_chain(req_data->bio, bytes);
+		bytes = req_data->len;
+	}
+
+	blk_end_request(req_data->rq, rc, bytes);
+
+	if (req_data->bio)
+		bio_chain_put(req_data->bio);
+
+	ceph_osdc_put_request(req);
+	kfree(req_data);
+}
+
+/*
+ * Do a synchronous ceph osd operation
+ */
+static int rbd_req_sync_op(struct rbd_device *dev,
+			   struct ceph_snap_context *snapc,
+			   u64 snapid,
+			   int opcode,
+			   int flags,
+			   struct ceph_osd_req_op *orig_ops,
+			   int num_reply,
+			   const char *obj,
+			   u64 ofs, u64 len,
+			   char *buf)
+{
+	int ret;
+	struct page **pages;
+	int num_pages;
+	struct ceph_osd_req_op *ops = orig_ops;
+	u32 payload_len;
+
+	num_pages = calc_pages_for(ofs , len);
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (!pages)
+		return -ENOMEM;
+
+	if (!orig_ops) {
+		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
+		ret = rbd_create_rw_ops(&ops, 1, opcode, ofs, len, payload_len);
+		if (ret < 0)
+			goto done;
+
+		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
+			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
+			if (ret < 0)
+				goto done_ops;
+		}
+	}
+
+	
+	ret = rbd_do_request(NULL, dev, snapc, snapid,
+			  obj, ofs, len, NULL,
+			  pages, num_pages,
+			  flags,
+			  ops,
+			  2,
+			  NULL);
+	if (ret < 0)
+		goto done_ops;
+
+	if ((flags & CEPH_OSD_FLAG_READ) && buf)
+		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+
+done_ops:
+	if (!orig_ops)
+		rbd_destroy_ops(ops);
+done:
+	ceph_release_page_vector(pages, num_pages);
+	return ret;
+}
+
+/*
+ * Do an asynchronous ceph osd operation
+ */
+static int rbd_do_op(struct request *rq,
+		     struct rbd_device *rbd_dev ,
+		     struct ceph_snap_context *snapc,
+		     u64 snapid,
+		     int opcode, int flags, int num_reply,
+		     u64 ofs, u64 len,
+		     struct bio *bio)
+{
+	char *seg_name;
+	u64 seg_ofs;
+	u64 seg_len;
+	int ret;
+	struct ceph_osd_req_op *ops;
+	u32 payload_len;
+
+	seg_name = kmalloc(RBD_MAX_SEG_NAME_SIZE + 1, GFP_NOIO);
+	if (!seg_name)
+		return -ENOMEM;
+
+	seg_len = rbd_get_segment(&rbd_dev->header,
+				  rbd_dev->header.block_name,
+				  ofs, len,
+				  seg_name, &seg_ofs);
+	if (seg_len < 0)
+		return seg_len;
+
+	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+
+	ret = rbd_create_rw_ops(&ops, 1, opcode, seg_ofs, seg_len,
+				payload_len);
+	if (ret < 0)
+		goto done;
+
+	/* we've taken care of segment sizes earlier when we
+	   cloned the bios. We should never have a segment
+	   truncated at this point */
+	BUG_ON(seg_len < len);
+
+	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
+			     seg_name, seg_ofs, seg_len,
+			     bio,
+			     NULL, 0,
+			     flags,
+			     ops,
+			     num_reply,
+			     rbd_req_cb);
+done:
+	kfree(seg_name);
+	return ret;
+}
+
+/*
+ * Request async osd write
+ */
+static int rbd_req_write(struct request *rq,
+			 struct rbd_device *rbd_dev,
+			 struct ceph_snap_context *snapc,
+			 u64 ofs, u64 len,
+			 struct bio *bio)
+{
+	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
+			 CEPH_OSD_OP_WRITE,
+			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			 2,
+			 ofs, len, bio);
+}
+
+/*
+ * Request async osd read
+ */
+static int rbd_req_read(struct request *rq,
+			 struct rbd_device *rbd_dev,
+			 u64 snapid,
+			 u64 ofs, u64 len,
+			 struct bio *bio)
+{
+	return rbd_do_op(rq, rbd_dev, NULL,
+			 (snapid ? snapid : CEPH_NOSNAP),
+			 CEPH_OSD_OP_READ,
+			 CEPH_OSD_FLAG_READ,
+			 2,
+			 ofs, len, bio);
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_read(struct rbd_device *dev,
+			  struct ceph_snap_context *snapc,
+			  u64 snapid,
+			  const char *obj,
+			  u64 ofs, u64 len,
+			  char *buf)
+{
+	return rbd_req_sync_op(dev, NULL,
+			       (snapid ? snapid : CEPH_NOSNAP),
+			       CEPH_OSD_OP_READ,
+			       CEPH_OSD_FLAG_READ,
+			       NULL,
+			       1, obj, ofs, len, buf);
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
+				     u64 snapid,
+				     const char *obj)
+{
+	struct ceph_osd_req_op *ops;
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0, 0, 0);
+	if (ret < 0)
+		return ret;
+
+	ops[0].snap.snapid = snapid;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			       CEPH_NOSNAP,
+			       0,
+			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			       ops,
+			       1, obj, 0, 0, NULL);
+
+	rbd_destroy_ops(ops);
+
+	if (ret < 0)
+		return ret;
+
+	return ret;
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_exec(struct rbd_device *dev,
+		          const char *obj,
+			  const char *cls,
+			  const char *method,
+			  const char *data,
+			  int len)
+{
+	struct ceph_osd_req_op *ops;
+	int cls_len = strlen(cls);
+	int method_len = strlen(method);
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL, 0, 0,
+				    cls_len + method_len + len);
+	if (ret < 0)
+		return ret;
+
+	ops[0].cls.class_name = cls;
+	ops[0].cls.class_len = (__u8)cls_len;
+	ops[0].cls.method_name = method;
+	ops[0].cls.method_len = (__u8)method_len;
+	ops[0].cls.argc = 0;
+	ops[0].cls.indata = data;
+	ops[0].cls.indata_len = len;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			       CEPH_NOSNAP,
+			       0,
+			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			       ops,
+			       1, obj, 0, 0, NULL);
+
+	rbd_destroy_ops(ops);
+
+	printk(KERN_ERR "cls_exec returned %d\n", ret);
+	return ret;
+}
+
+static int rbd_header_add_snap(struct rbd_device *dev,
+			       const char *snap_name,
+			       gfp_t gfp_flags)
+{
+	int name_len = strlen(snap_name);
+	u64 new_snapid;
+	int ret;
+	void *data, *data_start, *data_end;
+
+	/* we should create a snapshot only if we're pointing at the head */
+	if (dev->cur_snap)
+		return -EINVAL;
+
+	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
+				      &new_snapid);
+	dout("created snapid=%lld\n", new_snapid);
+	if (ret < 0)
+		return ret;
+
+	data = kmalloc(name_len + 16, gfp_flags);
+	if (!data)
+		return -ENOMEM;
+
+	data_start = data;
+	data_end = data + name_len + 16;
+
+	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
+	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
+
+	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
+				data_start, data - data_start);
+
+	kfree(data_start);
+
+	if (ret < 0)
+		return ret;
+
+	dev->header.snapc->seq =  new_snapid;
+
+	return 0;
+bad:
+	return -ERANGE;
+}
+
+/*
+ * block device queue callback
+ */
+static void rbd_rq_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	struct bio_pair *bp = NULL;
+
+	rq = blk_fetch_request(q);
+
+	while (1) {
+		struct bio *bio;
+		struct bio *rq_bio, *next_bio = NULL;
+		bool do_write;
+		int size, op_size = 0;
+		u64 ofs;
+
+		/* peek at request from block layer */
+		if (!rq)
+			break;
+
+		dout("fetched request\n");
+
+		/* filter out block requests we don't understand */
+		if (!blk_fs_request(rq) && !blk_barrier_rq(rq)) {
+			__blk_end_request_all(rq, 0);
+			goto next;
+		}
+
+		/* deduce our operation (read, write) */
+		do_write = (rq_data_dir(rq) == WRITE);
+
+		size = blk_rq_bytes(rq);
+		ofs = blk_rq_pos(rq) * 512ULL;
+		rq_bio = rq->bio;
+		if (do_write && rbd_dev->read_only) {
+			__blk_end_request_all(rq, -EROFS);
+			goto next;
+		}
+
+		spin_unlock_irq(q->queue_lock);
+
+		dout("%s 0x%x bytes at 0x%llx\n",
+		     do_write ? "write" : "read",
+		     size, blk_rq_pos(rq) * 512ULL);
+
+		do {
+			/* a bio clone to be passed down to OSD req */
+			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
+			op_size = rbd_get_segment(&rbd_dev->header,
+						  rbd_dev->header.block_name,
+						  ofs, size,
+						  NULL, NULL);
+			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
+					      op_size, GFP_ATOMIC);
+			if (!bio) {
+				spin_lock_irq(q->queue_lock);
+				__blk_end_request_all(rq, -ENOMEM);
+				goto next;
+			}
+
+			/* init OSD command: write or read */
+			if (do_write)
+				rbd_req_write(rq, rbd_dev,
+					      rbd_dev->header.snapc,
+					      ofs,
+					      op_size, bio);
+			else
+				rbd_req_read(rq, rbd_dev,
+					     cur_snap_id(rbd_dev),
+					     ofs,
+					     op_size, bio);
+
+			size -= op_size;
+			ofs += op_size;
+
+			rq_bio = next_bio;
+		} while (size > 0);
+
+		if (bp)
+			bio_pair_release(bp);
+
+		spin_lock_irq(q->queue_lock);
+next:
+		rq = blk_fetch_request(q);
+	}
+}
+
+/*
+ * a queue callback. Makes sure that we don't create a bio that spans across
+ * multiple osd objects. One exception would be with a single page bios,
+ * which we handle later at bio_chain_clone
+ */
+static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
+			  struct bio_vec *bvec)
+{
+	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+	unsigned int chunk_sectors = (RBD_STRIPE_UNIT >> 9);
+	unsigned int bio_sectors = bmd->bi_size >> 9;
+	int max;
+
+	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
+				 + bio_sectors)) << 9;
+	if (max < 0)
+		max = 0; /* bio_add cannot handle a negative return */
+	if (max <= bvec->bv_len && bio_sectors == 0)
+		return bvec->bv_len;
+	return max;
+}
+
+static void rbd_free_disk(struct rbd_device *rbd_dev)
+{
+	struct gendisk *disk = rbd_dev->disk;
+
+	if (!disk)
+		return;
+
+	rbd_header_free(&rbd_dev->header);
+
+	if (disk->flags & GENHD_FL_UP)
+		del_gendisk(disk);
+	if (disk->queue)
+		blk_cleanup_queue(disk->queue);
+	put_disk(disk);
+}
+
+static int rbd_read_header(struct rbd_device *rbd_dev,
+			   struct rbd_obj_header *header)
+{
+	ssize_t rc;
+	struct rbd_obj_header_ondisk *dh;
+	int snap_count = 0;
+	u64 snap_names_len = 0;
+
+	while (1) {
+		int len = sizeof(*dh) +
+			  snap_count * sizeof(struct rbd_obj_snap_ondisk) +
+			  snap_names_len;
+
+		rc = -ENOMEM;
+		dh = kmalloc(len, GFP_KERNEL);
+		if (!dh)
+			return -ENOMEM;
+
+		rc = rbd_req_sync_read(rbd_dev,
+				       NULL, CEPH_NOSNAP,
+				       rbd_dev->obj_md_name,
+				       0, len,
+				       (char *)dh);
+		if (rc < 0)
+			goto out_dh;
+
+		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
+		if (rc < 0)
+			goto out_dh;
+
+		if (snap_count != header->total_snaps) {
+			snap_count = header->total_snaps;
+			snap_names_len = header->snap_names_len;
+			rbd_header_free(header);
+			kfree(dh);
+			continue;
+		}
+		break;
+	}
+
+out_dh:
+	kfree(dh);
+	return rc;
+}
+
+/*
+ * only read the first part of the ondisk header, without the snaps info
+ */
+static int rbd_update_snaps(struct rbd_device *rbd_dev)
+{
+	int ret;
+	struct rbd_obj_header h;
+	u64 snap_seq;
+
+	ret = rbd_read_header(rbd_dev, &h);
+	if (ret < 0)
+		return ret;
+
+	down_write(&rbd_dev->header.snap_rwsem);
+
+	snap_seq = rbd_dev->header.snapc->seq;
+
+	kfree(rbd_dev->header.snapc);
+	kfree(rbd_dev->header.snap_names);
+	kfree(rbd_dev->header.snap_sizes);
+
+	rbd_dev->header.total_snaps = h.total_snaps;
+	rbd_dev->header.snapc = h.snapc;
+	rbd_dev->header.snap_names = h.snap_names;
+	rbd_dev->header.snap_sizes = h.snap_sizes;
+	rbd_dev->header.snapc->seq = snap_seq;
+
+	up_write(&rbd_dev->header.snap_rwsem);
+
+	return 0;
+}
+
+static int rbd_init_disk(struct rbd_device *rbd_dev)
+{
+	struct gendisk *disk;
+	struct request_queue *q;
+	int rc;
+	u64 total_size = 0;
+	const char *snap = NULL;
+
+	/* contact OSD, request size info about the object being mapped */
+	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
+	if (rc)
+		return rc;
+
+	if (rbd_dev->client->mount_args)
+		snap = rbd_dev->client->mount_args->snap;
+	rc = rbd_header_set_snap(rbd_dev, snap, &total_size);
+	if (rc)
+		return rc;
+
+	/* create gendisk info */
+	rc = -ENOMEM;
+	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
+	if (!disk)
+		goto out;
+
+	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
+	disk->major = rbd_dev->major;
+	disk->first_minor = 0;
+	disk->fops = &rbd_bd_ops;
+	disk->private_data = rbd_dev;
+
+	/* init rq */
+	rc = -ENOMEM;
+	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+	if (!q)
+		goto out_disk;
+	blk_queue_merge_bvec(q, rbd_merge_bvec);
+	disk->queue = q;
+
+	q->queuedata = rbd_dev;
+
+	rbd_dev->disk = disk;
+	rbd_dev->q = q;
+
+	/* finally, announce the disk to the world */
+	set_capacity(disk, total_size / 512ULL);
+	add_disk(disk);
+
+	pr_info("%s: added with size 0x%llx\n",
+		disk->disk_name, (unsigned long long)total_size);
+	return 0;
+
+out_disk:
+	put_disk(disk);
+out:
+	return rc;
+}
+
+/********************************************************************
+ * /sys/class/rbd/
+ *                   add	map rados objects to blkdev
+ *                   remove	unmap rados objects
+ *                   list	show mappings
+ *******************************************************************/
+
+static void class_rbd_release(struct class *cls)
+{
+	kfree(cls);
+}
+
+static ssize_t class_rbd_list(struct class *c,
+			      struct class_attribute *attr,
+			      char *data)
+{
+	int n = 0;
+	struct list_head *tmp;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	list_for_each(tmp, &rbddev_list) {
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		n += sprintf(data+n, "%d %d client%lld %s %s\n",
+			     rbd_dev->id,
+			     rbd_dev->major,
+			     ceph_client_id(rbd_dev->client),
+			     rbd_dev->pool_name,
+			     rbd_dev->obj);
+	}
+
+	mutex_unlock(&ctl_mutex);
+	return n;
+}
+
+static ssize_t class_rbd_add(struct class *c,
+			     struct class_attribute *attr,
+			     const char *buf, size_t count)
+{
+	struct rbd_device *rbd_dev;
+	ssize_t rc = -ENOMEM;
+	int irc, new_id = 0;
+	struct list_head *tmp;
+	char *mon_dev_name;
+	char *opt;
+
+	if (!try_module_get(THIS_MODULE))
+		return -ENODEV;
+
+	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+	if (!mon_dev_name)
+		goto err_out_mod;
+
+	opt = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+	if (!opt)
+		goto err_mon_dev;
+
+	/* new rbd_device object */
+	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
+	if (!rbd_dev)
+		goto err_out_opt;
+
+	/* static rbd_device initialization */
+	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->node);
+
+	/* generate unique id: find highest unique id, add one */
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	list_for_each(tmp, &rbddev_list) {
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		if (rbd_dev->id >= new_id)
+			new_id = rbd_dev->id + 1;
+	}
+
+	rbd_dev->id = new_id;
+
+	/* add to global list */
+	list_add_tail(&rbd_dev->node, &rbddev_list);
+
+	/* parse add command */
+	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
+		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
+		   "%" __stringify(RBD_MAX_POOL_NAME_SIZE) "s "
+		   "%" __stringify(RBD_MAX_OBJ_NAME_SIZE) "s",
+		   mon_dev_name, opt, rbd_dev->pool_name,
+		   rbd_dev->obj) != 4) {
+		rc = -EINVAL;
+		goto err_out_slot;
+	}
+
+	rbd_dev->obj_len = strlen(rbd_dev->obj);
+	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
+		 rbd_dev->obj, RBD_SUFFIX);
+
+	/* initialize rest of new object */
+	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
+	rc = rbd_get_client(rbd_dev, mon_dev_name, opt);
+	if (rc < 0)
+		goto err_out_slot;
+
+	mutex_unlock(&ctl_mutex);
+	/* register our block device */
+	irc = register_blkdev(0, rbd_dev->name);
+	if (irc < 0) {
+		rc = irc;
+		goto err_out_client;
+	}
+	rbd_dev->major = irc;
+
+	/* set up and announce blkdev mapping */
+	rc = rbd_init_disk(rbd_dev);
+	if (rc)
+		goto err_out_blkdev;
+
+	return count;
+
+err_out_blkdev:
+	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_client:
+	rbd_put_client(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+err_out_slot:
+	list_del_init(&rbd_dev->node);
+	mutex_unlock(&ctl_mutex);
+
+	kfree(rbd_dev);
+err_out_opt:
+	kfree(opt);
+err_mon_dev:
+	kfree(mon_dev_name);
+err_out_mod:
+	dout("Error adding device %s\n", buf);
+	module_put(THIS_MODULE);
+	return rc;
+}
+
+static struct rbd_device *__rbd_get_dev(unsigned long id)
+{
+	struct list_head *tmp;
+	struct rbd_device *rbd_dev = NULL;
+
+	list_for_each(tmp, &rbddev_list) {
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		if (rbd_dev->id == id)
+			break;
+
+		rbd_dev = NULL;
+	}
+
+	return rbd_dev;
+}
+
+static ssize_t class_rbd_remove(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, rc;
+	unsigned long ul;
+
+	rc = strict_strtoul(buf, 10, &ul);
+	if (rc)
+		return rc;
+
+	/* convert to int; abort if we lost anything in the conversion */
+	target_id = (int) ul;
+	if (target_id != ul)
+		return -EINVAL;
+
+	/* remove object from list immediately */
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (rbd_dev)
+		list_del_init(&rbd_dev->node);
+
+	mutex_unlock(&ctl_mutex);
+
+	if (!rbd_dev)
+		return -ENOENT;
+
+	rbd_put_client(rbd_dev);
+
+	/* clean up and free blkdev and associated OSD connection */
+	rbd_free_disk(rbd_dev);
+	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+	kfree(rbd_dev);
+
+	/* release module ref */
+	module_put(THIS_MODULE);
+
+	return count;
+}
+
+static void get_size_and_suffix(u64 orig_size, u64 *size, char *suffix)
+{
+	if (orig_size >= 1024*1024*1024) {
+		*size = orig_size / (1024*1024*1024);
+		*suffix = 'G';
+	} else if (orig_size >= 1024*1024) {
+		*size = orig_size / (1024*1024);
+		*suffix = 'M';
+	} else if (orig_size >= 1024) {
+		*size = orig_size / 1024;
+		*suffix = 'K';
+	} else {
+		*size = orig_size;
+		*suffix = ' ';
+	}
+}
+
+static ssize_t class_rbd_snaps_list(struct class *c,
+			      struct class_attribute *attr,
+			      char *data)
+{
+	struct rbd_device *rbd_dev = NULL;
+	struct list_head *tmp;
+	struct rbd_obj_header *header;
+	char size_suffix;
+	u64 size;
+	int i, n = 0, max = PAGE_SIZE;
+	int ret;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	list_for_each(tmp, &rbddev_list) {
+		char *names, *p;
+		struct ceph_snap_context *snapc;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		header = &rbd_dev->header;
+		names = header->snap_names;
+		snapc = header->snapc;
+		n += snprintf(data + n, max - n,
+			      "snapshots for device id %d:\n",
+			      rbd_dev->id);
+		if (n == max)
+			break;
+
+		down_read(&header->snap_rwsem);
+
+		get_size_and_suffix(header->image_size, &size,
+				    &size_suffix);
+		n += snprintf(data + n, max - n, "%s\t%lld%c%s\n",
+				      RBD_SNAP_HEAD_NAME,
+				      size, size_suffix,
+				      (!rbd_dev->cur_snap ?
+				       " (*)" : ""));
+		if (n == max)
+			break;
+
+		p = names;
+		for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+			get_size_and_suffix(header->snap_sizes[i], &size,
+					    &size_suffix);
+			n += snprintf(data + n, max - n, "%s\t%lld%c%s\n",
+			      p, size, size_suffix,
+			      (rbd_dev->cur_snap &&
+			       (snap_index(header, i) == rbd_dev->cur_snap) ?
+			       " (*)" : ""));
+			if (n == max)
+				break;
+		}
+
+		up_read(&header->snap_rwsem);
+	}
+
+
+	ret = n;
+	mutex_unlock(&ctl_mutex);
+	return ret;
+}
+
+static ssize_t class_rbd_snaps_refresh(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, rc;
+	unsigned long ul;
+	int ret = count;
+
+	rc = strict_strtoul(buf, 10, &ul);
+	if (rc)
+		return rc;
+
+	/* convert to int; abort if we lost anything in the conversion */
+	target_id = (int) ul;
+	if (target_id != ul)
+		return -EINVAL;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done;
+	}
+
+	rc = rbd_update_snaps(rbd_dev);
+	if (rc < 0)
+		ret = rc;
+
+done:
+	mutex_unlock(&ctl_mutex);
+	return ret;
+}
+
+static ssize_t class_rbd_snap_create(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, ret;
+	char *name;
+
+	name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
+	if (!name)
+		return -ENOMEM;
+
+	/* parse snaps add command */
+	if (sscanf(buf, "%d "
+		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+		   &target_id,
+		   name) != 2) {
+		ret = -EINVAL;
+		goto done;
+	}
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done_unlock;
+	}
+
+	ret = rbd_header_add_snap(rbd_dev,
+				  name, GFP_KERNEL);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = rbd_update_snaps(rbd_dev);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = count;
+done_unlock:
+	mutex_unlock(&ctl_mutex);
+done:
+	kfree(name);
+	return ret;
+}
+
+static ssize_t class_rbd_rollback(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, ret;
+	u64 snapid;
+	char snap_name[RBD_MAX_SNAP_NAME_LEN];
+	u64 cur_ofs;
+	char *seg_name;
+
+	/* parse snaps add command */
+	if (sscanf(buf, "%d "
+		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+		   &target_id,
+		   snap_name) != 2) {
+		return -EINVAL;
+	}
+
+	ret = -ENOMEM;
+	seg_name = kmalloc(RBD_MAX_SEG_NAME_SIZE + 1, GFP_NOIO);
+	if (!seg_name)
+		return ret;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done_unlock;
+	}
+
+	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
+	if (ret < 0)
+		goto done_unlock;
+
+	printk(KERN_ERR "snapid=%lld\n", snapid);
+
+	cur_ofs = 0;
+	while (cur_ofs < rbd_dev->header.image_size) {
+		cur_ofs += rbd_get_segment(&rbd_dev->header,
+					   rbd_dev->obj,
+					   cur_ofs, (u64)-1,
+					   seg_name, NULL);
+		printk(KERN_ERR "seg_name=%s\n", seg_name);
+
+		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
+		if (ret < 0)
+			printk("could not roll back obj %s err=%d\n", seg_name, ret);
+	}
+
+	ret = rbd_update_snaps(rbd_dev);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = count;
+
+done_unlock:
+	mutex_unlock(&ctl_mutex);
+	kfree(seg_name);
+
+	return ret;
+}
+
+static struct class_attribute class_rbd_attrs[] = {
+	__ATTR(add,		0200, NULL, class_rbd_add),
+	__ATTR(remove,		0200, NULL, class_rbd_remove),
+	__ATTR(list,		0444, class_rbd_list, NULL),
+	__ATTR(snaps_refresh,	0200, NULL, class_rbd_snaps_refresh),
+	__ATTR(snap_create,	0200, NULL, class_rbd_snap_create),
+	__ATTR(snaps_list,	0444, class_rbd_snaps_list, NULL),
+	__ATTR(snap_rollback,	0200, NULL, class_rbd_rollback),
+	__ATTR_NULL
+};
+
+/*
+ * create control files in sysfs
+ * /sys/class/rbd/...
+ */
+static int rbd_sysfs_init(void)
+{
+	int ret = -ENOMEM;
+
+	class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
+	if (!class_rbd)
+		goto out;
+
+	class_rbd->name = DRV_NAME;
+	class_rbd->owner = THIS_MODULE;
+	class_rbd->class_release = class_rbd_release;
+	class_rbd->class_attrs = class_rbd_attrs;
+
+	ret = class_register(class_rbd);
+	if (ret)
+		goto out_class;
+	return 0;
+
+out_class:
+	kfree(class_rbd);
+	class_rbd = NULL;
+	pr_err(DRV_NAME ": failed to create class rbd\n");
+out:
+	return ret;
+}
+
+static void rbd_sysfs_cleanup(void)
+{
+	if (class_rbd)
+		class_destroy(class_rbd);
+	class_rbd = NULL;
+}
+
+int __init rbd_init(void)
+{
+	int rc;
+
+	rc = rbd_sysfs_init();
+	if (rc)
+		return rc;
+	spin_lock_init(&node_lock);
+	pr_info("loaded " DRV_NAME_LONG);
+	return 0;
+}
+
+void __exit rbd_exit(void)
+{
+	rbd_sysfs_cleanup();
+}
+
+#endif
diff --git a/fs/ceph/rbd.h b/fs/ceph/rbd.h
new file mode 100644
index 0000000..68e0a5c
--- /dev/null
+++ b/fs/ceph/rbd.h
@@ -0,0 +1,8 @@
+#ifndef _FS_CEPH_RBD
+#define _FS_CEPH_RBD
+
+extern void rbd_set_osdc(struct ceph_osd_client *o);
+extern int __init rbd_init(void);
+extern void __exit rbd_exit(void);
+
+#endif
diff --git a/fs/ceph/rbd_types.h b/fs/ceph/rbd_types.h
new file mode 100644
index 0000000..a7c0a39
--- /dev/null
+++ b/fs/ceph/rbd_types.h
@@ -0,0 +1,71 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RBD_TYPES_H
+#define CEPH_RBD_TYPES_H
+
+#include <linux/types.h>
+
+/*
+ * rbd image 'foo' consists of objects
+ *   foo.rbd      - image metadata
+ *   foo.00000000
+ *   foo.00000001
+ *   ...          - data
+ */
+
+#define RBD_SUFFIX	 	".rbd"
+#define RBD_DIRECTORY           "rbd_directory"
+#define RBD_INFO                "rbd_info"
+
+#define RBD_DEFAULT_OBJ_ORDER	22   /* 4MB */
+
+#define RBD_MAX_OBJ_NAME_SIZE	96
+#define RBD_MAX_SEG_NAME_SIZE	128
+
+#define RBD_COMP_NONE		0
+#define RBD_CRYPT_NONE		0
+
+#define RBD_HEADER_TEXT		"<<< Rados Block Device Image >>>\n"
+#define RBD_HEADER_SIGNATURE	"RBD"
+#define RBD_HEADER_VERSION	"001.005"
+
+struct rbd_info {
+	__le64 max_id;
+} __attribute__ ((packed));
+
+struct rbd_obj_snap_ondisk {
+	__le64 id;
+	__le64 image_size;
+} __attribute__((packed));
+
+struct rbd_obj_header_ondisk {
+	char text[40];
+	char block_name[24];
+	char signature[4];
+	char version[8];
+	struct {
+		__u8 order;
+		__u8 crypt_type;
+		__u8 comp_type;
+		__u8 unused;
+	} __attribute__((packed)) options;
+	__le64 image_size;
+	__le64 snap_seq;
+	__le32 snap_count;
+	__le32 reserved;
+	__le64 snap_names_len;
+	struct rbd_obj_snap_ondisk snaps[0];
+} __attribute__((packed));
+
+
+#endif
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index acff12f..5d5fe14 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -18,6 +18,7 @@
 #include "super.h"
 #include "mon_client.h"
 #include "auth.h"
+#include "rbd.h"
 
 /*
  * Ceph superblock operations
@@ -1151,8 +1152,14 @@ static int __init init_ceph(void)
 		CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL,
 		CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
 		CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
+
+	ret = rbd_init();
+	if (ret)
+		goto out_fs;
 	return 0;
 
+out_fs:
+	unregister_filesystem(&ceph_fs_type);
 out_icache:
 	destroy_caches();
 out_msgr:
@@ -1166,6 +1173,7 @@ out:
 static void __exit exit_ceph(void)
 {
 	dout("exit_ceph\n");
+	rbd_exit();
 	unregister_filesystem(&ceph_fs_type);
 	ceph_caps_finalize();
 	destroy_caches();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 67d5b75..27a6767 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -921,4 +921,9 @@ static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
 	return NULL;
 }
 
+#ifndef CONFIG_CEPH_RBD
+static inline int __init rbd_init(void) { return 0; }
+static inline void __exit rbd_exit(void) {}
+#endif
+
 #endif /* _FS_CEPH_SUPER_H */
-- 
1.5.6.5


^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, back to index

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-07-07 22:11 [PATCH v3 00/10] ceph-rbd: ceph RADOS block device Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 01/10] ceph-rbd: lookup pool in osdmap by name Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 02/10] ceph-rbd: refactor osdc requests creation functions Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 03/10] ceph-rbd: messenger and osdc changes for rbd Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 04/10] ceph-rbd: enable creation of clients that don't need mds Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 05/10] ceph-rbd: refactor mount related functions, add helpers Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 06/10] ceph-rbd: generalize mon requests, add pool op support Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 07/10] ceph-rbd: some super.c changes for rbd Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 08/10] ceph-rbd: osdc support for osd call and rollback operations Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 09/10] ceph-rbd: sync common header and source files with server version Yehuda Sadeh
2010-07-07 22:11 ` [PATCH v3 10/10] ceph-rbd: rados block device Yehuda Sadeh

CEPH-Devel Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/ceph-devel/0 ceph-devel/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ceph-devel ceph-devel/ https://lore.kernel.org/ceph-devel \
		ceph-devel@vger.kernel.org
	public-inbox-index ceph-devel

Example config snippet for mirrors

Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.ceph-devel


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git