linux-kernel.vger.kernel.org archive mirror
* [PATCH 0/8] rados block device and ceph refactor
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

Hi,

The rados block device (rbd) implements a network block device backed by 
the Ceph distributed object store (think nbd/iSCSI, but distributed and 
fault tolerant).  At the suggestion of Christoph and James, this version 
of the patchset factors out the common Ceph bits (the network protocol, 
cluster membership, and object storage parts) into a libceph module 
(currently in net/ceph/ and include/linux/ceph/) that is shared by the 
file system component (fs/ceph) and rbd (drivers/block/rbd.c). The first 
few patches lay some groundwork, #7 does the ceph -> libceph+ceph 
split, and #8 adds the block device driver.
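
To give a feel for the new layout: after patch #7, both consumers pull the 
shared pieces in from the new header location, roughly

  #include <linux/ceph/libceph.h>     /* common client and option handling */
  #include <linux/ceph/osd_client.h>  /* RADOS object I/O */
  #include <linux/ceph/messenger.h>   /* wire protocol */

(the exact include set for each file is visible in patch #7 and the 
diffstat below).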

The block device code has been in linux-next for a while, but it could 
use some review by someone more familiar with the block layer. The rbd 
code was originally based on osdblk, and has a similar sysfs interface 
(rbd also supports snapshots, so there are a few more knobs for that).

Two questions --

1- Are net/ceph/ and include/linux/ceph/ appropriate locations for the 
libceph code?  (It seemed more similar to other net/ residents than the 
stuff in lib/.)

2- Do I need an explicit ACK from any block people before sending this 
to Linus?  Carrying/rebasing the refactoring patch out of tree will be 
tedious, so I would like to push it sooner rather than later.

Thanks!
sage

PS This code is also available in git at

  git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git rbd

---

Sage Weil (1):
  ceph: factor out libceph from Ceph file system

Yehuda Sadeh (7):
  ceph-rbd: lookup pool in osdmap by name
  ceph-rbd: refactor osdc requests creation functions
  ceph-rbd: messenger and osdc changes for rbd
  ceph-rbd: enable creation of clients that don't need mds
  ceph-rbd: refactor mount related functions, add helpers
  ceph-rbd: osdc support for osd call and rollback operations
  rbd: introduce rados block device (rbd), based on libceph

 MAINTAINERS                       |   11 +
 drivers/block/Kconfig             |   13 +
 drivers/block/Makefile            |    1 +
 drivers/block/rbd.c               | 1844 ++++++++++++++++++++++++++++
 drivers/block/rbd_types.h         |   73 ++
 fs/ceph/Kconfig                   |   14 +-
 fs/ceph/Makefile                  |   11 +-
 fs/ceph/README                    |   20 -
 fs/ceph/addr.c                    |   65 +-
 fs/ceph/armor.c                   |  103 --
 fs/ceph/auth.c                    |  259 ----
 fs/ceph/auth.h                    |   92 --
 fs/ceph/auth_none.c               |  131 --
 fs/ceph/auth_none.h               |   30 -
 fs/ceph/auth_x.c                  |  684 -----------
 fs/ceph/auth_x.h                  |   49 -
 fs/ceph/auth_x_protocol.h         |   90 --
 fs/ceph/buffer.c                  |   65 -
 fs/ceph/buffer.h                  |   39 -
 fs/ceph/caps.c                    |   35 +-
 fs/ceph/ceph_debug.h              |   37 -
 fs/ceph/ceph_frag.c               |    3 +-
 fs/ceph/ceph_frag.h               |  109 --
 fs/ceph/ceph_fs.c                 |   72 --
 fs/ceph/ceph_fs.h                 |  728 -----------
 fs/ceph/ceph_hash.c               |  118 --
 fs/ceph/ceph_hash.h               |   13 -
 fs/ceph/ceph_strings.c            |  193 ---
 fs/ceph/crush/crush.c             |  151 ---
 fs/ceph/crush/crush.h             |  180 ---
 fs/ceph/crush/hash.c              |  149 ---
 fs/ceph/crush/hash.h              |   17 -
 fs/ceph/crush/mapper.c            |  609 ---------
 fs/ceph/crush/mapper.h            |   20 -
 fs/ceph/crypto.c                  |  412 -------
 fs/ceph/crypto.h                  |   48 -
 fs/ceph/debugfs.c                 |  407 ++-----
 fs/ceph/decode.h                  |  196 ---
 fs/ceph/dir.c                     |   55 +-
 fs/ceph/export.c                  |    5 +-
 fs/ceph/file.c                    |  207 +---
 fs/ceph/inode.c                   |   19 +-
 fs/ceph/ioctl.c                   |   11 +-
 fs/ceph/locks.c                   |    6 +-
 fs/ceph/mds_client.c              |   85 +-
 fs/ceph/mds_client.h              |   20 +-
 fs/ceph/mdsmap.c                  |   11 +-
 fs/ceph/mdsmap.h                  |   62 -
 fs/ceph/messenger.c               | 2277 ----------------------------------
 fs/ceph/messenger.h               |  253 ----
 fs/ceph/mon_client.c              | 1018 ---------------
 fs/ceph/mon_client.h              |  121 --
 fs/ceph/msgpool.c                 |   64 -
 fs/ceph/msgpool.h                 |   25 -
 fs/ceph/msgr.h                    |  175 ---
 fs/ceph/osd_client.c              | 1539 -----------------------
 fs/ceph/osd_client.h              |  167 ---
 fs/ceph/osdmap.c                  | 1110 -----------------
 fs/ceph/osdmap.h                  |  128 --
 fs/ceph/pagelist.c                |   55 -
 fs/ceph/pagelist.h                |   54 -
 fs/ceph/rados.h                   |  405 ------
 fs/ceph/snap.c                    |   10 +-
 fs/ceph/strings.c                 |  117 ++
 fs/ceph/super.c                   | 1154 +++++++----------
 fs/ceph/super.h                   |  397 +++----
 fs/ceph/types.h                   |   29 -
 fs/ceph/xattr.c                   |   15 +-
 include/linux/ceph/auth.h         |   92 ++
 include/linux/ceph/buffer.h       |   39 +
 include/linux/ceph/ceph_debug.h   |   37 +
 include/linux/ceph/ceph_frag.h    |  109 ++
 include/linux/ceph/ceph_fs.h      |  728 +++++++++++
 include/linux/ceph/ceph_hash.h    |   13 +
 include/linux/ceph/crush/crush.h  |  180 +++
 include/linux/ceph/crush/hash.h   |   17 +
 include/linux/ceph/crush/mapper.h |   20 +
 include/linux/ceph/debugfs.h      |   33 +
 include/linux/ceph/decode.h       |  201 +++
 include/linux/ceph/libceph.h      |  249 ++++
 include/linux/ceph/mdsmap.h       |   62 +
 include/linux/ceph/messenger.h    |  261 ++++
 include/linux/ceph/mon_client.h   |  122 ++
 include/linux/ceph/msgpool.h      |   25 +
 include/linux/ceph/msgr.h         |  175 +++
 include/linux/ceph/osd_client.h   |  234 ++++
 include/linux/ceph/osdmap.h       |  130 ++
 include/linux/ceph/pagelist.h     |   54 +
 include/linux/ceph/rados.h        |  405 ++++++
 include/linux/ceph/types.h        |   29 +
 net/Kconfig                       |    1 +
 net/Makefile                      |    1 +
 net/ceph/Kconfig                  |   27 +
 net/ceph/Makefile                 |   37 +
 net/ceph/armor.c                  |  103 ++
 net/ceph/auth.c                   |  259 ++++
 net/ceph/auth_none.c              |  132 ++
 net/ceph/auth_none.h              |   29 +
 net/ceph/auth_x.c                 |  685 +++++++++++
 net/ceph/auth_x.h                 |   50 +
 net/ceph/auth_x_protocol.h        |   90 ++
 net/ceph/buffer.c                 |   68 +
 net/ceph/ceph_common.c            |  529 ++++++++
 net/ceph/ceph_fs.c                |   75 ++
 net/ceph/ceph_hash.c              |  118 ++
 net/ceph/ceph_strings.c           |   84 ++
 net/ceph/crush/crush.c            |  151 +++
 net/ceph/crush/hash.c             |  149 +++
 net/ceph/crush/mapper.c           |  609 +++++++++
 net/ceph/crypto.c                 |  412 +++++++
 net/ceph/crypto.h                 |   48 +
 net/ceph/debugfs.c                |  268 ++++
 net/ceph/messenger.c              | 2453 +++++++++++++++++++++++++++++++++++++
 net/ceph/mon_client.c             | 1027 ++++++++++++++++
 net/ceph/msgpool.c                |   64 +
 net/ceph/osd_client.c             | 1773 +++++++++++++++++++++++++++
 net/ceph/osdmap.c                 | 1128 +++++++++++++++++
 net/ceph/pagelist.c               |   57 +
 net/ceph/pagevec.c                |  223 ++++
 119 files changed, 16847 insertions(+), 13703 deletions(-)
 create mode 100644 drivers/block/rbd.c
 create mode 100644 drivers/block/rbd_types.h
 delete mode 100644 fs/ceph/README
 delete mode 100644 fs/ceph/armor.c
 delete mode 100644 fs/ceph/auth.c
 delete mode 100644 fs/ceph/auth.h
 delete mode 100644 fs/ceph/auth_none.c
 delete mode 100644 fs/ceph/auth_none.h
 delete mode 100644 fs/ceph/auth_x.c
 delete mode 100644 fs/ceph/auth_x.h
 delete mode 100644 fs/ceph/auth_x_protocol.h
 delete mode 100644 fs/ceph/buffer.c
 delete mode 100644 fs/ceph/buffer.h
 delete mode 100644 fs/ceph/ceph_debug.h
 delete mode 100644 fs/ceph/ceph_frag.h
 delete mode 100644 fs/ceph/ceph_fs.c
 delete mode 100644 fs/ceph/ceph_fs.h
 delete mode 100644 fs/ceph/ceph_hash.c
 delete mode 100644 fs/ceph/ceph_hash.h
 delete mode 100644 fs/ceph/ceph_strings.c
 delete mode 100644 fs/ceph/crush/crush.c
 delete mode 100644 fs/ceph/crush/crush.h
 delete mode 100644 fs/ceph/crush/hash.c
 delete mode 100644 fs/ceph/crush/hash.h
 delete mode 100644 fs/ceph/crush/mapper.c
 delete mode 100644 fs/ceph/crush/mapper.h
 delete mode 100644 fs/ceph/crypto.c
 delete mode 100644 fs/ceph/crypto.h
 delete mode 100644 fs/ceph/decode.h
 delete mode 100644 fs/ceph/mdsmap.h
 delete mode 100644 fs/ceph/messenger.c
 delete mode 100644 fs/ceph/messenger.h
 delete mode 100644 fs/ceph/mon_client.c
 delete mode 100644 fs/ceph/mon_client.h
 delete mode 100644 fs/ceph/msgpool.c
 delete mode 100644 fs/ceph/msgpool.h
 delete mode 100644 fs/ceph/msgr.h
 delete mode 100644 fs/ceph/osd_client.c
 delete mode 100644 fs/ceph/osd_client.h
 delete mode 100644 fs/ceph/osdmap.c
 delete mode 100644 fs/ceph/osdmap.h
 delete mode 100644 fs/ceph/pagelist.c
 delete mode 100644 fs/ceph/pagelist.h
 delete mode 100644 fs/ceph/rados.h
 create mode 100644 fs/ceph/strings.c
 delete mode 100644 fs/ceph/types.h
 create mode 100644 include/linux/ceph/auth.h
 create mode 100644 include/linux/ceph/buffer.h
 create mode 100644 include/linux/ceph/ceph_debug.h
 create mode 100644 include/linux/ceph/ceph_frag.h
 create mode 100644 include/linux/ceph/ceph_fs.h
 create mode 100644 include/linux/ceph/ceph_hash.h
 create mode 100644 include/linux/ceph/crush/crush.h
 create mode 100644 include/linux/ceph/crush/hash.h
 create mode 100644 include/linux/ceph/crush/mapper.h
 create mode 100644 include/linux/ceph/debugfs.h
 create mode 100644 include/linux/ceph/decode.h
 create mode 100644 include/linux/ceph/libceph.h
 create mode 100644 include/linux/ceph/mdsmap.h
 create mode 100644 include/linux/ceph/messenger.h
 create mode 100644 include/linux/ceph/mon_client.h
 create mode 100644 include/linux/ceph/msgpool.h
 create mode 100644 include/linux/ceph/msgr.h
 create mode 100644 include/linux/ceph/osd_client.h
 create mode 100644 include/linux/ceph/osdmap.h
 create mode 100644 include/linux/ceph/pagelist.h
 create mode 100644 include/linux/ceph/rados.h
 create mode 100644 include/linux/ceph/types.h
 create mode 100644 net/ceph/Kconfig
 create mode 100644 net/ceph/Makefile
 create mode 100644 net/ceph/armor.c
 create mode 100644 net/ceph/auth.c
 create mode 100644 net/ceph/auth_none.c
 create mode 100644 net/ceph/auth_none.h
 create mode 100644 net/ceph/auth_x.c
 create mode 100644 net/ceph/auth_x.h
 create mode 100644 net/ceph/auth_x_protocol.h
 create mode 100644 net/ceph/buffer.c
 create mode 100644 net/ceph/ceph_common.c
 create mode 100644 net/ceph/ceph_fs.c
 create mode 100644 net/ceph/ceph_hash.c
 create mode 100644 net/ceph/ceph_strings.c
 create mode 100644 net/ceph/crush/crush.c
 create mode 100644 net/ceph/crush/hash.c
 create mode 100644 net/ceph/crush/mapper.c
 create mode 100644 net/ceph/crypto.c
 create mode 100644 net/ceph/crypto.h
 create mode 100644 net/ceph/debugfs.c
 create mode 100644 net/ceph/messenger.c
 create mode 100644 net/ceph/mon_client.c
 create mode 100644 net/ceph/msgpool.c
 create mode 100644 net/ceph/osd_client.c
 create mode 100644 net/ceph/osdmap.c
 create mode 100644 net/ceph/pagelist.c
 create mode 100644 net/ceph/pagevec.c



* [PATCH 1/8] ceph-rbd: lookup pool in osdmap by name
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
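A caller would use the new lookup along these lines (a sketch, not part of
the patch; "rbd" is just an example pool name and osdc is the caller's
ceph_osd_client):

	int poolid = ceph_pg_poolid_by_name(osdc->osdmap, "rbd");

	if (poolid < 0)
		return poolid;	/* -ENOENT when no pool has that name */
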
 fs/ceph/osdmap.c |   13 +++++++++++++
 fs/ceph/osdmap.h |    2 ++
 2 files changed, 15 insertions(+), 0 deletions(-)

diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index e31f118..3ccd117 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -417,6 +417,19 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
 	return NULL;
 }
 
+int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
+{
+	struct rb_node *rbp;
+
+	for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
+		struct ceph_pg_pool_info *pi =
+			rb_entry(rbp, struct ceph_pg_pool_info, node);
+		if (pi->name && strcmp(pi->name, name) == 0)
+			return pi->id;
+	}
+	return -ENOENT;
+}
+
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
 	rb_erase(&pi->node, root);
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
index 970b547..a592b21 100644
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -125,4 +125,6 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
 				struct ceph_pg pgid);
 
+extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
+
 #endif
-- 
1.7.0



* [PATCH 2/8] ceph-rbd: refactor osdc requests creation functions
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

OSD request creation is decoupled from the vino parameter,
allowing osd clients to use arbitrary object names that are
not necessarily vino based.  Also, calc_raw_layout now takes
a snap id.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
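To sketch the intended use of the split (illustration only, not part of the
patch; osdc, oid, off, len, and pages stand for caller-provided state, and
error handling is mostly elided), a client can now address an arbitrarily
named object instead of a vino-derived one:

	struct ceph_osd_request *req;

	req = ceph_osdc_alloc_request(osdc, CEPH_OSD_FLAG_READ, NULL,
				      0 /* do_sync */, false /* use_mempool */,
				      GFP_NOFS, pages);
	if (!req)
		return -ENOMEM;

	ceph_osdc_build_request(req, off, &len, CEPH_OSD_OP_READ,
				NULL /* snapc */, 0 /* do_sync */,
				0 /* truncate_seq */, 0 /* truncate_size */,
				NULL /* mtime */, oid, strlen(oid));
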
 fs/ceph/osd_client.c |  187 ++++++++++++++++++++++++++++++++++---------------
 fs/ceph/osd_client.h |   25 +++++++
 2 files changed, 155 insertions(+), 57 deletions(-)

diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index bed6391..ce7f7e0 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -22,6 +22,35 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
+void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+			struct ceph_file_layout *layout,
+			u64 snapid,
+			u64 off, u64 len, u64 *bno,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+	struct ceph_osd_op *op = (void *)(reqhead + 1);
+	u64 orig_len = len;
+	u64 objoff, objlen;    /* extent in object */
+
+	reqhead->snapid = cpu_to_le64(snapid);
+
+	/* object extent? */
+	ceph_calc_file_object_mapping(layout, off, &len, bno,
+				      &objoff, &objlen);
+	if (len < orig_len)
+		dout(" skipping last %llu, final file extent %llu~%llu\n",
+		     orig_len - len, off, len);
+
+	op->extent.offset = cpu_to_le64(objoff);
+	op->extent.length = cpu_to_le64(objlen);
+	req->r_num_pages = calc_pages_for(off, len);
+
+	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+	     *bno, objoff, objlen, req->r_num_pages);
+
+}
+
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -48,34 +77,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  * fill osd op in request message.
  */
 static void calc_layout(struct ceph_osd_client *osdc,
-			struct ceph_vino vino, struct ceph_file_layout *layout,
+			struct ceph_vino vino,
+			struct ceph_file_layout *layout,
 			u64 off, u64 *plen,
 			struct ceph_osd_request *req)
 {
-	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-	struct ceph_osd_op *op = (void *)(reqhead + 1);
-	u64 orig_len = *plen;
-	u64 objoff, objlen;    /* extent in object */
 	u64 bno;
 
-	reqhead->snapid = cpu_to_le64(vino.snap);
-
-	/* object extent? */
-	ceph_calc_file_object_mapping(layout, off, plen, &bno,
-				      &objoff, &objlen);
-	if (*plen < orig_len)
-		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - *plen, off, *plen);
+	ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
 
 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
-
-	op->extent.offset = cpu_to_le64(objoff);
-	op->extent.length = cpu_to_le64(objlen);
-	req->r_num_pages = calc_pages_for(off, *plen);
-
-	dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
-	     req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
 /*
@@ -108,43 +120,34 @@ void ceph_osdc_release_request(struct kref *kref)
 		kfree(req);
 }
 
-/*
- * build new request AND message, calculate layout, and adjust file
- * extent as needed.
- *
- * if the file was recently truncated, we include information about its
- * old and new size so that the object can be updated appropriately.  (we
- * avoid synchronously deleting truncated objects because it's slow.)
- *
- * if @do_sync, include a 'startsync' command so that the osd will flush
- * data quickly.
- */
-struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
-					       struct ceph_file_layout *layout,
-					       struct ceph_vino vino,
-					       u64 off, u64 *plen,
-					       int opcode, int flags,
+struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+					       int flags,
 					       struct ceph_snap_context *snapc,
 					       int do_sync,
-					       u32 truncate_seq,
-					       u64 truncate_size,
-					       struct timespec *mtime,
-					       bool use_mempool, int num_reply)
+					       bool use_mempool,
+					       gfp_t gfp_flags,
+					       struct page **pages)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	struct ceph_osd_request_head *head;
-	struct ceph_osd_op *op;
-	void *p;
 	int num_op = 1 + do_sync;
-	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-	int i;
+	size_t msg_size = sizeof(struct ceph_osd_request_head) +
+			  num_op*sizeof(struct ceph_osd_op);
+
+	if (use_mempool) {
+		req = mempool_alloc(osdc->req_mempool, gfp_flags);
+		memset(req, 0, sizeof(*req));
+	} else {
+		req = kzalloc(sizeof(*req), gfp_flags);
+	}
+	if (!req)
+		return NULL;
 
 	if (use_mempool) {
-		req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
+		req = mempool_alloc(osdc->req_mempool, gfp_flags);
 		memset(req, 0, sizeof(*req));
 	} else {
-		req = kzalloc(sizeof(*req), GFP_NOFS);
+		req = kzalloc(sizeof(*req), gfp_flags);
 	}
 	if (req == NULL)
 		return NULL;
@@ -164,7 +167,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
+				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
@@ -178,18 +181,48 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
 	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
+
+	req->r_request = msg;
+	req->r_pages = pages;
+
+	return req;
+}
+
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req,
+			    u64 off, u64 *plen,
+			    int opcode,
+			    struct ceph_snap_context *snapc,
+			    int do_sync,
+			    u32 truncate_seq,
+			    u64 truncate_size,
+			    struct timespec *mtime,
+			    const char *oid,
+			    int oid_len)
+{
+	struct ceph_msg *msg = req->r_request;
+	struct ceph_osd_request_head *head;
+	struct ceph_osd_op *op;
+	void *p;
+	int num_op = 1 + do_sync;
+	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+	int i;
+	int flags = req->r_flags;
+
 	head = msg->front.iov_base;
 	op = (void *)(head + 1);
 	p = (void *)(op + num_op);
 
-	req->r_request = msg;
 	req->r_snapc = ceph_get_snap_context(snapc);
 
 	head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +232,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	head->num_ops = cpu_to_le16(num_op);
 	op->op = cpu_to_le16(opcode);
 
-	/* calculate max write size */
-	calc_layout(osdc, vino, layout, off, plen, req);
-	req->r_file_layout = *layout;  /* keep a copy */
-
 	if (flags & CEPH_OSD_FLAG_WRITE) {
 		req->r_request->hdr.data_off = cpu_to_le16(off);
 		req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +241,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
 	/* fill in oid */
-	head->object_len = cpu_to_le32(req->r_oid_len);
-	memcpy(p, req->r_oid, req->r_oid_len);
-	p += req->r_oid_len;
+	head->object_len = cpu_to_le32(oid_len);
+	memcpy(p, oid, oid_len);
+	p += oid_len;
 
 	if (do_sync) {
 		op++;
@@ -233,6 +262,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
 	msg->hdr.front_len = cpu_to_le32(msg_size);
+	return;
+}
+
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ *
+ * if the file was recently truncated, we include information about its
+ * old and new size so that the object can be updated appropriately.  (we
+ * avoid synchronously deleting truncated objects because it's slow.)
+ *
+ * if @do_sync, include a 'startsync' command so that the osd will flush
+ * data quickly.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+					       struct ceph_file_layout *layout,
+					       struct ceph_vino vino,
+					       u64 off, u64 *plen,
+					       int opcode, int flags,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       u32 truncate_seq,
+					       u64 truncate_size,
+					       struct timespec *mtime,
+					       bool use_mempool, int num_reply)
+{
+	struct ceph_osd_request *req =
+		ceph_osdc_alloc_request(osdc, flags,
+					 snapc, do_sync,
+					 use_mempool,
+					 GFP_NOFS, NULL);
+	if (IS_ERR(req))
+		return req;
+
+	/* calculate max write size */
+	calc_layout(osdc, vino, layout, off, plen, req);
+	req->r_file_layout = *layout;  /* keep a copy */
+
+	ceph_osdc_build_request(req, off, plen, opcode,
+				snapc, do_sync,
+				truncate_seq, truncate_size,
+				mtime,
+				req->r_oid, req->r_oid_len);
+
 	return req;
 }
 
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index ce77698..b687c2e 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -119,6 +119,31 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 
+extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+			struct ceph_file_layout *layout,
+			u64 snapid,
+			u64 off, u64 len, u64 *bno,
+			struct ceph_osd_request *req);
+
+extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
+					       int flags,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       bool use_mempool,
+					       gfp_t gfp_flags,
+					       struct page **pages);
+
+extern void ceph_osdc_build_request(struct ceph_osd_request *req,
+			    u64 off, u64 *plen,
+			    int opcode,
+			    struct ceph_snap_context *snapc,
+			    int do_sync,
+			    u32 truncate_seq,
+			    u64 truncate_size,
+			    struct timespec *mtime,
+			    const char *oid,
+			    int oid_len);
+
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
 				      struct ceph_vino vino,
-- 
1.7.0



* [PATCH 3/8] ceph-rbd: messenger and osdc changes for rbd
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

The messenger can now send/receive data in a bio.  This was
added so that we don't need to copy the data coming from the
rados block device.

Osd ops can now carry trailing variable-sized data, and the
encoding of osd ops is more modular.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
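As a rough illustration (not part of the patch; osdc, snapc, and bio stand
for caller state of the kind rbd will pass in), a bio-backed write request
is now set up with a zero-terminated op vector instead of a single opcode:

	struct ceph_osd_req_op ops[2];
	struct ceph_osd_request *req;

	memset(ops, 0, sizeof(ops));
	ops[0].op = CEPH_OSD_OP_WRITE;	/* extent filled in by layout calc */
	ops[1].op = 0;			/* terminator, see get_num_ops() */

	req = ceph_osdc_alloc_request(osdc, CEPH_OSD_FLAG_WRITE, snapc, ops,
				      false /* use_mempool */, GFP_NOIO,
				      NULL /* pages */, bio);

The messenger then sends the data pages straight out of the bio, and any
xattr payloads ride in the new trailing pagelist.
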
 fs/ceph/messenger.c  |  219 +++++++++++++++++++++++++++++++++++++-------
 fs/ceph/messenger.h  |    4 +
 fs/ceph/osd_client.c |  249 +++++++++++++++++++++++++++++++++++++++-----------
 fs/ceph/osd_client.h |   61 ++++++++++---
 fs/ceph/pagelist.c   |    2 +-
 fs/ceph/pagelist.h   |    2 +-
 6 files changed, 436 insertions(+), 101 deletions(-)

diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76..17a09b3 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
 #include <linux/slab.h>
 #include <linux/socket.h>
 #include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
 #include <net/tcp.h>
 
 #include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
 	if (le32_to_cpu(m->hdr.data_len) > 0) {
 		/* initialize page iterator */
 		con->out_msg_pos.page = 0;
-		con->out_msg_pos.page_pos =
-			le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+		if (m->pages)
+			con->out_msg_pos.page_pos =
+				le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+		else
+			con->out_msg_pos.page_pos = 0;
 		con->out_msg_pos.data_pos = 0;
 		con->out_msg_pos.did_page_crc = 0;
 		con->out_more = 1;  /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
 	return ret;  /* done! */
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+	if (!bio) {
+		*iter = NULL;
+		*seg = 0;
+		return;
+	}
+	*iter = bio;
+	*seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+	if (*bio_iter == NULL)
+		return;
+
+	BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+	(*seg)++;
+	if (*seg == (*bio_iter)->bi_vcnt)
+		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 	size_t len;
 	int crc = con->msgr->nocrc;
 	int ret;
+	int total_max_write;
+	int in_trail = 0;
+	size_t trail_len = (msg->trail ? msg->trail->length : 0);
 
 	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
 	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
 	     con->out_msg_pos.page_pos);
 
-	while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+	if (msg->bio && !msg->bio_iter)
+		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+	while (data_len > con->out_msg_pos.data_pos) {
 		struct page *page = NULL;
 		void *kaddr = NULL;
+		int max_write = PAGE_SIZE;
+		int page_shift = 0;
+
+		total_max_write = data_len - trail_len -
+			con->out_msg_pos.data_pos;
 
 		/*
 		 * if we are calculating the data crc (the default), we need
 		 * to map the page.  if our pages[] has been revoked, use the
 		 * zero page.
 		 */
-		if (msg->pages) {
+
+		/* have we reached the trail part of the data? */
+		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
+			in_trail = 1;
+
+			total_max_write = data_len - con->out_msg_pos.data_pos;
+
+			page = list_first_entry(&msg->trail->head,
+						struct page, lru);
+			if (crc)
+				kaddr = kmap(page);
+			max_write = PAGE_SIZE;
+		} else if (msg->pages) {
 			page = msg->pages[con->out_msg_pos.page];
 			if (crc)
 				kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 						struct page, lru);
 			if (crc)
 				kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+		} else if (msg->bio) {
+			struct bio_vec *bv;
+
+			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+			page = bv->bv_page;
+			page_shift = bv->bv_offset;
+			if (crc)
+				kaddr = kmap(page) + page_shift;
+			max_write = bv->bv_len;
+#endif
 		} else {
 			page = con->msgr->zero_page;
 			if (crc)
 				kaddr = page_address(con->msgr->zero_page);
 		}
-		len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
-			  (int)(data_len - con->out_msg_pos.data_pos));
+		len = min_t(int, max_write - con->out_msg_pos.page_pos,
+			    total_max_write);
+
 		if (crc && !con->out_msg_pos.did_page_crc) {
 			void *base = kaddr + con->out_msg_pos.page_pos;
 			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 				cpu_to_le32(crc32c(tmpcrc, base, len));
 			con->out_msg_pos.did_page_crc = 1;
 		}
-
 		ret = kernel_sendpage(con->sock, page,
-				      con->out_msg_pos.page_pos, len,
+				      con->out_msg_pos.page_pos + page_shift,
+				      len,
 				      MSG_DONTWAIT | MSG_NOSIGNAL |
 				      MSG_MORE);
 
-		if (crc && (msg->pages || msg->pagelist))
+		if (crc &&
+		    (msg->pages || msg->pagelist || msg->bio || in_trail))
 			kunmap(page);
 
 		if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 			con->out_msg_pos.page_pos = 0;
 			con->out_msg_pos.page++;
 			con->out_msg_pos.did_page_crc = 0;
-			if (msg->pagelist)
+			if (in_trail)
+				list_move_tail(&page->lru,
+					       &msg->trail->head);
+			else if (msg->pagelist)
 				list_move_tail(&page->lru,
 					       &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+			else if (msg->bio)
+				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
 		}
 	}
 
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
 					struct kvec *section,
 					unsigned int sec_len, u32 *crc)
 {
-	int left;
-	int ret;
+	int ret, left;
 
 	BUG_ON(!section);
 
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 				struct ceph_msg_header *hdr,
 				int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+				      struct page **pages,
+				      unsigned data_len, int datacrc)
+{
+	void *p;
+	int ret;
+	int left;
+
+	left = min((int)(data_len - con->in_msg_pos.data_pos),
+		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+	/* (page) data */
+	BUG_ON(pages == NULL);
+	p = kmap(pages[con->in_msg_pos.page]);
+	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+			       left);
+	if (ret > 0 && datacrc)
+		con->in_data_crc =
+			crc32c(con->in_data_crc,
+				  p + con->in_msg_pos.page_pos, ret);
+	kunmap(pages[con->in_msg_pos.page]);
+	if (ret <= 0)
+		return ret;
+	con->in_msg_pos.data_pos += ret;
+	con->in_msg_pos.page_pos += ret;
+	if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+		con->in_msg_pos.page_pos = 0;
+		con->in_msg_pos.page++;
+	}
+
+	return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+				    struct bio **bio_iter, int *bio_seg,
+				    unsigned data_len, int datacrc)
+{
+	struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+	void *p;
+	int ret, left;
+
+	if (IS_ERR(bv))
+		return PTR_ERR(bv);
+
+	left = min((int)(data_len - con->in_msg_pos.data_pos),
+		   (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+	p = kmap(bv->bv_page) + bv->bv_offset;
+
+	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+			       left);
+	if (ret > 0 && datacrc)
+		con->in_data_crc =
+			crc32c(con->in_data_crc,
+				  p + con->in_msg_pos.page_pos, ret);
+	kunmap(bv->bv_page);
+	if (ret <= 0)
+		return ret;
+	con->in_msg_pos.data_pos += ret;
+	con->in_msg_pos.page_pos += ret;
+	if (con->in_msg_pos.page_pos == bv->bv_len) {
+		con->in_msg_pos.page_pos = 0;
+		iter_bio_next(bio_iter, bio_seg);
+	}
+
+	return ret;
+}
+#endif
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->in_msg;
-	void *p;
 	int ret;
 	int to, left;
 	unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
 			m->middle->vec.iov_len = 0;
 
 		con->in_msg_pos.page = 0;
-		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		if (m->pages)
+			con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		else
+			con->in_msg_pos.page_pos = 0;
 		con->in_msg_pos.data_pos = 0;
 	}
 
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
 		if (ret <= 0)
 			return ret;
 	}
+#ifdef CONFIG_BLOCK
+	if (m->bio && !m->bio_iter)
+		init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
 
 	/* (page) data */
 	while (con->in_msg_pos.data_pos < data_len) {
-		left = min((int)(data_len - con->in_msg_pos.data_pos),
-			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-		BUG_ON(m->pages == NULL);
-		p = kmap(m->pages[con->in_msg_pos.page]);
-		ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-				       left);
-		if (ret > 0 && datacrc)
-			con->in_data_crc =
-				crc32c(con->in_data_crc,
-					  p + con->in_msg_pos.page_pos, ret);
-		kunmap(m->pages[con->in_msg_pos.page]);
-		if (ret <= 0)
-			return ret;
-		con->in_msg_pos.data_pos += ret;
-		con->in_msg_pos.page_pos += ret;
-		if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-			con->in_msg_pos.page_pos = 0;
-			con->in_msg_pos.page++;
+		if (m->pages) {
+			ret = read_partial_message_pages(con, m->pages,
+						 data_len, datacrc);
+			if (ret <= 0)
+				return ret;
+#ifdef CONFIG_BLOCK
+		} else if (m->bio) {
+
+			ret = read_partial_message_bio(con,
+						 &m->bio_iter, &m->bio_seg,
+						 data_len, datacrc);
+			if (ret <= 0)
+				return ret;
+#endif
+		} else {
+			BUG_ON(1);
 		}
 	}
 
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 	m->nr_pages = 0;
 	m->pages = NULL;
 	m->pagelist = NULL;
+	m->bio = NULL;
+	m->bio_iter = NULL;
+	m->bio_seg = 0;
+	m->trail = NULL;
 
 	dout("ceph_msg_new %p front %d\n", m, front_len);
 	return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
 		m->pagelist = NULL;
 	}
 
+	m->trail = NULL;
+
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);
 	else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc95..5a79450 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,10 @@ struct ceph_msg {
 	struct ceph_pagelist *pagelist; /* instead of pages */
 	struct list_head list_head;
 	struct kref kref;
+	struct bio  *bio;		/* instead of pages/pagelist */
+	struct bio  *bio_iter;		/* bio iterator */
+	int bio_seg;			/* current bio segment */
+	struct ceph_pagelist *trail;	/* the trailing part of the data */
 	bool front_is_vmalloc;
 	bool more_to_follow;
 	bool needs_out_seq;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index ce7f7e0..8b1d0a6 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
 #include <linux/pagemap.h>
 #include <linux/slab.h>
 #include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
 
 #include "super.h"
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
 #include "auth.h"
+#include "pagelist.h"
 
 #define OSD_OP_FRONT_LEN	4096
 #define OSD_OPREPLY_FRONT_LEN	512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
+static int op_needs_trail(int op)
+{
+	switch (op) {
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+	case CEPH_OSD_OP_CALL:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static int op_has_extent(int op)
+{
+	return (op == CEPH_OSD_OP_READ ||
+		op == CEPH_OSD_OP_WRITE);
+}
+
 void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 			struct ceph_file_layout *layout,
 			u64 snapid,
-			u64 off, u64 len, u64 *bno,
-			struct ceph_osd_request *req)
+			u64 off, u64 *plen, u64 *bno,
+			struct ceph_osd_request *req,
+			struct ceph_osd_req_op *op)
 {
 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-	struct ceph_osd_op *op = (void *)(reqhead + 1);
-	u64 orig_len = len;
+	u64 orig_len = *plen;
 	u64 objoff, objlen;    /* extent in object */
 
 	reqhead->snapid = cpu_to_le64(snapid);
 
 	/* object extent? */
-	ceph_calc_file_object_mapping(layout, off, &len, bno,
+	ceph_calc_file_object_mapping(layout, off, plen, bno,
 				      &objoff, &objlen);
-	if (len < orig_len)
+	if (*plen < orig_len)
 		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - len, off, len);
+		     orig_len - *plen, off, *plen);
 
-	op->extent.offset = cpu_to_le64(objoff);
-	op->extent.length = cpu_to_le64(objlen);
-	req->r_num_pages = calc_pages_for(off, len);
+	if (op_has_extent(op->op)) {
+		op->extent.offset = objoff;
+		op->extent.length = objlen;
+	}
+	req->r_num_pages = calc_pages_for(off, *plen);
 
 	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
 	     *bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
 			struct ceph_vino vino,
 			struct ceph_file_layout *layout,
 			u64 off, u64 *plen,
-			struct ceph_osd_request *req)
+			struct ceph_osd_request *req,
+			struct ceph_osd_req_op *op)
 {
 	u64 bno;
 
-	ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
+	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+			     plen, &bno, req, op);
 
 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
 					 req->r_num_pages);
+#ifdef CONFIG_BLOCK
+	if (req->r_bio)
+		bio_put(req->r_bio);
+#endif
 	ceph_put_snap_context(req->r_snapc);
+	if (req->r_trail) {
+		ceph_pagelist_release(req->r_trail);
+		kfree(req->r_trail);
+	}
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
 	else
 		kfree(req);
 }
 
+static int op_needs_trail(int op)
+{
+	switch (op) {
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+{
+	int i = 0;
+
+	if (needs_trail)
+		*needs_trail = 0;
+	while (ops[i].op) {
+		if (needs_trail && op_needs_trail(ops[i].op))
+			*needs_trail = 1;
+		i++;
+	}
+
+	return i;
+}
+
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       int flags,
 					       struct ceph_snap_context *snapc,
-					       int do_sync,
+					       struct ceph_osd_req_op *ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags,
-					       struct page **pages)
+					       struct page **pages,
+					       struct bio *bio)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	int num_op = 1 + do_sync;
-	size_t msg_size = sizeof(struct ceph_osd_request_head) +
-			  num_op*sizeof(struct ceph_osd_op);
+	int needs_trail;
+	int num_op = get_num_ops(ops, &needs_trail);
+	size_t msg_size = sizeof(struct ceph_osd_request_head);
 
-	if (use_mempool) {
-		req = mempool_alloc(osdc->req_mempool, gfp_flags);
-		memset(req, 0, sizeof(*req));
-	} else {
-		req = kzalloc(sizeof(*req), gfp_flags);
-	}
-	if (!req)
-		return NULL;
+	msg_size += num_op*sizeof(struct ceph_osd_op);
 
 	if (use_mempool) {
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
+
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	}
 	req->r_reply = msg;
 
+	/* allocate space for the trailing data */
+	if (needs_trail) {
+		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
+		if (!req->r_trail) {
+			ceph_osdc_put_request(req);
+			return NULL;
+		}
+		ceph_pagelist_init(req->r_trail);
+	}
 	/* create request message; allow space for oid */
 	msg_size += 40;
 	if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
+
 	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
 	req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+	if (bio) {
+		req->r_bio = bio;
+		bio_get(req->r_bio);
+	}
+#endif
 
 	return req;
 }
 
+static void osd_req_encode_op(struct ceph_osd_request *req,
+			      struct ceph_osd_op *dst,
+			      struct ceph_osd_req_op *src)
+{
+	dst->op = cpu_to_le16(src->op);
+
+	switch (dst->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		dst->extent.offset =
+			cpu_to_le64(src->extent.offset);
+		dst->extent.length =
+			cpu_to_le64(src->extent.length);
+		dst->extent.truncate_size =
+			cpu_to_le64(src->extent.truncate_size);
+		dst->extent.truncate_seq =
+			cpu_to_le32(src->extent.truncate_seq);
+		break;
+
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		BUG_ON(!req->r_trail);
+
+		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+		dst->xattr.cmp_op = src->xattr.cmp_op;
+		dst->xattr.cmp_mode = src->xattr.cmp_mode;
+		ceph_pagelist_append(req->r_trail, src->xattr.name,
+				     src->xattr.name_len);
+		ceph_pagelist_append(req->r_trail, src->xattr.val,
+				     src->xattr.value_len);
+		break;
+	case CEPH_OSD_OP_STARTSYNC:
+		break;
+	default:
+		pr_err("unrecognized osd opcode %d\n", dst->op);
+		WARN_ON(1);
+		break;
+	}
+	dst->payload_len = cpu_to_le32(src->payload_len);
+}
+
 /*
  * build new request AND message
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-			    u64 off, u64 *plen,
-			    int opcode,
-			    struct ceph_snap_context *snapc,
-			    int do_sync,
-			    u32 truncate_seq,
-			    u64 truncate_size,
-			    struct timespec *mtime,
-			    const char *oid,
-			    int oid_len)
+			     u64 off, u64 *plen,
+			     struct ceph_osd_req_op *src_ops,
+			     struct ceph_snap_context *snapc,
+			     struct timespec *mtime,
+			     const char *oid,
+			     int oid_len)
 {
 	struct ceph_msg *msg = req->r_request;
 	struct ceph_osd_request_head *head;
+	struct ceph_osd_req_op *src_op;
 	struct ceph_osd_op *op;
 	void *p;
-	int num_op = 1 + do_sync;
+	int num_op = get_num_ops(src_ops, NULL);
 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-	int i;
 	int flags = req->r_flags;
+	u64 data_len = 0;
+	int i;
 
 	head = msg->front.iov_base;
 	op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		ceph_encode_timespec(&head->mtime, mtime);
 	head->num_ops = cpu_to_le16(num_op);
-	op->op = cpu_to_le16(opcode);
 
-	if (flags & CEPH_OSD_FLAG_WRITE) {
-		req->r_request->hdr.data_off = cpu_to_le16(off);
-		req->r_request->hdr.data_len = cpu_to_le32(*plen);
-		op->payload_len = cpu_to_le32(*plen);
-	}
-	op->extent.truncate_size = cpu_to_le64(truncate_size);
-	op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
 	/* fill in oid */
 	head->object_len = cpu_to_le32(oid_len);
 	memcpy(p, oid, oid_len);
 	p += oid_len;
 
-	if (do_sync) {
+	src_op = src_ops;
+	while (src_op->op) {
+		osd_req_encode_op(req, op, src_op);
+		src_op++;
 		op++;
-		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
 	}
+
+	if (req->r_trail)
+		data_len += req->r_trail->length;
+
 	if (snapc) {
 		head->snap_seq = cpu_to_le64(snapc->seq);
 		head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 		}
 	}
 
+	if (flags & CEPH_OSD_FLAG_WRITE) {
+		req->r_request->hdr.data_off = cpu_to_le16(off);
+		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+	} else if (data_len) {
+		req->r_request->hdr.data_off = 0;
+		req->r_request->hdr.data_len = cpu_to_le32(data_len);
+	}
+
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       struct timespec *mtime,
 					       bool use_mempool, int num_reply)
 {
-	struct ceph_osd_request *req =
-		ceph_osdc_alloc_request(osdc, flags,
-					 snapc, do_sync,
+	struct ceph_osd_req_op ops[3];
+	struct ceph_osd_request *req;
+
+	ops[0].op = opcode;
+	ops[0].extent.truncate_seq = truncate_seq;
+	ops[0].extent.truncate_size = truncate_size;
+	ops[0].payload_len = 0;
+
+	if (do_sync) {
+		ops[1].op = CEPH_OSD_OP_STARTSYNC;
+		ops[1].payload_len = 0;
+		ops[2].op = 0;
+	} else
+		ops[1].op = 0;
+
+	req = ceph_osdc_alloc_request(osdc, flags,
+					 snapc, ops,
 					 use_mempool,
-					 GFP_NOFS, NULL);
+					 GFP_NOFS, NULL, NULL);
 	if (IS_ERR(req))
 		return req;
 
 	/* calculate max write size */
-	calc_layout(osdc, vino, layout, off, plen, req);
+	calc_layout(osdc, vino, layout, off, plen, req, ops);
 	req->r_file_layout = *layout;  /* keep a copy */
 
-	ceph_osdc_build_request(req, off, plen, opcode,
-				snapc, do_sync,
-				truncate_seq, truncate_size,
+	ceph_osdc_build_request(req, off, plen, ops,
+				snapc,
 				mtime,
 				req->r_oid, req->r_oid_len);
 
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 
 	req->r_request->pages = req->r_pages;
 	req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+	req->r_request->bio = req->r_bio;
+#endif
+	req->r_request->trail = req->r_trail;
 
 	register_request(osdc, req);
 
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		}
 		m->pages = req->r_pages;
 		m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+		m->bio = req->r_bio;
+#endif
 	}
 	*skip = 0;
 	req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b687c2e..d583d1b 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -15,6 +15,7 @@ struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
 struct ceph_authorizer;
+struct ceph_pagelist;
 
 /*
  * completion callback for async writepages
@@ -80,6 +81,11 @@ struct ceph_osd_request {
 	struct page     **r_pages;            /* pages for data payload */
 	int               r_pages_from_pool;
 	int               r_own_pages;        /* if true, i own page list */
+#ifdef CONFIG_BLOCK
+	struct bio       *r_bio;	      /* instead of pages */
+#endif
+
+	struct ceph_pagelist *r_trail;	      /* trailing part of the data */
 };
 
 struct ceph_osd_client {
@@ -110,6 +116,36 @@ struct ceph_osd_client {
 	struct ceph_msgpool	msgpool_op_reply;
 };
 
+struct ceph_osd_req_op {
+	u16 op;           /* CEPH_OSD_OP_* */
+	u32 flags;        /* CEPH_OSD_FLAG_* */
+	union {
+		struct {
+			u64 offset, length;
+			u64 truncate_size;
+			u32 truncate_seq;
+		} extent;
+		struct {
+			const char *name;
+			u32 name_len;
+			const char  *val;
+			u32 value_len;
+			__u8 cmp_op;       /* CEPH_OSD_CMPXATTR_OP_* */
+			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
+		} xattr;
+		struct {
+			__u8 class_len;
+			__u8 method_len;
+			__u8 argc;
+			u32 indata_len;
+		} cls;
+		struct {
+			u64 cookie, count;
+		} pgls;
+	};
+	u32 payload_len;
+};
+
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,
 			  struct ceph_client *client);
 extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -122,27 +158,26 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 			struct ceph_file_layout *layout,
 			u64 snapid,
-			u64 off, u64 len, u64 *bno,
-			struct ceph_osd_request *req);
+			u64 off, u64 *plen, u64 *bno,
+			struct ceph_osd_request *req,
+			struct ceph_osd_req_op *op);
 
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       int flags,
 					       struct ceph_snap_context *snapc,
-					       int do_sync,
+					       struct ceph_osd_req_op *ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags,
-					       struct page **pages);
+					       struct page **pages,
+					       struct bio *bio);
 
 extern void ceph_osdc_build_request(struct ceph_osd_request *req,
-			    u64 off, u64 *plen,
-			    int opcode,
-			    struct ceph_snap_context *snapc,
-			    int do_sync,
-			    u32 truncate_seq,
-			    u64 truncate_size,
-			    struct timespec *mtime,
-			    const char *oid,
-			    int oid_len);
+				    u64 off, u64 *plen,
+				    struct ceph_osd_req_op *src_ops,
+				    struct ceph_snap_context *snapc,
+				    struct timespec *mtime,
+				    const char *oid,
+				    int oid_len);
 
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index b6859f4..8170365 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -31,7 +31,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
 	return 0;
 }
 
-int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
+int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
 {
 	while (pl->room < len) {
 		size_t bit = pl->room;
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
index e8a4187..cc9327a 100644
--- a/fs/ceph/pagelist.h
+++ b/fs/ceph/pagelist.h
@@ -19,7 +19,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
 }
 extern int ceph_pagelist_release(struct ceph_pagelist *pl);
 
-extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);
+extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
 
 static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
 {
-- 
1.7.0



* [PATCH 4/8] ceph-rbd: enable creation of clients that don't need mds
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

Prepare the ground for rbd, which does not need an mds client.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
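For an mds-less user the difference is just the new flag (sketch; args is
the usual parsed mount arguments):

	client = ceph_create_client(args, 0 /* need_mdsc */);

while the file system keeps passing 1, as in the ceph_get_sb() hunk below.
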
 fs/ceph/debugfs.c    |   11 ++++++++---
 fs/ceph/mon_client.c |    3 ++-
 fs/ceph/super.c      |   18 ++++++++++++------
 fs/ceph/super.h      |    2 ++
 4 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 360c4f2..02536f7 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -441,9 +441,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
 	if (!client->debugfs_congestion_kb)
 		goto out;
 
-	sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev));
-	client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir,
-						     name);
+	if (client->backing_dev_info.dev) {
+		sprintf(name, "../../bdi/%s",
+			dev_name(client->backing_dev_info.dev));
+		client->debugfs_bdi =
+			debugfs_create_symlink("bdi",
+					       client->debugfs_dir,
+					       name);
+	}
 
 	return 0;
 
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index b2a5a3e..816a9ce 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -923,7 +923,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		break;
 
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+		if (monc->client->have_mdsc)
+			ceph_mdsc_handle_map(&monc->client->mdsc, msg);
 		break;
 
 	case CEPH_MSG_OSD_MAP:
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 9922628..ff295c9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -620,7 +620,8 @@ static void destroy_mount_args(struct ceph_mount_args *args)
 /*
  * create a fresh client instance
  */
-static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
+struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+				       int need_mdsc)
 {
 	struct ceph_client *client;
 	int err = -ENOMEM;
@@ -674,9 +675,13 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
 	err = ceph_osdc_init(&client->osdc, client);
 	if (err < 0)
 		goto fail_monc;
-	err = ceph_mdsc_init(&client->mdsc, client);
-	if (err < 0)
-		goto fail_osdc;
+	if (need_mdsc) {
+		err = ceph_mdsc_init(&client->mdsc, client);
+		if (err < 0)
+			goto fail_osdc;
+		client->have_mdsc = 1;
+	}
+
 	return client;
 
 fail_osdc:
@@ -703,7 +708,8 @@ static void ceph_destroy_client(struct ceph_client *client)
 	dout("destroy_client %p\n", client);
 
 	/* unmount */
-	ceph_mdsc_stop(&client->mdsc);
+	if (client->have_mdsc)
+		ceph_mdsc_stop(&client->mdsc);
 	ceph_osdc_stop(&client->osdc);
 
 	/*
@@ -996,7 +1002,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
 	}
 
 	/* create client (which we may/may not use) */
-	client = ceph_create_client(args);
+	client = ceph_create_client(args, 1);
 	if (IS_ERR(client)) {
 		err = PTR_ERR(client);
 		goto out_final;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2482d69..bdf089f 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -145,6 +145,8 @@ struct ceph_client {
 
 	int min_caps;                  /* min caps i added */
 
+	int have_mdsc;
+
 	struct ceph_messenger *msgr;   /* messenger instance */
 	struct ceph_mon_client monc;
 	struct ceph_mds_client mdsc;
-- 
1.7.0



* [PATCH 5/8] ceph-rbd: refactor mount related functions, add helpers
From: Sage Weil @ 2010-08-13 17:40 UTC
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

Removed the static declarations from several functions, and separated
the mount operation into __ceph_open_session and open_root_dentry so
that clients that don't need the latter (rbd) can skip it.  Added
other helper functions that will be used later by rbd.
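
An rbd-style user of these helpers (sketched below for illustration;
example_open_osd_only() is a made-up name and error handling is
abbreviated) would parse the device string without a server path,
create a client without an mds session, and then open the session:

	struct ceph_client *example_open_osd_only(int flags, char *options,
						  const char *dev_name)
	{
		struct ceph_mount_args *args;
		struct ceph_client *client;
		int err;

		/* NULL path: dev_name carries only monitor addresses */
		args = parse_mount_args(flags, options, dev_name, NULL);
		if (IS_ERR(args))
			return ERR_CAST(args);

		/* need_mdsc == 0: no mds client is initialized */
		client = ceph_create_client(args, 0);
		if (IS_ERR(client)) {
			ceph_destroy_mount_args(args);
			return client;
		}

		/* join the cluster; waits for the mon and osd maps */
		err = ceph_open_session(client);
		if (err < 0) {
			ceph_destroy_client(client);
			return ERR_PTR(err);
		}
		return client;
	}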

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 fs/ceph/file.c       |   48 ++++++++++++++++-
 fs/ceph/osd_client.h |    1 +
 fs/ceph/super.c      |  149 +++++++++++++++++++++++++++++++++++++------------
 fs/ceph/super.h      |   27 ++++++++-
 4 files changed, 184 insertions(+), 41 deletions(-)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 8c044a4..d1e57c1 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -317,7 +317,7 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
 /*
  * allocate a vector of new pages
  */
-static struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
+struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
 {
 	struct page **pages;
 	int i;
@@ -363,6 +363,52 @@ static int copy_user_to_page_vector(struct page **pages,
 	return len;
 }
 
+int ceph_copy_to_page_vector(struct page **pages,
+				    const char *data,
+				    loff_t off, size_t len)
+{
+	int i = 0;
+	size_t po = off & ~PAGE_CACHE_MASK;
+	size_t left = len;
+	size_t l;
+
+	while (left > 0) {
+		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		memcpy(page_address(pages[i]) + po, data, l);
+		data += l;
+		left -= l;
+		po += l;
+		if (po == PAGE_CACHE_SIZE) {
+			po = 0;
+			i++;
+		}
+	}
+	return len;
+}
+
+int ceph_copy_from_page_vector(struct page **pages,
+				    char *data,
+				    loff_t off, size_t len)
+{
+	int i = 0;
+	size_t po = off & ~PAGE_CACHE_MASK;
+	size_t left = len;
+	size_t l;
+
+	while (left > 0) {
+		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		memcpy(data, page_address(pages[i]) + po, l);
+		data += l;
+		left -= l;
+		po += l;
+		if (po == PAGE_CACHE_SIZE) {
+			po = 0;
+			i++;
+		}
+	}
+	return len;
+}
+
 /*
  * copy user data from a page vector into a user pointer
  */
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index d583d1b..0a82bd1 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -69,6 +69,7 @@ struct ceph_osd_request {
 	struct list_head  r_unsafe_item;
 
 	struct inode *r_inode;         	      /* for use by callbacks */
+	void *r_priv;			      /* ditto */
 
 	char              r_oid[40];          /* object name */
 	int               r_oid_len;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ff295c9..c7a9ef4 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -421,14 +421,15 @@ static int parse_fsid(const char *str, struct ceph_fsid *fsid)
 	return err;
 }
 
-static struct ceph_mount_args *parse_mount_args(int flags, char *options,
-						const char *dev_name,
-						const char **path)
+struct ceph_mount_args *parse_mount_args(int flags, char *options,
+					 const char *dev_name,
+					 const char **path)
 {
 	struct ceph_mount_args *args;
 	const char *c;
 	int err = -ENOMEM;
 	substring_t argstr[MAX_OPT_ARGS];
+	const char *end_path;
 
 	args = kzalloc(sizeof(*args), GFP_KERNEL);
 	if (!args)
@@ -460,23 +461,29 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
 	err = -EINVAL;
 	if (!dev_name)
 		goto out;
-	*path = strstr(dev_name, ":/");
-	if (*path == NULL) {
-		pr_err("device name is missing path (no :/ in %s)\n",
-		       dev_name);
-		goto out;
+
+	if (path) {
+		*path = strstr(dev_name, ":/");
+		if (*path == NULL) {
+			pr_err("device name is missing path (no :/ in %s)\n",
+			       dev_name);
+			goto out;
+		}
+		end_path = *path;
+
+		/* path on server */
+		*path += 2;
+		dout("server path '%s'\n", *path);
+	} else {
+		end_path = dev_name + strlen(dev_name);
 	}
 
 	/* get mon ip(s) */
-	err = ceph_parse_ips(dev_name, *path, args->mon_addr,
+	err = ceph_parse_ips(dev_name, end_path, args->mon_addr,
 			     CEPH_MAX_MON, &args->num_mon);
 	if (err < 0)
 		goto out;
 
-	/* path on server */
-	*path += 2;
-	dout("server path '%s'\n", *path);
-
 	/* parse mount options */
 	while ((c = strsep(&options, ",")) != NULL) {
 		int token, intval, ret;
@@ -605,18 +612,60 @@ out:
 	return ERR_PTR(err);
 }
 
-static void destroy_mount_args(struct ceph_mount_args *args)
+void ceph_destroy_mount_args(struct ceph_mount_args *args)
 {
 	dout("destroy_mount_args %p\n", args);
 	kfree(args->snapdir_name);
-	args->snapdir_name = NULL;
 	kfree(args->name);
-	args->name = NULL;
 	kfree(args->secret);
-	args->secret = NULL;
 	kfree(args);
 }
 
+static int strcmp_null(const char *s1, const char *s2)
+{
+	if (!s1 && !s2)
+		return 0;
+	if (s1 && !s2)
+		return -1;
+	if (!s1 && s2)
+		return 1;
+	return strcmp(s1, s2);
+}
+
+int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+			    struct ceph_client *client)
+{
+	struct ceph_mount_args *args1 = new_args;
+	struct ceph_mount_args *args2 = client->mount_args;
+	int ofs = offsetof(struct ceph_mount_args, mon_addr);
+	int i;
+	int ret;
+
+	ret = memcmp(args1, args2, ofs);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->snapdir_name, args2->snapdir_name);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->name, args2->name);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(args1->secret, args2->secret);
+	if (ret)
+		return ret;
+
+	for (i = 0; i < args1->num_mon; i++) {
+		if (ceph_monmap_contains(client->monc.monmap,
+				 &args1->mon_addr[i]))
+			return 0;
+	}
+
+	return -1;
+}
+
 /*
  * create a fresh client instance
  */
@@ -703,7 +752,7 @@ fail:
 	return ERR_PTR(err);
 }
 
-static void ceph_destroy_client(struct ceph_client *client)
+void ceph_destroy_client(struct ceph_client *client)
 {
 	dout("destroy_client %p\n", client);
 
@@ -732,7 +781,7 @@ static void ceph_destroy_client(struct ceph_client *client)
 		ceph_messenger_destroy(client->msgr);
 	mempool_destroy(client->wb_pagevec_pool);
 
-	destroy_mount_args(client->mount_args);
+	ceph_destroy_mount_args(client->mount_args);
 
 	kfree(client);
 	dout("destroy_client %p done\n", client);
@@ -813,17 +862,12 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
 /*
  * mount: join the ceph cluster, and open root directory.
  */
-static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
-		      const char *path)
+static int __ceph_open_session(struct ceph_client *client,
+			       unsigned long started)
 {
 	struct ceph_entity_addr *myaddr = NULL;
 	int err;
 	unsigned long timeout = client->mount_args->mount_timeout * HZ;
-	unsigned long started = jiffies;  /* note the start time */
-	struct dentry *root;
-
-	dout("mount start\n");
-	mutex_lock(&client->mount_mutex);
 
 	/* initialize the messenger */
 	if (client->msgr == NULL) {
@@ -831,9 +875,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 			myaddr = &client->mount_args->my_addr;
 		client->msgr = ceph_messenger_create(myaddr);
 		if (IS_ERR(client->msgr)) {
-			err = PTR_ERR(client->msgr);
 			client->msgr = NULL;
-			goto out;
+			return PTR_ERR(client->msgr);
 		}
 		client->msgr->nocrc = ceph_test_opt(client, NOCRC);
 	}
@@ -841,26 +884,58 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
 	/* open session, and wait for mon, mds, and osd maps */
 	err = ceph_monc_open_session(&client->monc);
 	if (err < 0)
-		goto out;
+		return err;
 
 	while (!have_mon_and_osd_map(client)) {
 		err = -EIO;
 		if (timeout && time_after_eq(jiffies, started + timeout))
-			goto out;
+			return err;
 
 		/* wait */
 		dout("mount waiting for mon_map\n");
 		err = wait_event_interruptible_timeout(client->auth_wq,
-		       have_mon_and_osd_map(client) || (client->auth_err < 0),
-		       timeout);
+			have_mon_and_osd_map(client) || (client->auth_err < 0),
+			timeout);
 		if (err == -EINTR || err == -ERESTARTSYS)
-			goto out;
-		if (client->auth_err < 0) {
-			err = client->auth_err;
-			goto out;
-		}
+			return err;
+		if (client->auth_err < 0)
+			return client->auth_err;
 	}
 
+	return 0;
+}
+
+int ceph_open_session(struct ceph_client *client)
+{
+	int ret;
+	unsigned long started = jiffies;  /* note the start time */
+
+	dout("open_session start\n");
+	mutex_lock(&client->mount_mutex);
+
+	ret = __ceph_open_session(client, started);
+
+	mutex_unlock(&client->mount_mutex);
+	return ret;
+}
+
+/*
+ * mount: join the ceph cluster, and open root directory.
+ */
+static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
+		      const char *path)
+{
+	int err;
+	unsigned long started = jiffies;  /* note the start time */
+	struct dentry *root;
+
+	dout("mount start\n");
+	mutex_lock(&client->mount_mutex);
+
+	err = __ceph_open_session(client, started);
+	if (err < 0)
+		goto out;
+
 	dout("mount opening root\n");
 	root = open_root_dentry(client, "", started);
 	if (IS_ERR(root)) {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index bdf089f..a8e70fc 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -54,14 +54,11 @@
 #define ceph_test_opt(client, opt) \
 	(!!((client)->mount_args->flags & CEPH_OPT_##opt))
 
-
 struct ceph_mount_args {
 	int sb_flags;
 	int flags;
 	struct ceph_fsid fsid;
 	struct ceph_entity_addr my_addr;
-	int num_mon;
-	struct ceph_entity_addr *mon_addr;
 	int mount_timeout;
 	int osd_idle_ttl;
 	int osd_timeout;
@@ -73,6 +70,13 @@ struct ceph_mount_args {
 	int cap_release_safety;
 	int max_readdir;       /* max readdir result (entries) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
+
+	/* any type that can't be simply compared, or doesn't need to
+	   be compared, should go beyond this point;
+	   ceph_compare_mount_args() should be updated accordingly */
+	struct ceph_entity_addr *mon_addr; /* should be the first
+					      pointer type of args */
+	int num_mon;
 	char *snapdir_name;   /* default ".snap" */
 	char *name;
 	char *secret;
@@ -747,6 +751,16 @@ extern struct kmem_cache *ceph_file_cachep;
 
 extern const char *ceph_msg_type_name(int type);
 extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
+extern struct ceph_mount_args *parse_mount_args(int flags, char *options,
+						const char *dev_name,
+						const char **path);
+extern void ceph_destroy_mount_args(struct ceph_mount_args *args);
+extern int ceph_compare_mount_args(struct ceph_mount_args *new_args,
+			    struct ceph_client *client);
+extern struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
+					      int need_mdsc);
+extern void ceph_destroy_client(struct ceph_client *client);
+extern int ceph_open_session(struct ceph_client *client);
 
 /* inode.c */
 extern const struct inode_operations ceph_file_iops;
@@ -853,6 +867,13 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 /* file.c */
 extern const struct file_operations ceph_file_fops;
 extern const struct address_space_operations ceph_aops;
+extern int ceph_copy_to_page_vector(struct page **pages,
+				    const char *data,
+				    loff_t off, size_t len);
+extern int ceph_copy_from_page_vector(struct page **pages,
+				    char *data,
+				    loff_t off, size_t len);
+extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
 extern int ceph_open(struct inode *inode, struct file *file);
 extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
 				       struct nameidata *nd, int mode,
-- 
1.7.0


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 6/8] ceph-rbd: osdc support for osd call and rollback operations
  2010-08-13 17:40 [PATCH 0/8] rados block device and ceph refactor Sage Weil
                   ` (4 preceding siblings ...)
  2010-08-13 17:40 ` [PATCH 5/8] ceph-rbd: refactor mount related functions, add helpers Sage Weil
@ 2010-08-13 17:40 ` Sage Weil
  2010-08-13 17:40 ` [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph Sage Weil
  2010-08-13 22:37 ` [PATCH 0/8] rados block device and ceph refactor Randy Dunlap
  7 siblings, 0 replies; 14+ messages in thread
From: Sage Weil @ 2010-08-13 17:40 UTC (permalink / raw)
  To: linux-kernel, linux-fsdevel, linux-scsi; +Cc: ceph-devel, hch, akpm, yehuda

From: Yehuda Sadeh <yehuda@hq.newdream.net>

This will be used for rbd snapshot administration.
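
For illustration, a CEPH_OSD_OP_CALL op is filled in roughly as follows
(a sketch: "data" and "data_len" stand for an already-encoded argument
buffer, and the surrounding request setup is omitted; see
rbd_req_sync_exec() in the rbd patch for the complete version):

	struct ceph_osd_req_op op = { .op = CEPH_OSD_OP_CALL };

	op.cls.class_name = "rbd";		/* osd class */
	op.cls.class_len = (__u8)strlen("rbd");
	op.cls.method_name = "snap_add";	/* method in that class */
	op.cls.method_len = (__u8)strlen("snap_add");
	op.cls.argc = 0;
	op.cls.indata = data;			/* method input */
	op.cls.indata_len = data_len;
	op.payload_len = op.cls.class_len + op.cls.method_len + data_len;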

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
---
 fs/ceph/decode.h     |    5 +++++
 fs/ceph/osd_client.c |   18 ++++++++++++++++++
 fs/ceph/osd_client.h |    6 ++++++
 3 files changed, 29 insertions(+), 0 deletions(-)

diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
index 3d25415..c5b6939 100644
--- a/fs/ceph/decode.h
+++ b/fs/ceph/decode.h
@@ -191,6 +191,11 @@ static inline void ceph_encode_string(void **p, void *end,
 		ceph_encode_need(p, end, n, bad);		\
 		ceph_encode_copy(p, pv, n);			\
 	} while (0)
+#define ceph_encode_string_safe(p, end, s, n, bad)		\
+	do {							\
+		ceph_encode_need(p, end, n, bad);		\
+		ceph_encode_string(p, end, s, n);		\
+	} while (0)
 
 
 #endif
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 8b1d0a6..34f6fe6 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -161,6 +161,7 @@ static int op_needs_trail(int op)
 	case CEPH_OSD_OP_GETXATTR:
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
+	case CEPH_OSD_OP_CALL:
 		return 1;
 	default:
 		return 0;
@@ -301,6 +302,23 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 		ceph_pagelist_append(req->r_trail, src->xattr.val,
 				     src->xattr.value_len);
 		break;
+	case CEPH_OSD_OP_CALL:
+		BUG_ON(!req->r_trail);
+
+		dst->cls.class_len = src->cls.class_len;
+		dst->cls.method_len = src->cls.method_len;
+		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
+
+		ceph_pagelist_append(req->r_trail, src->cls.class_name,
+				     src->cls.class_len);
+		ceph_pagelist_append(req->r_trail, src->cls.method_name,
+				     src->cls.method_len);
+		ceph_pagelist_append(req->r_trail, src->cls.indata,
+				     src->cls.indata_len);
+		break;
+	case CEPH_OSD_OP_ROLLBACK:
+		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
+		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
 	default:
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 0a82bd1..6c91fb0 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -135,14 +135,20 @@ struct ceph_osd_req_op {
 			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
 		} xattr;
 		struct {
+			const char *class_name;
 			__u8 class_len;
+			const char *method_name;
 			__u8 method_len;
 			__u8 argc;
+			const char *indata;
 			u32 indata_len;
 		} cls;
 		struct {
 			u64 cookie, count;
 		} pgls;
+		struct {
+			u64 snapid;
+		} snap;
 	};
 	u32 payload_len;
 };
-- 
1.7.0


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph
  2010-08-13 17:40 [PATCH 0/8] rados block device and ceph refactor Sage Weil
                   ` (5 preceding siblings ...)
  2010-08-13 17:40 ` [PATCH 6/8] ceph-rbd: osdc support for osd call and rollback operations Sage Weil
@ 2010-08-13 17:40 ` Sage Weil
  2010-08-14  2:44   ` Randy Dunlap
  2010-08-13 22:37 ` [PATCH 0/8] rados block device and ceph refactor Randy Dunlap
  7 siblings, 1 reply; 14+ messages in thread
From: Sage Weil @ 2010-08-13 17:40 UTC (permalink / raw)
  To: linux-kernel, linux-fsdevel, linux-scsi
  Cc: ceph-devel, hch, akpm, yehuda, Sage Weil

From: Yehuda Sadeh <yehuda@hq.newdream.net>

The rados block device (rbd), based on osdblk, creates a block device
that is backed by objects stored in the Ceph distributed object storage
cluster.  Each device consists of a single metadata object and data
striped over many data objects.

The rbd driver supports read-only snapshots.
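
For a sense of how data is laid out, the striping arithmetic (restating
rbd_get_segment() from the patch below) maps an image byte offset to a
data object and an offset within it; each segment is one rados object:

	u64 seg = ofs >> header->obj_order;		/* segment index */
	u64 seg_ofs = ofs & ((1 << header->obj_order) - 1); /* offset */

	/* object name is "<block_name>.<12 hex digits of seg>" */
	snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
		 header->block_name, seg);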

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
---
 MAINTAINERS               |    8 +
 drivers/block/Kconfig     |   13 +
 drivers/block/Makefile    |    1 +
 drivers/block/rbd.c       | 1844 +++++++++++++++++++++++++++++++++++++++++++++
 drivers/block/rbd_types.h |   73 ++
 5 files changed, 1939 insertions(+), 0 deletions(-)
 create mode 100644 drivers/block/rbd.c
 create mode 100644 drivers/block/rbd_types.h

diff --git a/MAINTAINERS b/MAINTAINERS
index 5102922..cb34b1b 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -4694,6 +4694,14 @@ F:	fs/qnx4/
 F:	include/linux/qnx4_fs.h
 F:	include/linux/qnxtypes.h
 
+RADOS BLOCK DEVICE (RBD)
+M:	Yehuda Sadeh <yehuda@hq.newdream.net>
+M:	Sage Weil <sage@newdream.net>
+L:	ceph-devel@vger.kernel.org
+S:	Supported
+F:	drivers/block/rbd.c
+F:	drivers/block/rbd_types.h
+
 RADEON FRAMEBUFFER DISPLAY DRIVER
 M:	Benjamin Herrenschmidt <benh@kernel.crashing.org>
 L:	linux-fbdev@vger.kernel.org
diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index de27768..708104b 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -488,4 +488,17 @@ config BLK_DEV_HD
 
 	  If unsure, say N.
 
+config BLK_DEV_RBD
+	tristate "Rados block device (RBD)"
+	select CEPH_LIB
+	default n
+	help
+	  Say Y here if you want to include the Rados block device, which
+	  stripes a block device over objects stored in the Ceph distributed
+	  object store.
+
+	  More information at http://ceph.newdream.net/.
+
+	  If unsure, say N.
+
 endif # BLK_DEV
diff --git a/drivers/block/Makefile b/drivers/block/Makefile
index aff5ac9..d7f463d 100644
--- a/drivers/block/Makefile
+++ b/drivers/block/Makefile
@@ -37,5 +37,6 @@ obj-$(CONFIG_BLK_DEV_HD)	+= hd.o
 
 obj-$(CONFIG_XEN_BLKDEV_FRONTEND)	+= xen-blkfront.o
 obj-$(CONFIG_BLK_DEV_DRBD)     += drbd/
+obj-$(CONFIG_BLK_DEV_RBD)     += rbd.o
 
 swim_mod-objs	:= swim.o swim_asm.o
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
new file mode 100644
index 0000000..f4cd870
--- /dev/null
+++ b/drivers/block/rbd.c
@@ -0,0 +1,1844 @@
+/*
+   rbd.c -- Export ceph rados objects as a Linux block device
+
+
+   based on drivers/block/osdblk.c:
+
+   Copyright 2009 Red Hat, Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; see the file COPYING.  If not, write to
+   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+
+
+
+   Instructions for use
+   --------------------
+
+   1) Map a Linux block device to an existing rbd image.
+
+      Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
+
+      $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
+
+      The snapshot name can be "-" or omitted to map the image read/write.
+
+   2) List all active blkdev<->object mappings.
+
+      In this example, we have performed step #1 twice, creating two blkdevs,
+      mapped to two separate rados objects in the rados rbd pool.
+
+      $ cat /sys/class/rbd/list
+      #id     major   client_name     pool    name    snap    KB
+      0       254     client4143      rbd     foo     -      1024000
+
+      The columns, in order, are:
+      - blkdev unique id
+      - blkdev assigned major
+      - rados client id
+      - rados pool name
+      - rados block device name
+      - mapped snapshot ("-" if none)
+      - device size in KB
+
+
+   3) Create a snapshot.
+
+      Usage: <blkdev id> <snapname>
+
+      $ echo "0 mysnap" > /sys/class/rbd/snap_create
+
+
+   4) List snapshots.
+
+      $ cat /sys/class/rbd/snaps_list
+      #id     snap    KB
+      0       -       1024000 (*)
+      0       foo     1024000
+
+      The columns, in order, are:
+      - blkdev unique id
+      - snapshot name, '-' means none (active read/write version)
+      - size of device at time of snapshot
+      - the (*) indicates this is the active version
+
+   5) Roll back to a snapshot.
+
+      Usage: <blkdev id> <snapname>
+
+      $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
+
+
+   6) Map an image using a snapshot.
+
+      A snapshot mapping is read-only. This is done by passing
+      snap=<snapname> in the options when adding a device.
+
+      $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
+
+
+   7) Remove an active blkdev<->rbd image mapping.
+
+      In this example, we remove the mapping with blkdev unique id 1.
+
+      $ echo 1 > /sys/class/rbd/remove
+
+
+   NOTE:  The actual creation and deletion of rados objects are outside the
+   scope of this driver.
+
+ */
+
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+#include <linux/ceph/mon_client.h>
+#include <linux/ceph/decode.h>
+
+#include <linux/kernel.h>
+#include <linux/device.h>
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/blkdev.h>
+
+#include "rbd_types.h"
+
+#define DRV_NAME "rbd"
+#define DRV_NAME_LONG "rbd (rados block device)"
+
+#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
+
+#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
+#define RBD_MAX_POOL_NAME_LEN	64
+#define RBD_MAX_SNAP_NAME_LEN	32
+#define RBD_MAX_OPT_LEN		1024
+
+#define RBD_SNAP_HEAD_NAME	"-"
+
+#define DEV_NAME_LEN		32
+
+/*
+ * block device image metadata (in-memory version)
+ */
+struct rbd_image_header {
+	u64 image_size;
+	char block_name[32];
+	__u8 obj_order;
+	__u8 crypt_type;
+	__u8 comp_type;
+	struct rw_semaphore snap_rwsem;
+	struct ceph_snap_context *snapc;
+	size_t snap_names_len;
+	u64 snap_seq;
+	u32 total_snaps;
+
+	char *snap_names;
+	u64 *snap_sizes;
+};
+
+/*
+ * an instance of the client.  multiple devices may share a client.
+ */
+struct rbd_client {
+	struct ceph_client	*client;
+	struct kref		kref;
+	struct list_head	node;
+};
+
+/*
+ * a single io request
+ */
+struct rbd_request {
+	struct request		*rq;		/* blk layer request */
+	struct bio		*bio;		/* cloned bio */
+	struct page		**pages;	/* list of used pages */
+	u64			len;
+};
+
+/*
+ * a single device
+ */
+struct rbd_device {
+	int			id;		/* blkdev unique id */
+
+	int			major;		/* blkdev assigned major */
+	struct gendisk		*disk;		/* blkdev's gendisk and rq */
+	struct request_queue	*q;
+
+	struct ceph_client	*client;
+	struct rbd_client	*rbd_client;
+
+	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
+
+	spinlock_t		lock;		/* queue lock */
+
+	struct rbd_image_header	header;
+	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
+	int			obj_len;
+	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
+	char			pool_name[RBD_MAX_POOL_NAME_LEN + 1];
+	int			poolid;
+
+	char                    snap_name[RBD_MAX_SNAP_NAME_LEN + 1];
+	u32 cur_snap;	/* index+1 of current snapshot within snap context;
+			   0 for the head */
+	int read_only;
+
+	struct list_head	node;
+};
+
+static spinlock_t node_lock;      /* protects client get/put */
+
+static struct class *class_rbd;	  /* /sys/class/rbd */
+static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
+static LIST_HEAD(rbd_dev_list);      /* devices */
+static LIST_HEAD(rbd_client_list);   /* clients */
+
+
+static int rbd_open(struct block_device *bdev, fmode_t mode)
+{
+	struct gendisk *disk = bdev->bd_disk;
+	struct rbd_device *rbd_dev = disk->private_data;
+
+	set_device_ro(bdev, rbd_dev->read_only);
+
+	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
+		return -EROFS;
+
+	return 0;
+}
+
+static const struct block_device_operations rbd_bd_ops = {
+	.owner			= THIS_MODULE,
+	.open			= rbd_open,
+};
+
+/*
+ * Initialize an rbd client instance.
+ */
+static struct rbd_client *rbd_client_create(struct ceph_options *opt)
+{
+	struct rbd_client *rbdc;
+	int ret = -ENOMEM;
+
+	dout("rbd_client_create\n");
+	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
+	if (!rbdc)
+		goto out;
+
+	kref_init(&rbdc->kref);
+	INIT_LIST_HEAD(&rbdc->node);
+
+	rbdc->client = ceph_create_client(opt, rbdc);
+	if (IS_ERR(rbdc->client))
+		goto out_free;
+
+	ret = ceph_open_session(rbdc->client);
+	if (ret < 0)
+		goto out_err;
+
+	spin_lock(&node_lock);
+	list_add_tail(&rbdc->node, &rbd_client_list);
+	spin_unlock(&node_lock);
+
+	dout("rbd_client_create created %p\n", rbdc);
+	return rbdc;
+
+out_err:
+	ceph_destroy_client(rbdc->client);
+out_free:
+	kfree(rbdc);
+out:
+	return ERR_PTR(ret);
+}
+
+/*
+ * Find a ceph client with specific addr and configuration.
+ */
+static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
+{
+	struct rbd_client *client_node;
+
+	if (opt->flags & CEPH_OPT_NOSHARE)
+		return NULL;
+
+	list_for_each_entry(client_node, &rbd_client_list, node)
+		if (ceph_compare_options(opt, client_node->client) == 0)
+			return client_node;
+	return NULL;
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.
+ */
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+			  char *options)
+{
+	struct rbd_client *rbdc;
+	struct ceph_options *opt;
+	int ret;
+
+	ret = ceph_parse_options(&opt, options, mon_addr,
+				 mon_addr + strlen(mon_addr), NULL, NULL);
+	if (ret < 0)
+		return ret;
+
+	spin_lock(&node_lock);
+	rbdc = __rbd_client_find(opt);
+	if (rbdc) {
+		ceph_destroy_options(opt);
+
+		/* using an existing client */
+		kref_get(&rbdc->kref);
+		rbd_dev->rbd_client = rbdc;
+		rbd_dev->client = rbdc->client;
+		spin_unlock(&node_lock);
+		return 0;
+	}
+	spin_unlock(&node_lock);
+
+	rbdc = rbd_client_create(opt);
+	if (IS_ERR(rbdc)) {
+		ret = PTR_ERR(rbdc);
+		goto out_args;
+	}
+	rbd_dev->rbd_client = rbdc;
+	rbd_dev->client = rbdc->client;
+	return 0;
+
+out_args:
+	ceph_destroy_options(opt);
+	return ret;
+}
+
+/*
+ * Destroy ceph client
+ */
+static void rbd_client_release(struct kref *kref)
+{
+	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
+
+	dout("rbd_release_client %p\n", rbdc);
+	spin_lock(&node_lock);
+	list_del(&rbdc->node);
+	spin_unlock(&node_lock);
+
+	ceph_destroy_client(rbdc->client);
+	kfree(rbdc);
+}
+
+/*
+ * Drop reference to ceph client node. If it's not referenced anymore, release
+ * it.
+ */
+static void rbd_put_client(struct rbd_device *rbd_dev)
+{
+	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
+	rbd_dev->rbd_client = NULL;
+	rbd_dev->client = NULL;
+}
+
+
+/*
+ * Create a new header structure, translate header format from the on-disk
+ * header.
+ */
+static int rbd_header_from_disk(struct rbd_image_header *header,
+				 struct rbd_image_header_ondisk *ondisk,
+				 int allocated_snaps,
+				 gfp_t gfp_flags)
+{
+	int i;
+	u32 snap_count = le32_to_cpu(ondisk->snap_count);
+	int ret = -ENOMEM;
+
+	init_rwsem(&header->snap_rwsem);
+
+	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
+				snap_count *
+				 sizeof(struct rbd_image_snap_ondisk),
+				gfp_flags);
+	if (!header->snapc)
+		return -ENOMEM;
+	if (snap_count) {
+		header->snap_names = kmalloc(header->snap_names_len,
+					     gfp_flags);
+		if (!header->snap_names)
+			goto err_snapc;
+		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
+					     gfp_flags);
+		if (!header->snap_sizes)
+			goto err_names;
+	} else {
+		header->snap_names = NULL;
+		header->snap_sizes = NULL;
+	}
+	memcpy(header->block_name, ondisk->block_name,
+	       sizeof(ondisk->block_name));
+
+	header->image_size = le64_to_cpu(ondisk->image_size);
+	header->obj_order = ondisk->options.order;
+	header->crypt_type = ondisk->options.crypt_type;
+	header->comp_type = ondisk->options.comp_type;
+
+	atomic_set(&header->snapc->nref, 1);
+	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
+	header->snapc->num_snaps = snap_count;
+	header->total_snaps = snap_count;
+
+	if (snap_count &&
+	    allocated_snaps == snap_count) {
+		for (i = 0; i < snap_count; i++) {
+			header->snapc->snaps[i] =
+				le64_to_cpu(ondisk->snaps[i].id);
+			header->snap_sizes[i] =
+				le64_to_cpu(ondisk->snaps[i].image_size);
+		}
+
+		/* copy snapshot names */
+		memcpy(header->snap_names, &ondisk->snaps[i],
+			header->snap_names_len);
+	}
+
+	return 0;
+
+err_names:
+	kfree(header->snap_names);
+err_snapc:
+	kfree(header->snapc);
+	return ret;
+}
+
+static int snap_index(struct rbd_image_header *header, int snap_num)
+{
+	return header->total_snaps - snap_num;
+}
+
+static u64 cur_snap_id(struct rbd_device *rbd_dev)
+{
+	struct rbd_image_header *header = &rbd_dev->header;
+
+	if (!rbd_dev->cur_snap)
+		return 0;
+
+	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
+}
+
+static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
+			u64 *seq, u64 *size)
+{
+	int i;
+	char *p = header->snap_names;
+
+	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+		if (strcmp(snap_name, p) == 0)
+			break;
+	}
+	if (i == header->total_snaps)
+		return -ENOENT;
+	if (seq)
+		*seq = header->snapc->snaps[i];
+
+	if (size)
+		*size = header->snap_sizes[i];
+
+	return i;
+}
+
+static int rbd_header_set_snap(struct rbd_device *dev,
+			       const char *snap_name,
+			       u64 *size)
+{
+	struct rbd_image_header *header = &dev->header;
+	struct ceph_snap_context *snapc = header->snapc;
+	int ret = -ENOENT;
+
+	down_write(&header->snap_rwsem);
+
+	if (!snap_name ||
+	    !*snap_name ||
+	    strcmp(snap_name, "-") == 0 ||
+	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
+		if (header->total_snaps)
+			snapc->seq = header->snap_seq;
+		else
+			snapc->seq = 0;
+		dev->cur_snap = 0;
+		dev->read_only = 0;
+		if (size)
+			*size = header->image_size;
+	} else {
+		ret = snap_by_name(header, snap_name, &snapc->seq, size);
+		if (ret < 0)
+			goto done;
+
+		dev->cur_snap = header->total_snaps - ret;
+		dev->read_only = 1;
+	}
+
+	ret = 0;
+done:
+	up_write(&header->snap_rwsem);
+	return ret;
+}
+
+static void rbd_header_free(struct rbd_image_header *header)
+{
+	kfree(header->snapc);
+	kfree(header->snap_names);
+	kfree(header->snap_sizes);
+}
+
+/*
+ * get the actual striped segment name, offset and length
+ */
+static u64 rbd_get_segment(struct rbd_image_header *header,
+			   const char *block_name,
+			   u64 ofs, u64 len,
+			   char *seg_name, u64 *segofs)
+{
+	u64 seg = ofs >> header->obj_order;
+
+	if (seg_name)
+		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
+			 "%s.%012llx", block_name, seg);
+
+	ofs = ofs & ((1 << header->obj_order) - 1);
+	len = min_t(u64, len, (1 << header->obj_order) - ofs);
+
+	if (segofs)
+		*segofs = ofs;
+
+	return len;
+}
+
+/*
+ * bio helpers
+ */
+
+static void bio_chain_put(struct bio *chain)
+{
+	struct bio *tmp;
+
+	while (chain) {
+		tmp = chain;
+		chain = chain->bi_next;
+		bio_put(tmp);
+	}
+}
+
+/*
+ * zeros a bio chain, starting at a specific offset
+ */
+static void zero_bio_chain(struct bio *chain, int start_ofs)
+{
+	struct bio_vec *bv;
+	unsigned long flags;
+	void *buf;
+	int i;
+	int pos = 0;
+
+	while (chain) {
+		bio_for_each_segment(bv, chain, i) {
+			if (pos + bv->bv_len > start_ofs) {
+				int remainder = max(start_ofs - pos, 0);
+				buf = bvec_kmap_irq(bv, &flags);
+				memset(buf + remainder, 0,
+				       bv->bv_len - remainder);
+				bvec_kunmap_irq(bv, &flags);
+			}
+			pos += bv->bv_len;
+		}
+
+		chain = chain->bi_next;
+	}
+}
+
+/*
+ * bio_chain_clone - clone a chain of bios up to a certain length.
+ * might return a bio_pair that will need to be released.
+ */
+static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
+				   struct bio_pair **bp,
+				   int len, gfp_t gfpmask)
+{
+	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+	int total = 0;
+
+	if (*bp) {
+		bio_pair_release(*bp);
+		*bp = NULL;
+	}
+
+	while (old_chain && (total < len)) {
+		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
+		if (!tmp)
+			goto err_out;
+
+		if (total + old_chain->bi_size > len) {
+			struct bio_pair *bp;
+
+			/*
+			 * this split can only happen with a single paged bio,
+			 * bio_split will BUG_ON if this is not the case
+			 */
+			dout("bio_chain_clone split! total=%d remaining=%d"
+			     "bi_size=%d\n",
+			     (int)total, (int)len-total,
+			     (int)old_chain->bi_size);
+
+			/* split the bio. We'll release it either in the next
+			   call, or it will have to be released outside */
+			bp = bio_split(old_chain, (len - total) / 512ULL);
+			if (!bp)
+				goto err_out;
+
+			__bio_clone(tmp, &bp->bio1);
+
+			*next = &bp->bio2;
+		} else {
+			__bio_clone(tmp, old_chain);
+			*next = old_chain->bi_next;
+		}
+
+		tmp->bi_bdev = NULL;
+		gfpmask &= ~__GFP_WAIT;
+		tmp->bi_next = NULL;
+
+		if (!new_chain) {
+			new_chain = tail = tmp;
+		} else {
+			tail->bi_next = tmp;
+			tail = tmp;
+		}
+		old_chain = old_chain->bi_next;
+
+		total += tmp->bi_size;
+	}
+
+	BUG_ON(total < len);
+
+	if (tail)
+		tail->bi_next = NULL;
+
+	*old = old_chain;
+
+	return new_chain;
+
+err_out:
+	dout("bio_chain_clone with err\n");
+	bio_chain_put(new_chain);
+	return NULL;
+}
+
+/*
+ * helpers for osd request op vectors.
+ */
+static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
+			    int num_ops,
+			    int opcode,
+			    u32 payload_len)
+{
+	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
+		       GFP_NOIO);
+	if (!*ops)
+		return -ENOMEM;
+	(*ops)[0].op = opcode;
+	/*
+	 * op extent offset and length will be set later on
+	 * in calc_raw_layout()
+	 */
+	(*ops)[0].payload_len = payload_len;
+	return 0;
+}
+
+static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+{
+	kfree(ops);
+}
+
+/*
+ * Send ceph osd request
+ */
+static int rbd_do_request(struct request *rq,
+			  struct rbd_device *dev,
+			  struct ceph_snap_context *snapc,
+			  u64 snapid,
+			  const char *obj, u64 ofs, u64 len,
+			  struct bio *bio,
+			  struct page **pages,
+			  int num_pages,
+			  int flags,
+			  struct ceph_osd_req_op *ops,
+			  int num_reply,
+			  void (*rbd_cb)(struct ceph_osd_request *req,
+					 struct ceph_msg *msg))
+{
+	struct ceph_osd_request *req;
+	struct ceph_file_layout *layout;
+	int ret;
+	u64 bno;
+	struct timespec mtime = CURRENT_TIME;
+	struct rbd_request *req_data;
+	struct ceph_osd_request_head *reqhead;
+	struct rbd_image_header *header = &dev->header;
+
+	ret = -ENOMEM;
+	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
+	if (!req_data)
+		goto done;
+
+	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
+
+	down_read(&header->snap_rwsem);
+
+	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
+				      snapc,
+				      ops,
+				      false,
+				      GFP_NOIO, pages, bio);
+	if (IS_ERR(req)) {
+		up_read(&header->snap_rwsem);
+		ret = PTR_ERR(req);
+		goto done_pages;
+	}
+
+	req->r_callback = rbd_cb;
+
+	req_data->rq = rq;
+	req_data->bio = bio;
+	req_data->pages = pages;
+	req_data->len = len;
+
+	req->r_priv = req_data;
+
+	reqhead = req->r_request->front.iov_base;
+	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+
+	strncpy(req->r_oid, obj, sizeof(req->r_oid));
+	req->r_oid_len = strlen(req->r_oid);
+
+	layout = &req->r_file_layout;
+	memset(layout, 0, sizeof(*layout));
+	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+	layout->fl_stripe_count = cpu_to_le32(1);
+	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+	layout->fl_pg_preferred = cpu_to_le32(-1);
+	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
+	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
+			     ofs, &len, &bno, req, ops);
+
+	ceph_osdc_build_request(req, ofs, &len,
+				ops,
+				snapc,
+				&mtime,
+				req->r_oid, req->r_oid_len);
+	up_read(&header->snap_rwsem);
+
+	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+	if (ret < 0)
+		goto done_err;
+
+	if (!rbd_cb) {
+		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+		ceph_osdc_put_request(req);
+	}
+	return ret;
+
+done_err:
+	bio_chain_put(req_data->bio);
+	ceph_osdc_put_request(req);
+done_pages:
+	kfree(req_data);
+done:
+	if (rq)
+		blk_end_request(rq, ret, len);
+	return ret;
+}
+
+/*
+ * Ceph osd op callback
+ */
+static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+	struct rbd_request *req_data = req->r_priv;
+	struct ceph_osd_reply_head *replyhead;
+	struct ceph_osd_op *op;
+	__s32 rc;
+	u64 bytes;
+	int read_op;
+
+	/* parse reply */
+	replyhead = msg->front.iov_base;
+	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+	op = (void *)(replyhead + 1);
+	rc = le32_to_cpu(replyhead->result);
+	bytes = le64_to_cpu(op->extent.length);
+	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
+
+	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
+
+	if (rc == -ENOENT && read_op) {
+		zero_bio_chain(req_data->bio, 0);
+		rc = 0;
+	} else if (rc == 0 && read_op && bytes < req_data->len) {
+		zero_bio_chain(req_data->bio, bytes);
+		bytes = req_data->len;
+	}
+
+	blk_end_request(req_data->rq, rc, bytes);
+
+	if (req_data->bio)
+		bio_chain_put(req_data->bio);
+
+	ceph_osdc_put_request(req);
+	kfree(req_data);
+}
+
+/*
+ * Do a synchronous ceph osd operation
+ */
+static int rbd_req_sync_op(struct rbd_device *dev,
+			   struct ceph_snap_context *snapc,
+			   u64 snapid,
+			   int opcode,
+			   int flags,
+			   struct ceph_osd_req_op *orig_ops,
+			   int num_reply,
+			   const char *obj,
+			   u64 ofs, u64 len,
+			   char *buf)
+{
+	int ret;
+	struct page **pages;
+	int num_pages;
+	struct ceph_osd_req_op *ops = orig_ops;
+	u32 payload_len;
+
+	num_pages = calc_pages_for(ofs, len);
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (!pages)
+		return -ENOMEM;
+
+	if (!orig_ops) {
+		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
+		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
+		if (ret < 0)
+			goto done;
+
+		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
+			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
+			if (ret < 0)
+				goto done_ops;
+		}
+	}
+
+	ret = rbd_do_request(NULL, dev, snapc, snapid,
+			  obj, ofs, len, NULL,
+			  pages, num_pages,
+			  flags,
+			  ops,
+			  2,
+			  NULL);
+	if (ret < 0)
+		goto done_ops;
+
+	if ((flags & CEPH_OSD_FLAG_READ) && buf)
+		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+
+done_ops:
+	if (!orig_ops)
+		rbd_destroy_ops(ops);
+done:
+	ceph_release_page_vector(pages, num_pages);
+	return ret;
+}
+
+/*
+ * Do an asynchronous ceph osd operation
+ */
+static int rbd_do_op(struct request *rq,
+		     struct rbd_device *rbd_dev,
+		     struct ceph_snap_context *snapc,
+		     u64 snapid,
+		     int opcode, int flags, int num_reply,
+		     u64 ofs, u64 len,
+		     struct bio *bio)
+{
+	char *seg_name;
+	u64 seg_ofs;
+	u64 seg_len;
+	int ret;
+	struct ceph_osd_req_op *ops;
+	u32 payload_len;
+
+	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+	if (!seg_name)
+		return -ENOMEM;
+
+	seg_len = rbd_get_segment(&rbd_dev->header,
+				  rbd_dev->header.block_name,
+				  ofs, len,
+				  seg_name, &seg_ofs);
+	if (seg_len < 0)
+		return seg_len;
+
+	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+
+	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
+	if (ret < 0)
+		goto done;
+
+	/* we've taken care of segment sizes earlier when we
+	   cloned the bios. We should never have a segment
+	   truncated at this point */
+	BUG_ON(seg_len < len);
+
+	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
+			     seg_name, seg_ofs, seg_len,
+			     bio,
+			     NULL, 0,
+			     flags,
+			     ops,
+			     num_reply,
+			     rbd_req_cb);
+done:
+	kfree(seg_name);
+	return ret;
+}
+
+/*
+ * Request async osd write
+ */
+static int rbd_req_write(struct request *rq,
+			 struct rbd_device *rbd_dev,
+			 struct ceph_snap_context *snapc,
+			 u64 ofs, u64 len,
+			 struct bio *bio)
+{
+	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
+			 CEPH_OSD_OP_WRITE,
+			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			 2,
+			 ofs, len, bio);
+}
+
+/*
+ * Request async osd read
+ */
+static int rbd_req_read(struct request *rq,
+			 struct rbd_device *rbd_dev,
+			 u64 snapid,
+			 u64 ofs, u64 len,
+			 struct bio *bio)
+{
+	return rbd_do_op(rq, rbd_dev, NULL,
+			 (snapid ? snapid : CEPH_NOSNAP),
+			 CEPH_OSD_OP_READ,
+			 CEPH_OSD_FLAG_READ,
+			 2,
+			 ofs, len, bio);
+}
+
+/*
+ * Request sync osd read
+ */
+static int rbd_req_sync_read(struct rbd_device *dev,
+			  struct ceph_snap_context *snapc,
+			  u64 snapid,
+			  const char *obj,
+			  u64 ofs, u64 len,
+			  char *buf)
+{
+	return rbd_req_sync_op(dev, NULL,
+			       (snapid ? snapid : CEPH_NOSNAP),
+			       CEPH_OSD_OP_READ,
+			       CEPH_OSD_FLAG_READ,
+			       NULL,
+			       1, obj, ofs, len, buf);
+}
+
+/*
+ * Request sync osd rollback
+ */
+static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
+				     u64 snapid,
+				     const char *obj)
+{
+	struct ceph_osd_req_op *ops;
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
+	if (ret < 0)
+		return ret;
+
+	ops[0].snap.snapid = snapid;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			       CEPH_NOSNAP,
+			       0,
+			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			       ops,
+			       1, obj, 0, 0, NULL);
+
+	rbd_destroy_ops(ops);
+
+	if (ret < 0)
+		return ret;
+
+	return ret;
+}
+
+/*
+ * Request sync osd call (execute an osd class method)
+ */
+static int rbd_req_sync_exec(struct rbd_device *dev,
+			     const char *obj,
+			     const char *cls,
+			     const char *method,
+			     const char *data,
+			     int len)
+{
+	struct ceph_osd_req_op *ops;
+	int cls_len = strlen(cls);
+	int method_len = strlen(method);
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
+				    cls_len + method_len + len);
+	if (ret < 0)
+		return ret;
+
+	ops[0].cls.class_name = cls;
+	ops[0].cls.class_len = (__u8)cls_len;
+	ops[0].cls.method_name = method;
+	ops[0].cls.method_len = (__u8)method_len;
+	ops[0].cls.argc = 0;
+	ops[0].cls.indata = data;
+	ops[0].cls.indata_len = len;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			       CEPH_NOSNAP,
+			       0,
+			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			       ops,
+			       1, obj, 0, 0, NULL);
+
+	rbd_destroy_ops(ops);
+
+	dout("cls_exec returned %d\n", ret);
+	return ret;
+}
+
+/*
+ * block device queue callback
+ */
+static void rbd_rq_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	struct bio_pair *bp = NULL;
+
+	rq = blk_fetch_request(q);
+
+	while (1) {
+		struct bio *bio;
+		struct bio *rq_bio, *next_bio = NULL;
+		bool do_write;
+		int size, op_size = 0;
+		u64 ofs;
+
+		/* peek at request from block layer */
+		if (!rq)
+			break;
+
+		dout("fetched request\n");
+
+		/* filter out block requests we don't understand */
+		if ((rq->cmd_type != REQ_TYPE_FS)) {
+			__blk_end_request_all(rq, 0);
+			goto next;
+		}
+
+		/* deduce our operation (read, write) */
+		do_write = (rq_data_dir(rq) == WRITE);
+
+		size = blk_rq_bytes(rq);
+		ofs = blk_rq_pos(rq) * 512ULL;
+		rq_bio = rq->bio;
+		if (do_write && rbd_dev->read_only) {
+			__blk_end_request_all(rq, -EROFS);
+			goto next;
+		}
+
+		spin_unlock_irq(q->queue_lock);
+
+		dout("%s 0x%x bytes at 0x%llx\n",
+		     do_write ? "write" : "read",
+		     size, blk_rq_pos(rq) * 512ULL);
+
+		do {
+			/* a bio clone to be passed down to OSD req */
+			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
+			op_size = rbd_get_segment(&rbd_dev->header,
+						  rbd_dev->header.block_name,
+						  ofs, size,
+						  NULL, NULL);
+			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
+					      op_size, GFP_ATOMIC);
+			if (!bio) {
+				spin_lock_irq(q->queue_lock);
+				__blk_end_request_all(rq, -ENOMEM);
+				goto next;
+			}
+
+			/* init OSD command: write or read */
+			if (do_write)
+				rbd_req_write(rq, rbd_dev,
+					      rbd_dev->header.snapc,
+					      ofs,
+					      op_size, bio);
+			else
+				rbd_req_read(rq, rbd_dev,
+					     cur_snap_id(rbd_dev),
+					     ofs,
+					     op_size, bio);
+
+			size -= op_size;
+			ofs += op_size;
+
+			rq_bio = next_bio;
+		} while (size > 0);
+
+		if (bp)
+			bio_pair_release(bp);
+
+		spin_lock_irq(q->queue_lock);
+next:
+		rq = blk_fetch_request(q);
+	}
+}
+
+/*
+ * a queue callback. Makes sure that we don't create a bio that spans across
+ * multiple osd objects. One exception would be with single-page bios,
+ * which we handle later at bio_chain_clone
+ */
+static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
+			  struct bio_vec *bvec)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
+	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+	unsigned int bio_sectors = bmd->bi_size >> 9;
+	int max;
+
+	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
+				+ bio_sectors)) << 9;
+	if (max < 0)
+		max = 0; /* bio_add cannot handle a negative return */
+	if (max <= bvec->bv_len && bio_sectors == 0)
+		return bvec->bv_len;
+	return max;
+}
+
+static void rbd_free_disk(struct rbd_device *rbd_dev)
+{
+	struct gendisk *disk = rbd_dev->disk;
+
+	if (!disk)
+		return;
+
+	rbd_header_free(&rbd_dev->header);
+
+	if (disk->flags & GENHD_FL_UP)
+		del_gendisk(disk);
+	if (disk->queue)
+		blk_cleanup_queue(disk->queue);
+	put_disk(disk);
+}
+
+/*
+ * reload the on-disk header
+ */
+static int rbd_read_header(struct rbd_device *rbd_dev,
+			   struct rbd_image_header *header)
+{
+	ssize_t rc;
+	struct rbd_image_header_ondisk *dh;
+	int snap_count = 0;
+	u64 snap_names_len = 0;
+
+	while (1) {
+		int len = sizeof(*dh) +
+			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
+			  snap_names_len;
+
+		rc = -ENOMEM;
+		dh = kmalloc(len, GFP_KERNEL);
+		if (!dh)
+			return -ENOMEM;
+
+		rc = rbd_req_sync_read(rbd_dev,
+				       NULL, CEPH_NOSNAP,
+				       rbd_dev->obj_md_name,
+				       0, len,
+				       (char *)dh);
+		if (rc < 0)
+			goto out_dh;
+
+		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
+		if (rc < 0)
+			goto out_dh;
+
+		if (snap_count != header->total_snaps) {
+			snap_count = header->total_snaps;
+			snap_names_len = header->snap_names_len;
+			rbd_header_free(header);
+			kfree(dh);
+			continue;
+		}
+		break;
+	}
+
+out_dh:
+	kfree(dh);
+	return rc;
+}
+
+/*
+ * create a snapshot
+ */
+static int rbd_header_add_snap(struct rbd_device *dev,
+			       const char *snap_name,
+			       gfp_t gfp_flags)
+{
+	int name_len = strlen(snap_name);
+	u64 new_snapid;
+	int ret;
+	void *data, *data_start, *data_end;
+
+	/* we should create a snapshot only if we're pointing at the head */
+	if (dev->cur_snap)
+		return -EINVAL;
+
+	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
+				      &new_snapid);
+	if (ret < 0)
+		return ret;
+	dout("created snapid=%lld\n", new_snapid);
+
+	data = kmalloc(name_len + 16, gfp_flags);
+	if (!data)
+		return -ENOMEM;
+
+	data_start = data;
+	data_end = data + name_len + 16;
+
+	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
+	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
+
+	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
+				data_start, data - data_start);
+
+	kfree(data_start);
+
+	if (ret < 0)
+		return ret;
+
+	dev->header.snapc->seq = new_snapid;
+
+	return 0;
+bad:
+	return -ERANGE;
+}
+
+/*
+ * only read the first part of the ondisk header, without the snaps info
+ */
+static int rbd_update_snaps(struct rbd_device *rbd_dev)
+{
+	int ret;
+	struct rbd_image_header h;
+	u64 snap_seq;
+
+	ret = rbd_read_header(rbd_dev, &h);
+	if (ret < 0)
+		return ret;
+
+	down_write(&rbd_dev->header.snap_rwsem);
+
+	snap_seq = rbd_dev->header.snapc->seq;
+
+	kfree(rbd_dev->header.snapc);
+	kfree(rbd_dev->header.snap_names);
+	kfree(rbd_dev->header.snap_sizes);
+
+	rbd_dev->header.total_snaps = h.total_snaps;
+	rbd_dev->header.snapc = h.snapc;
+	rbd_dev->header.snap_names = h.snap_names;
+	rbd_dev->header.snap_sizes = h.snap_sizes;
+	rbd_dev->header.snapc->seq = snap_seq;
+
+	up_write(&rbd_dev->header.snap_rwsem);
+
+	return 0;
+}
+
+static int rbd_init_disk(struct rbd_device *rbd_dev)
+{
+	struct gendisk *disk;
+	struct request_queue *q;
+	int rc;
+	u64 total_size = 0;
+
+	/* contact OSD, request size info about the object being mapped */
+	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
+	if (rc)
+		return rc;
+
+	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
+	if (rc)
+		return rc;
+
+	/* create gendisk info */
+	rc = -ENOMEM;
+	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
+	if (!disk)
+		goto out;
+
+	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
+	disk->major = rbd_dev->major;
+	disk->first_minor = 0;
+	disk->fops = &rbd_bd_ops;
+	disk->private_data = rbd_dev;
+
+	/* init rq */
+	rc = -ENOMEM;
+	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+	if (!q)
+		goto out_disk;
+	blk_queue_merge_bvec(q, rbd_merge_bvec);
+	disk->queue = q;
+
+	q->queuedata = rbd_dev;
+
+	rbd_dev->disk = disk;
+	rbd_dev->q = q;
+
+	/* finally, announce the disk to the world */
+	set_capacity(disk, total_size / 512ULL);
+	add_disk(disk);
+
+	pr_info("%s: added with size 0x%llx\n",
+		disk->disk_name, (unsigned long long)total_size);
+	return 0;
+
+out_disk:
+	put_disk(disk);
+out:
+	return rc;
+}
+
+/********************************************************************
+ * /sys/class/rbd/
+ *                   add	map rados objects to blkdev
+ *                   remove	unmap rados objects
+ *                   list	show mappings
+ *******************************************************************/
+
+static void class_rbd_release(struct class *cls)
+{
+	kfree(cls);
+}
+
+static ssize_t class_rbd_list(struct class *c,
+			      struct class_attribute *attr,
+			      char *data)
+{
+	int n = 0;
+	struct list_head *tmp;
+	int max = PAGE_SIZE;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	n += snprintf(data, max,
+		      "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
+
+	list_for_each(tmp, &rbd_dev_list) {
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		n += snprintf(data+n, max-n,
+			      "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
+			      rbd_dev->id,
+			      rbd_dev->major,
+			      ceph_client_id(rbd_dev->client),
+			      rbd_dev->pool_name,
+			      rbd_dev->obj, rbd_dev->snap_name,
+			      rbd_dev->header.image_size >> 10);
+		if (n == max)
+			break;
+	}
+
+	mutex_unlock(&ctl_mutex);
+	return n;
+}
+
+static ssize_t class_rbd_add(struct class *c,
+			     struct class_attribute *attr,
+			     const char *buf, size_t count)
+{
+	struct ceph_osd_client *osdc;
+	struct rbd_device *rbd_dev;
+	ssize_t rc = -ENOMEM;
+	int irc, new_id = 0;
+	struct list_head *tmp;
+	char *mon_dev_name;
+	char *options;
+
+	if (!try_module_get(THIS_MODULE))
+		return -ENODEV;
+
+	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL);
+	if (!mon_dev_name)
+		goto err_out_mod;
+
+	options = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL);
+	if (!options)
+		goto err_mon_dev;
+
+	/* new rbd_device object */
+	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
+	if (!rbd_dev)
+		goto err_out_opt;
+
+	/* static rbd_device initialization */
+	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->node);
+
+	/* generate unique id: find highest unique id, add one */
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	list_for_each(tmp, &rbd_dev_list) {
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		if (rbd_dev->id >= new_id)
+			new_id = rbd_dev->id + 1;
+	}
+
+	rbd_dev->id = new_id;
+
+	/* add to global list */
+	list_add_tail(&rbd_dev->node, &rbd_dev_list);
+
+	/* parse add command */
+	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
+		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
+		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
+		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
+		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+		   mon_dev_name, options, rbd_dev->pool_name,
+		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
+		rc = -EINVAL;
+		goto err_out_slot;
+	}
+
+	if (rbd_dev->snap_name[0] == 0)
+		rbd_dev->snap_name[0] = '-';
+
+	rbd_dev->obj_len = strlen(rbd_dev->obj);
+	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
+		 rbd_dev->obj, RBD_SUFFIX);
+
+	/* initialize rest of new object */
+	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
+	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
+	if (rc < 0)
+		goto err_out_slot;
+
+	mutex_unlock(&ctl_mutex);
+
+	/* pick the pool */
+	osdc = &rbd_dev->client->osdc;
+	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
+	if (rc < 0)
+		goto err_out_client;
+	rbd_dev->poolid = rc;
+
+	/* register our block device */
+	irc = register_blkdev(0, rbd_dev->name);
+	if (irc < 0) {
+		rc = irc;
+		goto err_out_client;
+	}
+	rbd_dev->major = irc;
+
+	/* set up and announce blkdev mapping */
+	rc = rbd_init_disk(rbd_dev);
+	if (rc)
+		goto err_out_blkdev;
+
+	return count;
+
+err_out_blkdev:
+	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_client:
+	rbd_put_client(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+err_out_slot:
+	list_del_init(&rbd_dev->node);
+	mutex_unlock(&ctl_mutex);
+
+	kfree(rbd_dev);
+err_out_opt:
+	kfree(options);
+err_mon_dev:
+	kfree(mon_dev_name);
+err_out_mod:
+	dout("Error adding device %s\n", buf);
+	module_put(THIS_MODULE);
+	return rc;
+}
+
+static struct rbd_device *__rbd_get_dev(unsigned long id)
+{
+	struct list_head *tmp;
+	struct rbd_device *rbd_dev;
+
+	list_for_each(tmp, &rbd_dev_list) {
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		if (rbd_dev->id == id)
+			return rbd_dev;
+	}
+	return NULL;
+}
+
+static ssize_t class_rbd_remove(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, rc;
+	unsigned long ul;
+
+	rc = strict_strtoul(buf, 10, &ul);
+	if (rc)
+		return rc;
+
+	/* convert to int; abort if we lost anything in the conversion */
+	target_id = (int) ul;
+	if (target_id != ul)
+		return -EINVAL;
+
+	/* remove object from list immediately */
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (rbd_dev)
+		list_del_init(&rbd_dev->node);
+
+	mutex_unlock(&ctl_mutex);
+
+	if (!rbd_dev)
+		return -ENOENT;
+
+	rbd_put_client(rbd_dev);
+
+	/* clean up and free blkdev */
+	rbd_free_disk(rbd_dev);
+	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+	kfree(rbd_dev);
+
+	/* release module ref */
+	module_put(THIS_MODULE);
+
+	return count;
+}
+
+static ssize_t class_rbd_snaps_list(struct class *c,
+			      struct class_attribute *attr,
+			      char *data)
+{
+	struct rbd_device *rbd_dev = NULL;
+	struct list_head *tmp;
+	struct rbd_image_header *header;
+	int i, n = 0, max = PAGE_SIZE;
+	int ret;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	n += snprintf(data, max, "#id\tsnap\tKB\n");
+
+	list_for_each(tmp, &rbd_dev_list) {
+		char *names, *p;
+		struct ceph_snap_context *snapc;
+
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		header = &rbd_dev->header;
+
+		down_read(&header->snap_rwsem);
+
+		names = header->snap_names;
+		snapc = header->snapc;
+
+		n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
+			      rbd_dev->id, RBD_SNAP_HEAD_NAME,
+			      header->image_size >> 10,
+			      (!rbd_dev->cur_snap ? " (*)" : ""));
+		if (n == max)
+			break;
+
+		p = names;
+		for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
+			n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
+			      rbd_dev->id, p, header->snap_sizes[i] >> 10,
+			      (rbd_dev->cur_snap &&
+			       (snap_index(header, i) == rbd_dev->cur_snap) ?
+			       " (*)" : ""));
+			if (n == max)
+				break;
+		}
+
+		up_read(&header->snap_rwsem);
+	}
+
+
+	ret = n;
+	mutex_unlock(&ctl_mutex);
+	return ret;
+}
+
+static ssize_t class_rbd_snaps_refresh(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, rc;
+	unsigned long ul;
+	int ret = count;
+
+	rc = strict_strtoul(buf, 10, &ul);
+	if (rc)
+		return rc;
+
+	/* convert to int; abort if we lost anything in the conversion */
+	target_id = (int) ul;
+	if (target_id != ul)
+		return -EINVAL;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done;
+	}
+
+	rc = rbd_update_snaps(rbd_dev);
+	if (rc < 0)
+		ret = rc;
+
+done:
+	mutex_unlock(&ctl_mutex);
+	return ret;
+}
+
+static ssize_t class_rbd_snap_create(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, ret;
+	char *name;
+
+	name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
+	if (!name)
+		return -ENOMEM;
+
+	/* parse snaps add command */
+	if (sscanf(buf, "%d "
+		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+		   &target_id,
+		   name) != 2) {
+		ret = -EINVAL;
+		goto done;
+	}
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done_unlock;
+	}
+
+	ret = rbd_header_add_snap(rbd_dev,
+				  name, GFP_KERNEL);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = rbd_update_snaps(rbd_dev);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = count;
+done_unlock:
+	mutex_unlock(&ctl_mutex);
+done:
+	kfree(name);
+	return ret;
+}
+
+static ssize_t class_rbd_rollback(struct class *c,
+				struct class_attribute *attr,
+				const char *buf,
+				size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	int target_id, ret;
+	u64 snapid;
+	char snap_name[RBD_MAX_SNAP_NAME_LEN + 1];	/* +1 for sscanf's NUL */
+	u64 cur_ofs;
+	char *seg_name;
+
+	/* parse snap rollback command */
+	if (sscanf(buf, "%d "
+		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
+		   &target_id,
+		   snap_name) != 2) {
+		return -EINVAL;
+	}
+
+	ret = -ENOMEM;
+	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+	if (!seg_name)
+		return ret;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
+	rbd_dev = __rbd_get_dev(target_id);
+	if (!rbd_dev) {
+		ret = -ENOENT;
+		goto done_unlock;
+	}
+
+	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
+	if (ret < 0)
+		goto done_unlock;
+
+	dout("snapid=%lld\n", snapid);
+
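+	/* roll back the image one segment object at a time */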
+	cur_ofs = 0;
+	while (cur_ofs < rbd_dev->header.image_size) {
+		cur_ofs += rbd_get_segment(&rbd_dev->header,
+					   rbd_dev->obj,
+					   cur_ofs, (u64)-1,
+					   seg_name, NULL);
+		dout("seg_name=%s\n", seg_name);
+
+		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
+		if (ret < 0)
+			pr_warning("could not roll back obj %s err=%d\n",
+				   seg_name, ret);
+	}
+
+	ret = rbd_update_snaps(rbd_dev);
+	if (ret < 0)
+		goto done_unlock;
+
+	ret = count;
+
+done_unlock:
+	mutex_unlock(&ctl_mutex);
+	kfree(seg_name);
+
+	return ret;
+}
+
+static struct class_attribute class_rbd_attrs[] = {
+	__ATTR(add,		0200, NULL, class_rbd_add),
+	__ATTR(remove,		0200, NULL, class_rbd_remove),
+	__ATTR(list,		0444, class_rbd_list, NULL),
+	__ATTR(snaps_refresh,	0200, NULL, class_rbd_snaps_refresh),
+	__ATTR(snap_create,	0200, NULL, class_rbd_snap_create),
+	__ATTR(snaps_list,	0444, class_rbd_snaps_list, NULL),
+	__ATTR(snap_rollback,	0200, NULL, class_rbd_rollback),
+	__ATTR_NULL
+};
+
+/*
+ * create control files in sysfs
+ * /sys/class/rbd/...
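+ *
+ * Illustrative usage (device id 0 and snapshot name "mysnap" are
+ * examples only; see class_rbd_add above for the add syntax):
+ *
+ *   cat /sys/class/rbd/list
+ *   echo "0 mysnap" > /sys/class/rbd/snap_create
+ *   echo 0 > /sys/class/rbd/snaps_refresh
+ *   echo "0 mysnap" > /sys/class/rbd/snap_rollback
+ *   echo 0 > /sys/class/rbd/remove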
+ */
+static int rbd_sysfs_init(void)
+{
+	int ret = -ENOMEM;
+
+	class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
+	if (!class_rbd)
+		goto out;
+
+	class_rbd->name = DRV_NAME;
+	class_rbd->owner = THIS_MODULE;
+	class_rbd->class_release = class_rbd_release;
+	class_rbd->class_attrs = class_rbd_attrs;
+
+	ret = class_register(class_rbd);
+	if (ret)
+		goto out_class;
+	return 0;
+
+out_class:
+	kfree(class_rbd);
+	class_rbd = NULL;
+	pr_err(DRV_NAME ": failed to create class rbd\n");
+out:
+	return ret;
+}
+
+static void rbd_sysfs_cleanup(void)
+{
+	if (class_rbd)
+		class_destroy(class_rbd);
+	class_rbd = NULL;
+}
+
+int __init rbd_init(void)
+{
+	int rc;
+
+	/* set up locking before the sysfs interface goes live */
+	spin_lock_init(&node_lock);
+
+	rc = rbd_sysfs_init();
+	if (rc)
+		return rc;
+	pr_info("loaded " DRV_NAME_LONG "\n");
+	return 0;
+}
+
+void __exit rbd_exit(void)
+{
+	rbd_sysfs_cleanup();
+}
+
+module_init(rbd_init);
+module_exit(rbd_exit);
+
+MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
+MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
+MODULE_DESCRIPTION("rados block device");
+
+/* following authorship retained from original osdblk.c */
+MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
+
+MODULE_LICENSE("GPL");
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
new file mode 100644
index 0000000..fc6c678
--- /dev/null
+++ b/drivers/block/rbd_types.h
@@ -0,0 +1,73 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RBD_TYPES_H
+#define CEPH_RBD_TYPES_H
+
+#include <linux/types.h>
+
+/*
+ * rbd image 'foo' consists of objects
+ *   foo.rbd      - image metadata
+ *   foo.00000000
+ *   foo.00000001
+ *   ...          - data
+ */
+
+#define RBD_SUFFIX		".rbd"
+#define RBD_DIRECTORY           "rbd_directory"
+#define RBD_INFO                "rbd_info"
+
+#define RBD_DEFAULT_OBJ_ORDER	22   /* 4MB */
+#define RBD_MIN_OBJ_ORDER       16
+#define RBD_MAX_OBJ_ORDER       30
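+
+/*
+ * Data is striped over 2^order byte objects, so the default order of
+ * 22 yields 4 MB objects; a 1 GB image, for example, is spread over
+ * 256 data objects.
+ */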
+
+#define RBD_MAX_OBJ_NAME_LEN	96
+#define RBD_MAX_SEG_NAME_LEN	128
+
+#define RBD_COMP_NONE		0
+#define RBD_CRYPT_NONE		0
+
+#define RBD_HEADER_TEXT		"<<< Rados Block Device Image >>>\n"
+#define RBD_HEADER_SIGNATURE	"RBD"
+#define RBD_HEADER_VERSION	"001.005"
+
+struct rbd_info {
+	__le64 max_id;
+} __attribute__((packed));
+
+struct rbd_image_snap_ondisk {
+	__le64 id;
+	__le64 image_size;
+} __attribute__((packed));
+
+struct rbd_image_header_ondisk {
+	char text[40];
+	char block_name[24];
+	char signature[4];
+	char version[8];
+	struct {
+		__u8 order;
+		__u8 crypt_type;
+		__u8 comp_type;
+		__u8 unused;
+	} __attribute__((packed)) options;
+	__le64 image_size;
+	__le64 snap_seq;
+	__le32 snap_count;
+	__le32 reserved;
+	__le64 snap_names_len;
+	struct rbd_image_snap_ondisk snaps[0];
+} __attribute__((packed));
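+
+/*
+ * snap_count entries of rbd_image_snap_ondisk follow the fixed
+ * header, then snap_names_len bytes of NUL-separated snapshot names
+ * (walked with strlen() + 1 per name).  Readers are expected to
+ * validate 'text' against RBD_HEADER_TEXT before trusting the rest.
+ */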
+
+
+#endif
-- 
1.7.0



* Re: [PATCH 0/8] rados block device and ceph refactor
  2010-08-13 17:40 [PATCH 0/8] rados block device and ceph refactor Sage Weil
                   ` (6 preceding siblings ...)
  2010-08-13 17:40 ` [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph Sage Weil
@ 2010-08-13 22:37 ` Randy Dunlap
  2010-08-13 23:11   ` Sage Weil
  7 siblings, 1 reply; 14+ messages in thread
From: Randy Dunlap @ 2010-08-13 22:37 UTC (permalink / raw)
  To: Sage Weil
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On Fri, 13 Aug 2010 10:40:32 -0700 Sage Weil wrote:

> Hi,
> 
> The rados block device (rbd) implements a network block device backed by 
> the Ceph distributed object store (think nbd/iSCSI, but distributed and 
> fault tolerant).  At the suggestion of Christoph and James, this version 
> of the patchset factors out the common Ceph bits (the network protocol, 
> cluster membership, and object storage parts) into a libceph module 
> (currently in net/ceph/ and include/linux/ceph/) that is shared by the 
> file system component (fs/ceph) and rbd (drivers/block/rbd.c). The first 
> few patches lay some groundwork, #7 moves does the ceph -> libceph+ceph 
> split, and #8 adds the block device driver.

Hi,
Did patch #7 make it to any mailing lists?
I didn't receive it.

---
~Randy
*** Remember to use Documentation/SubmitChecklist when testing your code ***


* Re: [PATCH 0/8] rados block device and ceph refactor
  2010-08-13 22:37 ` [PATCH 0/8] rados block device and ceph refactor Randy Dunlap
@ 2010-08-13 23:11   ` Sage Weil
  2010-08-14  2:32     ` Randy Dunlap
  0 siblings, 1 reply; 14+ messages in thread
From: Sage Weil @ 2010-08-13 23:11 UTC (permalink / raw)
  To: Randy Dunlap
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On Fri, 13 Aug 2010, Randy Dunlap wrote:
> On Fri, 13 Aug 2010 10:40:32 -0700 Sage Weil wrote:
> 
> > Hi,
> > 
> > The rados block device (rbd) implements a network block device backed by 
> > the Ceph distributed object store (think nbd/iSCSI, but distributed and 
> > fault tolerant).  At the suggestion of Christoph and James, this version 
> > of the patchset factors out the common Ceph bits (the network protocol, 
> > cluster membership, and object storage parts) into a libceph module 
> > (currently in net/ceph/ and include/linux/ceph/) that is shared by the 
> > file system component (fs/ceph) and rbd (drivers/block/rbd.c). The first 
> > few patches lay some groundwork, #7 moves does the ceph -> libceph+ceph 
> > split, and #8 adds the block device driver.
> 
> Hi,
> Did patch #7 make it to any mailing lists?
> I didn't receive it.

Sorry, I think vger ate it (it's 850KB).  You can see it here:

http://git.kernel.org/?p=linux/kernel/git/sage/ceph-client.git;a=commit;h=a6da68196474aabcdcc2f5dab64c0b55ca5090b7

sage



* Re: [PATCH 0/8] rados block device and ceph refactor
  2010-08-13 23:11   ` Sage Weil
@ 2010-08-14  2:32     ` Randy Dunlap
  0 siblings, 0 replies; 14+ messages in thread
From: Randy Dunlap @ 2010-08-14  2:32 UTC (permalink / raw)
  To: Sage Weil
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On Fri, 13 Aug 2010 16:11:55 -0700 (PDT) Sage Weil wrote:

> On Fri, 13 Aug 2010, Randy Dunlap wrote:
> > On Fri, 13 Aug 2010 10:40:32 -0700 Sage Weil wrote:
> > 
> > > Hi,
> > > 
> > > The rados block device (rbd) implements a network block device backed by 
> > > the Ceph distributed object store (think nbd/iSCSI, but distributed and 
> > > fault tolerant).  At the suggestion of Christoph and James, this version 
> > > of the patchset factors out the common Ceph bits (the network protocol, 
> > > cluster membership, and object storage parts) into a libceph module 
> > > (currently in net/ceph/ and include/linux/ceph/) that is shared by the 
> > > file system component (fs/ceph) and rbd (drivers/block/rbd.c). The first 
> > > few patches lay some groundwork, #7 moves does the ceph -> libceph+ceph 
> > > split, and #8 adds the block device driver.
> > 
> > Hi,
> > Did patch #7 make it to any mailing lists?
> > I didn't receive it.
> 
> Sorry, I think vger ate it (it's 850KB).  You can see it here:
> 
> http://git.kernel.org/?p=linux/kernel/git/sage/ceph-client.git;a=commit;h=a6da68196474aabcdcc2f5dab64c0b55ca5090b7

Yes, vger has a limit of 400 KB on lkml and netdev.
Other lists are probably less than that.

David M. wrote on 13-AUG-2007:

"The posting limit is 400K for linux-kernel, netdev, and one
or two of the other lists."

---
~Randy
*** Remember to use Documentation/SubmitChecklist when testing your code ***


* Re: [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph
  2010-08-13 17:40 ` [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph Sage Weil
@ 2010-08-14  2:44   ` Randy Dunlap
  2010-08-14  3:29     ` Sage Weil
  0 siblings, 1 reply; 14+ messages in thread
From: Randy Dunlap @ 2010-08-14  2:44 UTC (permalink / raw)
  To: Sage Weil
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On Fri, 13 Aug 2010 10:40:40 -0700 Sage Weil wrote:

> From: Yehuda Sadeh <yehuda@hq.newdream.net>
> 
> The rados block device (rbd), based on osdblk, creates a block device
> that is backed by objects stored in the Ceph distributed object storage
> cluster.  Each device consists of a single metadata object and data
> striped over many data objects.
> 
> The rbd driver supports read-only snapshots.
> 
> Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
> Signed-off-by: Sage Weil <sage@newdream.net>
> ---
>  MAINTAINERS               |    9 +
>  drivers/block/Kconfig     |   13 +
>  drivers/block/Makefile    |    1 +
>  drivers/block/rbd.c       | 1844 +++++++++++++++++++++++++++++++++++++++++++++
>  drivers/block/rbd_types.h |   73 ++
>  5 files changed, 1940 insertions(+), 0 deletions(-)
>  create mode 100644 drivers/block/rbd.c
>  create mode 100644 drivers/block/rbd_types.h
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 5102922..cb34b1b 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -4694,6 +4694,15 @@ F:	fs/qnx4/
>  F:	include/linux/qnx4_fs.h
>  F:	include/linux/qnxtypes.h
>  
> +RADOS BLOCK DEVICE (RBD)
> +F:	include/linux/qnxtypes.h
> +M:	Yehuda Sadeh <yehuda@hq.newdream.net>
> +M:	Sage Weil <sage@newdream.net>
> +M:	ceph-devel@vger.kernel.org
> +S:	Supported
> +F:	drivers/block/rbd.c
> +F:	drivers/block/rbd_types.h
> +
>  RADEON FRAMEBUFFER DISPLAY DRIVER
>  M:	Benjamin Herrenschmidt <benh@kernel.crashing.org>
>  L:	linux-fbdev@vger.kernel.org
> diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
> index de27768..708104b 100644
> --- a/drivers/block/Kconfig
> +++ b/drivers/block/Kconfig
> @@ -488,4 +488,17 @@ config BLK_DEV_HD
>  
>  	  If unsure, say N.
>  
> +config BLK_DEV_RBD
> +	tristate "Rados block device (RBD)"
> +	select CEPH_LIB
> +	default n
> +	help
> +	  Say Y here if you want include the Rados block device, which stripes
> +	  a block device over objects stored in the Ceph distributed object
> +	  store.
> +
> +	  More information at http://ceph.newdream.net/.
> +
> +	  If unsure, say N.
> +
>  endif # BLK_DEV

In linux-next of 20100813, I get:

net/built-in.o: In function `read_partial_message_section':
messenger.c:(.text+0x6598b): undefined reference to `crc32c'
net/built-in.o: In function `read_partial_message_bio':
messenger.c:(.text+0x65a57): undefined reference to `crc32c'
net/built-in.o: In function `write_partial_msg_pages':
messenger.c:(.text+0x65e22): undefined reference to `crc32c'
net/built-in.o: In function `prepare_write_message':
messenger.c:(.text+0x66219): undefined reference to `crc32c'
messenger.c:(.text+0x66240): undefined reference to `crc32c'
net/built-in.o:messenger.c:(.text+0x66264): more undefined references to `crc32c' follow

when CONFIG_INET is not enabled.  It looks like BLK_DEV_RBD needs to depend on
INET and possibly on BLOCK (I sent a patch for depends on BLOCK on 2010-aug-04
due to other build errors).

---
~Randy
*** Remember to use Documentation/SubmitChecklist when testing your code ***


* Re: [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph
  2010-08-14  2:44   ` Randy Dunlap
@ 2010-08-14  3:29     ` Sage Weil
  2010-08-14 14:23       ` Randy Dunlap
  0 siblings, 1 reply; 14+ messages in thread
From: Sage Weil @ 2010-08-14  3:29 UTC (permalink / raw)
  To: Randy Dunlap
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On Fri, 13 Aug 2010, Randy Dunlap wrote:
> On Fri, 13 Aug 2010 10:40:40 -0700 Sage Weil wrote:
> 
> > From: Yehuda Sadeh <yehuda@hq.newdream.net>
> > 
> > The rados block device (rbd), based on osdblk, creates a block device
> > that is backed by objects stored in the Ceph distributed object storage
> > cluster.  Each device consists of a single metadata object and data
> > striped over many data objects.
> > 
> > The rbd driver supports read-only snapshots.
> > 
> > Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
> > Signed-off-by: Sage Weil <sage@newdream.net>
> > ---
> >  MAINTAINERS               |    9 +
> >  drivers/block/Kconfig     |   13 +
> >  drivers/block/Makefile    |    1 +
> >  drivers/block/rbd.c       | 1844 +++++++++++++++++++++++++++++++++++++++++++++
> >  drivers/block/rbd_types.h |   73 ++
> >  5 files changed, 1940 insertions(+), 0 deletions(-)
> >  create mode 100644 drivers/block/rbd.c
> >  create mode 100644 drivers/block/rbd_types.h
> > 
> > diff --git a/MAINTAINERS b/MAINTAINERS
> > index 5102922..cb34b1b 100644
> > --- a/MAINTAINERS
> > +++ b/MAINTAINERS
> > @@ -4694,6 +4694,15 @@ F:	fs/qnx4/
> >  F:	include/linux/qnx4_fs.h
> >  F:	include/linux/qnxtypes.h
> >  
> > +RADOS BLOCK DEVICE (RBD)
> > +F:	include/linux/qnxtypes.h
> > +M:	Yehuda Sadeh <yehuda@hq.newdream.net>
> > +M:	Sage Weil <sage@newdream.net>
> > +M:	ceph-devel@vger.kernel.org
> > +S:	Supported
> > +F:	drivers/block/rbd.c
> > +F:	drivers/block/rbd_types.h
> > +
> >  RADEON FRAMEBUFFER DISPLAY DRIVER
> >  M:	Benjamin Herrenschmidt <benh@kernel.crashing.org>
> >  L:	linux-fbdev@vger.kernel.org
> > diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
> > index de27768..708104b 100644
> > --- a/drivers/block/Kconfig
> > +++ b/drivers/block/Kconfig
> > @@ -488,4 +488,17 @@ config BLK_DEV_HD
> >  
> >  	  If unsure, say N.
> >  
> > +config BLK_DEV_RBD
> > +	tristate "Rados block device (RBD)"
> > +	select CEPH_LIB
> > +	default n
> > +	help
> > +	  Say Y here if you want include the Rados block device, which stripes
> > +	  a block device over objects stored in the Ceph distributed object
> > +	  store.
> > +
> > +	  More information at http://ceph.newdream.net/.
> > +
> > +	  If unsure, say N.
> > +
> >  endif # BLK_DEV
> 
> In linux-next of 20100813, I get:
> 
> net/built-in.o: In function `read_partial_message_section':
> messenger.c:(.text+0x6598b): undefined reference to `crc32c'
> net/built-in.o: In function `read_partial_message_bio':
> messenger.c:(.text+0x65a57): undefined reference to `crc32c'
> net/built-in.o: In function `write_partial_msg_pages':
> messenger.c:(.text+0x65e22): undefined reference to `crc32c'
> net/built-in.o: In function `prepare_write_message':
> messenger.c:(.text+0x66219): undefined reference to `crc32c'
> messenger.c:(.text+0x66240): undefined reference to `crc32c'
> net/built-in.o:messenger.c:(.text+0x66264): more undefined references to `crc32c' follow
> 
> when CONFIG_INET is not enabled.  It looks like BLK_DEV_RBD needs to depend on
> INET and possibly on BLOCK (I sent a patch for depends on BLOCK on 2010-aug-04
> due to other build errors).

I see the problem: BLK_DEV_RBD and CEPH_FS both have 'select CEPH_LIB' and 
CEPH_LIB depends on INET and selects LIBCRC32C, but kconfig doesn't 
propagate those backward dependencies for you.  It looks like CEPH_FS and 
BLK_DEV_RBD should then depend on/select CEPH_LIB _and_ its dependencies.  
Patching that (and the BLOCK dependency) up now...

Unless there is a better way to do it?

Thanks!
sage


diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index 708104b..4a6e1b7 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -490,7 +490,10 @@ config BLK_DEV_HD
 
 config BLK_DEV_RBD
 	tristate "Rados block device (RBD)"
+	depends on INET && EXPERIMENTAL && BLOCK
 	select CEPH_LIB
+	select LIBCRC32C
+	select CRYPTO_AES
 	default n
 	help
 	  Say Y here if you want include the Rados block device, which stripes
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 89f9718..73a7b31 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -1,9 +1,9 @@
 config CEPH_FS
         tristate "Ceph distributed file system (EXPERIMENTAL)"
 	depends on INET && EXPERIMENTAL
+	select CEPH_LIB
 	select LIBCRC32C
 	select CRYPTO_AES
-	select CEPH_LIB
 	default n
 	help
 	  Choose Y or M here to include support for mounting the


* Re: [PATCH 8/8] rbd: introduce rados block device (rbd), based on libceph
  2010-08-14  3:29     ` Sage Weil
@ 2010-08-14 14:23       ` Randy Dunlap
  0 siblings, 0 replies; 14+ messages in thread
From: Randy Dunlap @ 2010-08-14 14:23 UTC (permalink / raw)
  To: Sage Weil
  Cc: linux-kernel, linux-fsdevel, linux-scsi, ceph-devel, hch, akpm, yehuda

On 08/13/10 20:29, Sage Weil wrote:
> On Fri, 13 Aug 2010, Randy Dunlap wrote:
>> On Fri, 13 Aug 2010 10:40:40 -0700 Sage Weil wrote:
>>
>>> From: Yehuda Sadeh <yehuda@hq.newdream.net>
>>>
>>> The rados block device (rbd), based on osdblk, creates a block device
>>> that is backed by objects stored in the Ceph distributed object storage
>>> cluster.  Each device consists of a single metadata object and data
>>> striped over many data objects.
>>>
>>> The rbd driver supports read-only snapshots.
>>>
>>> Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
>>> Signed-off-by: Sage Weil <sage@newdream.net>
>>> ---
>>>  MAINTAINERS               |    9 +
>>>  drivers/block/Kconfig     |   13 +
>>>  drivers/block/Makefile    |    1 +
>>>  drivers/block/rbd.c       | 1844 +++++++++++++++++++++++++++++++++++++++++++++
>>>  drivers/block/rbd_types.h |   73 ++
>>>  5 files changed, 1940 insertions(+), 0 deletions(-)
>>>  create mode 100644 drivers/block/rbd.c
>>>  create mode 100644 drivers/block/rbd_types.h
>>>
>>> diff --git a/MAINTAINERS b/MAINTAINERS
>>> index 5102922..cb34b1b 100644
>>> --- a/MAINTAINERS
>>> +++ b/MAINTAINERS
>>> @@ -4694,6 +4694,15 @@ F:	fs/qnx4/
>>>  F:	include/linux/qnx4_fs.h
>>>  F:	include/linux/qnxtypes.h
>>>  
>>> +RADOS BLOCK DEVICE (RBD)
>>> +F:	include/linux/qnxtypes.h
>>> +M:	Yehuda Sadeh <yehuda@hq.newdream.net>
>>> +M:	Sage Weil <sage@newdream.net>
>>> +M:	ceph-devel@vger.kernel.org
>>> +S:	Supported
>>> +F:	drivers/block/rbd.c
>>> +F:	drivers/block/rbd_types.h
>>> +
>>>  RADEON FRAMEBUFFER DISPLAY DRIVER
>>>  M:	Benjamin Herrenschmidt <benh@kernel.crashing.org>
>>>  L:	linux-fbdev@vger.kernel.org
>>> diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
>>> index de27768..708104b 100644
>>> --- a/drivers/block/Kconfig
>>> +++ b/drivers/block/Kconfig
>>> @@ -488,4 +488,17 @@ config BLK_DEV_HD
>>>  
>>>  	  If unsure, say N.
>>>  
>>> +config BLK_DEV_RBD
>>> +	tristate "Rados block device (RBD)"
>>> +	select CEPH_LIB
>>> +	default n
>>> +	help
>>> +	  Say Y here if you want include the Rados block device, which stripes
>>> +	  a block device over objects stored in the Ceph distributed object
>>> +	  store.
>>> +
>>> +	  More information at http://ceph.newdream.net/.
>>> +
>>> +	  If unsure, say N.
>>> +
>>>  endif # BLK_DEV
>>
>> In linux-next of 20100813, I get:
>>
>> net/built-in.o: In function `read_partial_message_section':
>> messenger.c:(.text+0x6598b): undefined reference to `crc32c'
>> net/built-in.o: In function `read_partial_message_bio':
>> messenger.c:(.text+0x65a57): undefined reference to `crc32c'
>> net/built-in.o: In function `write_partial_msg_pages':
>> messenger.c:(.text+0x65e22): undefined reference to `crc32c'
>> net/built-in.o: In function `prepare_write_message':
>> messenger.c:(.text+0x66219): undefined reference to `crc32c'
>> messenger.c:(.text+0x66240): undefined reference to `crc32c'
>> net/built-in.o:messenger.c:(.text+0x66264): more undefined references to `crc32c' follow
>>
>> when CONFIG_INET is not enabled.  It looks like BLK_DEV_RBD needs to depend on
>> INET and possibly on BLOCK (I sent a patch for depends on BLOCK on 2010-aug-04
>> due to other build errors).
> 
> I see the problem: BLK_DEV_RBD and CEPH_FS both have 'select CEPH_LIB' and 
> CEPH_LIB depends on INET and selects LIBCRC32C, but kconfig doesn't 
> propagate those backward dependencies for you.  It looks like CEPH_FS and 
> BLK_DEV_RBD should then depend on/select CEPH_LIB _and_ its dependencies.  
> Patching that (and the BLOCK dependency) up now...
> 
> Unless there is a better way to do it?

Not currently, so that is what it needs.
Thanks.

> Thanks!
> sage
> 
> 
> diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
> index 708104b..4a6e1b7 100644
> --- a/drivers/block/Kconfig
> +++ b/drivers/block/Kconfig
> @@ -490,7 +490,10 @@ config BLK_DEV_HD
>  
>  config BLK_DEV_RBD
>  	tristate "Rados block device (RBD)"
> +	depends on INET && EXPERIMENTAL && BLOCK
>  	select CEPH_LIB
> +	select LIBCRC32C
> +	select CRYPTO_AES
>  	default n
>  	help
>  	  Say Y here if you want include the Rados block device, which stripes
> diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
> index 89f9718..73a7b31 100644
> --- a/fs/ceph/Kconfig
> +++ b/fs/ceph/Kconfig
> @@ -1,9 +1,9 @@
>  config CEPH_FS
>          tristate "Ceph distributed file system (EXPERIMENTAL)"
>  	depends on INET && EXPERIMENTAL
> +	select CEPH_LIB
>  	select LIBCRC32C
>  	select CRYPTO_AES
> -	select CEPH_LIB
>  	default n
>  	help
>  	  Choose Y or M here to include support for mounting the


-- 
~Randy
*** Remember to use Documentation/SubmitChecklist when testing your code ***

