All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 01/18] libceph: add scatterlist messenger data type
@ 2015-07-29  9:23 mchristi
  2015-07-29  9:23 ` [PATCH 02/18] rbd: add support for scatterlist obj_request_type mchristi
                   ` (19 more replies)
  0 siblings, 20 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

LIO uses scatterlist for its page/data management. This patch
adds a scatterlist messenger data type, so LIO can pass its sg
down directly to rbd.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/messenger.h  | 13 ++++++
 include/linux/ceph/osd_client.h | 12 +++++-
 net/ceph/messenger.c            | 96 +++++++++++++++++++++++++++++++++++++++++
 net/ceph/osd_client.c           | 26 +++++++++++
 4 files changed, 146 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 3775327..bc1bde8 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -79,6 +79,7 @@ enum ceph_msg_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
+	CEPH_MSG_DATA_SG,	/* data source/destination is a scatterlist */
 };
 
 static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -90,6 +91,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
 		return true;
 	default:
 		return false;
@@ -112,6 +114,11 @@ struct ceph_msg_data {
 			unsigned int	alignment;	/* first page */
 		};
 		struct ceph_pagelist	*pagelist;
+		struct {
+			struct scatterlist *sgl;
+			unsigned int	sgl_init_offset;
+			u64		sgl_length;
+		};
 	};
 };
 
@@ -139,6 +146,10 @@ struct ceph_msg_data_cursor {
 			struct page	*page;		/* page from list */
 			size_t		offset;		/* bytes from list */
 		};
+		struct {
+			struct scatterlist	*sg;		/* curr sg */
+			unsigned int		sg_consumed;
+		};
 	};
 };
 
@@ -294,6 +305,8 @@ extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 				size_t length);
 #endif /* CONFIG_BLOCK */
+extern void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+				 unsigned int sgl_init_offset, u64 length);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 0890167..2152f06 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -52,6 +52,7 @@ enum ceph_osd_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
+	CEPH_OSD_DATA_TYPE_SG,
 };
 
 struct ceph_osd_data {
@@ -70,6 +71,11 @@ struct ceph_osd_data {
 			struct bio	*bio;		/* list of bios */
 			size_t		bio_length;	/* total in list */
 		};
+		struct {
+			struct scatterlist *sgl;
+			size_t		sgl_length;
+			unsigned int	sgl_init_offset;
+		};
 #endif /* CONFIG_BLOCK */
 	};
 };
@@ -313,7 +319,11 @@ extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
 					unsigned int which,
 					struct bio *bio, size_t bio_length);
 #endif /* CONFIG_BLOCK */
-
+extern void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *,
+					unsigned int which,
+					struct scatterlist *sgl,
+					unsigned int init_sg_offset,
+					u64 length);
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e3be1d2..08d39fb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -893,6 +893,75 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 #endif /* CONFIG_BLOCK */
 
 /*
+ * For a sg data item, a piece is whatever remains of the
+ * current sg entry, or the beginning of the next sg entry
+ * in the list.
+ */
+static void ceph_msg_data_sg_cursor_init(struct ceph_msg_data_cursor *cursor,
+					 size_t length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+	sg = data->sgl;
+	BUG_ON(!sg);
+
+	cursor->resid = min_t(u64, length, data->sgl_length);
+	cursor->sg = sg;
+	cursor->sg_consumed = data->sgl_init_offset;
+	cursor->last_piece = cursor->resid <= sg->length;
+}
+
+static struct page *ceph_msg_data_sg_next(struct ceph_msg_data_cursor *cursor,
+					  size_t *page_offset, size_t *length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+	sg = cursor->sg;
+	BUG_ON(!sg);
+
+	*page_offset = sg->offset + cursor->sg_consumed;
+
+	if (cursor->last_piece)
+		*length = cursor->resid;
+	else
+		*length = sg->length - cursor->sg_consumed;
+
+	/* we currently only support non-clustered sg pages */
+	return sg_page(sg);
+}
+
+static bool ceph_msg_data_sg_advance(struct ceph_msg_data_cursor *cursor,
+				     size_t bytes)
+{
+	BUG_ON(cursor->data->type != CEPH_MSG_DATA_SG);
+
+	/* Advance the cursor offset */
+	BUG_ON(cursor->resid < bytes);
+	cursor->resid -= bytes;
+	cursor->sg_consumed += bytes;
+
+	if (!bytes || cursor->sg_consumed < cursor->sg->length)
+		return false;	/* more bytes to process in the current page */
+
+	if (!cursor->resid)
+		return false;	/* no more data */
+
+	/* For WRITE_SAME we have a single sg that is written over and over */
+	if (sg_next(cursor->sg))
+		cursor->sg = sg_next(cursor->sg);
+	cursor->sg_consumed = 0;
+
+	cursor->last_piece = cursor->resid <= cursor->sg->length;
+	return true;
+}
+
+/*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
  */
@@ -1075,6 +1144,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 		ceph_msg_data_bio_cursor_init(cursor, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		ceph_msg_data_sg_cursor_init(cursor, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		/* BUG(); */
@@ -1123,6 +1195,9 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
 		page = ceph_msg_data_bio_next(cursor, page_offset, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		page = ceph_msg_data_sg_next(cursor, page_offset, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		page = NULL;
@@ -1159,6 +1234,9 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
 		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		new_piece = ceph_msg_data_sg_advance(cursor, bytes);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		BUG();
@@ -3182,6 +3260,24 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif	/* CONFIG_BLOCK */
 
+void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+			  unsigned int sgl_init_offset, u64 length)
+{
+	struct ceph_msg_data *data;
+
+	BUG_ON(!sgl);
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_SG);
+	BUG_ON(!data);
+	data->sgl = sgl;
+	data->sgl_length = length;
+	data->sgl_init_offset = sgl_init_offset;
+
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_sg);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f8178b7..fd0a52e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -128,6 +128,16 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_osd_data_sg_init(struct ceph_osd_data *osd_data,
+				  struct scatterlist *sgl,
+				  unsigned int init_sg_offset, u64 length)
+{
+	osd_data->type = CEPH_OSD_DATA_TYPE_SG;
+	osd_data->sgl = sgl;
+	osd_data->sgl_length = length;
+	osd_data->sgl_init_offset = init_sg_offset;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)	\
 	({						\
 		BUG_ON(whch >= (oreq)->r_num_ops);	\
@@ -206,6 +216,17 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
 
+void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
+			unsigned int which, struct scatterlist *sgl,
+			unsigned int init_sg_offset, u64 length)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
+
 static void osd_req_op_cls_request_info_pagelist(
 			struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
@@ -317,6 +338,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 	case CEPH_OSD_DATA_TYPE_BIO:
 		return (u64)osd_data->bio_length;
 #endif /* CONFIG_BLOCK */
+	case CEPH_OSD_DATA_TYPE_SG:
+		return osd_data->sgl_length;
 	default:
 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 		return 0;
@@ -727,6 +750,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
 		ceph_msg_data_add_bio(msg, osd_data->bio, length);
 #endif
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_SG) {
+		ceph_msg_data_add_sg(msg, osd_data->sgl,
+				     osd_data->sgl_init_offset, length);
 	} else {
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 	}
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 02/18] rbd: add support for scatterlist obj_request_type
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 03/18] rbd: add lio specific data area mchristi
                   ` (18 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for a scatterlist rbd obj_request_type, so LIO
can pass down its sg to rbd.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 94 insertions(+), 11 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 60257cf..bc0466c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -44,6 +44,7 @@
 #include <linux/slab.h>
 #include <linux/idr.h>
 #include <linux/workqueue.h>
+#include <linux/scatterlist.h>
 
 #include "rbd_types.h"
 
@@ -208,7 +209,7 @@ struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
-	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES, OBJ_REQUEST_SG,
 };
 
 enum obj_operation_type {
@@ -264,6 +265,10 @@ struct rbd_obj_request {
 			struct page	**pages;
 			u32		page_count;
 		};
+		struct {
+			struct scatterlist	*sg;
+			unsigned int		init_sg_offset;
+		};
 	};
 	struct page		**copyup_pages;
 	u32			copyup_page_count;
@@ -295,16 +300,22 @@ struct rbd_img_request {
 		u64			snap_id;	/* for reads */
 		struct ceph_snap_context *snapc;	/* for writes */
 	};
-	union {
-		struct request		*rq;		/* block request */
-		struct rbd_obj_request	*obj_request;	/* obj req initiator */
-	};
+
+	struct request		*rq;		/* block request */
+	struct rbd_obj_request	*obj_request;	/* obj req initiator */
+
 	struct page		**copyup_pages;
 	u32			copyup_page_count;
 	spinlock_t		completion_lock;/* protects next_completion */
 	u32			next_completion;
 	rbd_img_callback_t	callback;
+	/*
+	 * xferred is the bytes that have successfully been transferred.
+	 * completed is the bytes that have been accounted for and includes
+	 * both failed and successfully transferred bytes.
+	 */
 	u64			xferred;/* aggregate bytes transferred */
+	u64			completed;
 	int			result;	/* first nonzero obj_request result */
 
 	u32			obj_request_count;
@@ -1273,6 +1284,34 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 	}
 }
 
+static void zero_sg(struct scatterlist *sgl, u64 start, u64 length)
+{
+	struct scatterlist *sg = sgl;
+	u64 end = start + length;
+	u64 pos = 0;
+
+	while (pos < end && sg) {
+		if (pos + sg->length > start) {
+			int sg_offset = max_t(int, start - pos, 0);
+			unsigned int length = min_t(unsigned int,
+						    sg->length - sg_offset,
+						    end - pos);
+			void *kaddr;
+			unsigned long flags;
+
+			local_irq_save(flags);
+			kaddr = kmap_atomic(sg_page(sg));
+			memset(kaddr + sg_offset + sg->offset, 0, length);
+			flush_dcache_page(sg_page(sg));
+			kunmap_atomic(kaddr);
+			local_irq_restore(flags);
+		}
+
+		pos += sg->length;
+		sg = sg_next(sg);
+	}
+}
+
 /*
  * similar to zero_bio_chain(), zeros data defined by a page array,
  * starting at the given byte offset from the start of the array and
@@ -1547,6 +1586,7 @@ static bool obj_request_type_valid(enum obj_request_type type)
 	case OBJ_REQUEST_NODATA:
 	case OBJ_REQUEST_BIO:
 	case OBJ_REQUEST_PAGES:
+	case OBJ_REQUEST_SG:
 		return true;
 	default:
 		return false;
@@ -1730,14 +1770,18 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 	if (obj_request->result == -ENOENT) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
 			zero_bio_chain(obj_request->bio_list, 0);
-		else
+		else if (obj_request->type == OBJ_REQUEST_PAGES)
 			zero_pages(obj_request->pages, 0, length);
+		else if (obj_request->type == OBJ_REQUEST_SG)
+			zero_sg(obj_request->sg, 0, length);
 		obj_request->result = 0;
 	} else if (xferred < length && !obj_request->result) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
 			zero_bio_chain(obj_request->bio_list, xferred);
-		else
+		else if (obj_request->type == OBJ_REQUEST_PAGES)
 			zero_pages(obj_request->pages, xferred, length);
+		else if (obj_request->type == OBJ_REQUEST_SG)
+			zero_sg(obj_request->sg, xferred, length);
 	}
 	obj_request->xferred = length;
 	obj_request_done_set(obj_request);
@@ -2067,6 +2111,7 @@ static void rbd_obj_request_destroy(struct kref *kref)
 	rbd_assert(obj_request_type_valid(obj_request->type));
 	switch (obj_request->type) {
 	case OBJ_REQUEST_NODATA:
+	case OBJ_REQUEST_SG:
 		break;		/* Nothing to do */
 	case OBJ_REQUEST_BIO:
 		if (obj_request->bio_list)
@@ -2168,6 +2213,7 @@ static struct rbd_img_request *rbd_img_request_create(
 	img_request->offset = offset;
 	img_request->length = length;
 	img_request->flags = 0;
+	img_request->completed = 0;
 	if (op_type == OBJ_OP_DISCARD) {
 		img_request_discard_set(img_request);
 		img_request->snapc = snapc;
@@ -2293,6 +2339,7 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		 */
 		xferred = obj_request->length;
 	}
+	img_request->completed += xferred;
 
 	/* Image object requests don't own their page array */
 
@@ -2304,12 +2351,15 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 	if (img_request_child_test(img_request)) {
 		rbd_assert(img_request->obj_request != NULL);
 		more = obj_request->which < img_request->obj_request_count - 1;
-	} else {
-		rbd_assert(img_request->rq != NULL);
-
+	} else if (img_request->rq) {
 		more = blk_update_request(img_request->rq, result, xferred);
 		if (!more)
 			__blk_mq_end_request(img_request->rq, result);
+	} else {
+		if (img_request->completed < img_request->length)
+			more = true;
+		else
+			more = false;
 	}
 
 	return more;
@@ -2411,6 +2461,10 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
 					obj_request->pages, length,
 					offset & ~PAGE_MASK, false, false);
+	else if (obj_request->type == OBJ_REQUEST_SG)
+		osd_req_op_extent_osd_data_sg(osd_request, num_ops,
+					obj_request->sg,
+					obj_request->init_sg_offset, length);
 
 	/* Discards are also writes */
 	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
@@ -2436,7 +2490,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	struct rbd_obj_request *next_obj_request;
 	struct bio *bio_list = NULL;
 	unsigned int bio_offset = 0;
+	unsigned int sg_offset = 0;
 	struct page **pages = NULL;
+	struct scatterlist *sgl = NULL;
 	enum obj_operation_type op_type;
 	u64 img_offset;
 	u64 resid;
@@ -2455,6 +2511,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
 	} else if (type == OBJ_REQUEST_PAGES) {
 		pages = data_desc;
+	} else if (type == OBJ_REQUEST_SG) {
+		sgl = data_desc;
 	}
 
 	while (resid) {
@@ -2502,6 +2560,27 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 			if ((offset + length) & ~PAGE_MASK)
 				page_count--;	/* more on last page */
 			pages += page_count;
+		} else if (type == OBJ_REQUEST_SG) {
+			u64 sg_length = 0;
+
+			obj_request->init_sg_offset = sg_offset;
+			obj_request->sg = sgl;
+			do {
+				sg_length += (sgl->length - sg_offset);
+				sg_offset = 0;
+				if (sg_length > length) {
+					sg_offset = sgl->length -
+							(sg_length - length);
+					break;
+				}
+				/*
+				 * For WRITE_SAME we have a single sg that
+				 * is written possibly multiple times over
+				 * img_request->length bytes.
+				 */
+				if (sg_next(sgl))
+					sgl = sg_next(sgl);
+			} while (true);
 		}
 
 		osd_req = rbd_osd_req_create(rbd_dev, op_type,
@@ -3058,9 +3137,13 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 						obj_request->bio_list);
-	else
+	else if (obj_request->type == OBJ_REQUEST_PAGES)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
 						obj_request->pages);
+	else
+		result = rbd_img_request_fill(img_request, OBJ_REQUEST_SG,
+						obj_request->sg);
+
 	if (result)
 		goto out_err;
 
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 03/18] rbd: add lio specific data area
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
  2015-07-29  9:23 ` [PATCH 02/18] rbd: add support for scatterlist obj_request_type mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 04/18] libceph: support bidirectional requests mchristi
                   ` (17 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The LIO RBD backend is going to make img_request calls, so this patch
adds a pointer so it can store its cmd for completions.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bc0466c..1df7bdd 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -303,6 +303,7 @@ struct rbd_img_request {
 
 	struct request		*rq;		/* block request */
 	struct rbd_obj_request	*obj_request;	/* obj req initiator */
+	void			*lio_cmd_data;	/* lio specific data */
 
 	struct page		**copyup_pages;
 	u32			copyup_page_count;
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 04/18] libceph: support bidirectional requests
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
  2015-07-29  9:23 ` [PATCH 02/18] rbd: add support for scatterlist obj_request_type mchristi
  2015-07-29  9:23 ` [PATCH 03/18] rbd: add lio specific data area mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-11-21 23:32   ` Goldwyn Rodrigues
  2015-07-29  9:23 ` [PATCH 05/18] libceph: add support for CMPEXT compare extent requests mchristi
                   ` (16 subsequent siblings)
  19 siblings, 1 reply; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The next patch will add support for SCSI's compare and write
command. This command sends N bytes, compares them to N bytes on disk,
then returns success or the offset in the buffer where a miscompare
occurred. For Ceph support, I implemented this as a multi-op request:

1. a new CMPEXT (compare extent) operation that compares N bytes
and, if a miscompare occurred, returns the offset at which it
miscompared and also returns the buffer.
2. a write request. If the CMPEXT succeeds then this will be executed.

This patch modifies libceph so it can support both a request buffer
and response buffer for extent based IO, so the CMPEXT command can
send its comparison buffer and also receive the failed buffer if needed.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 fs/ceph/addr.c                  |   4 +-
 include/linux/ceph/osd_client.h |   3 +-
 net/ceph/osd_client.c           | 109 +++++++++++++++++++++++++++++++---------
 3 files changed, 89 insertions(+), 27 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 890c509..0360b44 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -269,7 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
 	/* unlock all pages, zeroing any data we didn't read */
-	osd_data = osd_req_op_extent_osd_data(req, 0);
+	osd_data = osd_req_op_extent_osd_response_data(req, 0);
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 	num_pages = calc_pages_for((u64)osd_data->alignment,
 					(u64)osd_data->length);
@@ -618,7 +618,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);
 
-	osd_data = osd_req_op_extent_osd_data(req, 0);
+	osd_data = osd_req_op_extent_osd_request_data(req, 0);
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 	num_pages = calc_pages_for((u64)osd_data->alignment,
 					(u64)osd_data->length);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2152f06..e737173 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -90,7 +90,8 @@ struct ceph_osd_req_op {
 			u64 offset, length;
 			u64 truncate_size;
 			u32 truncate_seq;
-			struct ceph_osd_data osd_data;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
 		} extent;
 		struct {
 			u32 name_len;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index fd0a52e..3bf0849 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -153,12 +153,20 @@ osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
 }
 
 struct ceph_osd_data *
-osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
-			unsigned int which)
+osd_req_op_extent_osd_request_data(struct ceph_osd_request *osd_req,
+				   unsigned int which)
 {
-	return osd_req_op_data(osd_req, which, extent, osd_data);
+	return osd_req_op_data(osd_req, which, extent, request_data);
 }
-EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+EXPORT_SYMBOL(osd_req_op_extent_osd_request_data);
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_response_data(struct ceph_osd_request *osd_req,
+				    unsigned int which)
+{
+	return osd_req_op_data(osd_req, which, extent, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_response_data);
 
 struct ceph_osd_data *
 osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
@@ -186,21 +194,46 @@ void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
 			u64 length, u32 alignment,
 			bool pages_from_pool, bool own_pages)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
-				pages_from_pool, own_pages);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_pages_init(&op->extent.response_data, pages,
+					 length, alignment, pages_from_pool,
+					 own_pages);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_pages_init(&op->extent.request_data, pages,
+					  length, alignment, pages_from_pool,
+					  own_pages);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
 
 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_pagelist_init(osd_data, pagelist);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_pagelist_init(&op->extent.response_data,
+					    pagelist);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_pagelist_init(&op->extent.request_data,
+					    pagelist);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 
@@ -208,10 +241,22 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 			unsigned int which, struct bio *bio, size_t bio_length)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_bio_init(osd_data, bio, bio_length);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_bio_init(&op->extent.response_data, bio,
+				       bio_length);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_bio_init(&op->extent.request_data, bio,
+				       bio_length);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
@@ -220,10 +265,22 @@ void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
 			unsigned int which, struct scatterlist *sgl,
 			unsigned int init_sg_offset, u64 length)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_sg_init(&op->extent.response_data,
+				      sgl, init_sg_offset, length);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_sg_init(&op->extent.request_data,
+				      sgl, init_sg_offset, length);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
 
@@ -368,8 +425,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 
 	switch (op->op) {
 	case CEPH_OSD_OP_READ:
+		ceph_osd_data_release(&op->extent.response_data);
+		break;
 	case CEPH_OSD_OP_WRITE:
-		ceph_osd_data_release(&op->extent.osd_data);
+		ceph_osd_data_release(&op->extent.request_data);
 		break;
 	case CEPH_OSD_OP_CALL:
 		ceph_osd_data_release(&op->cls.request_info);
@@ -783,19 +842,21 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
-		if (src->op == CEPH_OSD_OP_WRITE)
-			request_data_len = src->extent.length;
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
 		dst->extent.length = cpu_to_le64(src->extent.length);
 		dst->extent.truncate_size =
 			cpu_to_le64(src->extent.truncate_size);
 		dst->extent.truncate_seq =
 			cpu_to_le32(src->extent.truncate_seq);
-		osd_data = &src->extent.osd_data;
-		if (src->op == CEPH_OSD_OP_WRITE)
+		if (src->op == CEPH_OSD_OP_WRITE) {
+			osd_data = &src->extent.request_data;
 			ceph_osdc_msg_data_add(req->r_request, osd_data);
-		else
+
+			request_data_len = src->extent.length;
+		} else {
+			osd_data = &src->extent.response_data;
 			ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		}
 		break;
 	case CEPH_OSD_OP_CALL:
 		dst->cls.class_len = src->cls.class_len;
@@ -3326,7 +3387,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		 * XXX page data.  Probably OK for reads, but this
 		 * XXX ought to be done more generally.
 		 */
-		osd_data = osd_req_op_extent_osd_data(req, 0);
+		osd_data = osd_req_op_extent_osd_response_data(req, 0);
 		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
 			if (osd_data->pages &&
 				unlikely(osd_data->length < data_len)) {
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 05/18] libceph: add support for CMPEXT compare extent requests
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (2 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 04/18] libceph: support bidirectional requests mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 06/18] rbd: add write test helper mchristi
                   ` (15 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for the CMPEXT request. The request will send
extent.length bytes and compare them to extent.length bytes at
extent.offset on disk. If there is a miscompare the osd will return
-EILSEQ, the offset in the buffer where it occurred, and the buffer.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/rados.h |  2 ++
 net/ceph/osd_client.c      | 23 ++++++++++++++++++++---
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index bcaabf3..a07da93 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -212,6 +212,8 @@ extern const char *ceph_osd_state_name(int s);
 	/* sync */							    \
 	f(SYNC_READ,	__CEPH_OSD_OP(RD, DATA, 11),	"sync_read")	    \
 									    \
+	f(CMPEXT,	__CEPH_OSD_OP(RD, DATA, 31),	"cmpext")	    \
+									    \
 	/* write */							    \
 	f(WRITE,	__CEPH_OSD_OP(WR, DATA, 1),	"write")	    \
 	f(WRITEFULL,	__CEPH_OSD_OP(WR, DATA, 2),	"writefull")	    \
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3bf0849..8b44b67 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -200,6 +200,7 @@ void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
+	case CEPH_OSD_OP_CMPEXT:
 		ceph_osd_data_pages_init(&op->extent.response_data, pages,
 					 length, alignment, pages_from_pool,
 					 own_pages);
@@ -275,6 +276,7 @@ void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
 				      sgl, init_sg_offset, length);
 		break;
 	case CEPH_OSD_OP_WRITE:
+	case CEPH_OSD_OP_CMPEXT:
 		ceph_osd_data_sg_init(&op->extent.request_data,
 				      sgl, init_sg_offset, length);
 		break;
@@ -430,6 +432,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_WRITE:
 		ceph_osd_data_release(&op->extent.request_data);
 		break;
+	case CEPH_OSD_OP_CMPEXT:
+		ceph_osd_data_release(&op->extent.response_data);
+		ceph_osd_data_release(&op->extent.request_data);
+		break;
 	case CEPH_OSD_OP_CALL:
 		ceph_osd_data_release(&op->cls.request_info);
 		ceph_osd_data_release(&op->cls.request_data);
@@ -634,13 +640,14 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 	size_t payload_len = 0;
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
-	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
+	       opcode != CEPH_OSD_OP_CMPEXT);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
 	op->extent.truncate_size = truncate_size;
 	op->extent.truncate_seq = truncate_seq;
-	if (opcode == CEPH_OSD_OP_WRITE)
+	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_CMPEXT)
 		payload_len += length;
 
 	op->payload_len = payload_len;
@@ -842,6 +849,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
+	case CEPH_OSD_OP_CMPEXT:
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
 		dst->extent.length = cpu_to_le64(src->extent.length);
 		dst->extent.truncate_size =
@@ -853,6 +861,14 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 			ceph_osdc_msg_data_add(req->r_request, osd_data);
 
 			request_data_len = src->extent.length;
+		} else if (src->op == CEPH_OSD_OP_CMPEXT) {
+			osd_data = &src->extent.request_data;
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+
+			osd_data = &src->extent.response_data;
+			ceph_osdc_msg_data_add(req->r_reply, osd_data);
+
+			request_data_len = src->extent.length;
 		} else {
 			osd_data = &src->extent.response_data;
 			ceph_osdc_msg_data_add(req->r_reply, osd_data);
@@ -1092,7 +1108,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
-	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
+	       opcode != CEPH_OSD_OP_CMPEXT);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 06/18] rbd: add write test helper
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (3 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 05/18] libceph: add support for CMPEXT compare extent requests mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 07/18] rbd: add num ops calculator helper mchristi
                   ` (14 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The next patches add a couple new commands that have write data.
This patch adds a helper to combine all the IMG_REQ write tests.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1df7bdd..8d0b30a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1739,6 +1739,12 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
+static bool img_request_is_write_type_test(struct rbd_img_request *img_request)
+{
+	return img_request_write_test(img_request) ||
+	       img_request_discard_test(img_request);
+}
+
 static enum obj_operation_type
 rbd_img_request_op_type(struct rbd_img_request *img_request)
 {
@@ -2024,8 +2030,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 	rbd_assert(obj_request_img_data_test(obj_request));
 	img_request = obj_request->img_request;
 	rbd_assert(img_request);
-	rbd_assert(img_request_write_test(img_request) ||
-			img_request_discard_test(img_request));
+	rbd_assert(img_request_is_write_type_test(img_request));
 
 	if (img_request_discard_test(img_request))
 		num_osd_ops = 2;
@@ -2259,8 +2264,7 @@ static void rbd_img_request_destroy(struct kref *kref)
 		rbd_dev_parent_put(img_request->rbd_dev);
 	}
 
-	if (img_request_write_test(img_request) ||
-		img_request_discard_test(img_request))
+	if (img_request_is_write_type_test(img_request))
 		ceph_put_snap_context(img_request->snapc);
 
 	kmem_cache_free(rbd_img_request_cache, img_request);
@@ -2977,8 +2981,7 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
 	rbd_dev = img_request->rbd_dev;
 
 	/* Reads */
-	if (!img_request_write_test(img_request) &&
-	    !img_request_discard_test(img_request))
+	if (!img_request_is_write_type_test(img_request))
 		return true;
 
 	/* Non-layered writes */
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 07/18] rbd: add num ops calculator helper
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (4 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 06/18] rbd: add write test helper mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 08/18] rbd: add support for COMPARE_AND_WRITE/CMPEXT mchristi
                   ` (13 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The next patches add new commands that have different
number of ops, so this adds a helper.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8d0b30a..425c3d8 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -813,6 +813,16 @@ static int parse_rbd_opts_token(char *c, void *private)
 	return 0;
 }
 
+static int obj_num_ops(enum obj_operation_type op_type)
+{
+	switch (op_type) {
+	case OBJ_OP_WRITE:
+		return 2;
+	default:
+		return 1;
+	}
+}
+
 static char* obj_op_name(enum obj_operation_type op_type)
 {
 	switch (op_type) {
@@ -1987,7 +1997,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
 		snapc = img_request->snapc;
 	}
 
-	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
+	rbd_assert(num_ops == 1 || obj_num_ops(op_type) == num_ops);
 
 	/* Allocate and initialize the request, for the num_ops ops */
 
@@ -2589,8 +2599,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 		}
 
 		osd_req = rbd_osd_req_create(rbd_dev, op_type,
-					(op_type == OBJ_OP_WRITE) ? 2 : 1,
-					obj_request);
+					     obj_num_ops(op_type), obj_request);
 		if (!osd_req)
 			goto out_unwind;
 
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 08/18] rbd: add support for COMPARE_AND_WRITE/CMPEXT
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (5 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 07/18] rbd: add num ops calculator helper mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 09/18] libceph: add support for write same requests mchristi
                   ` (12 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support to rbd for SCSI COMPARE_AND_WRITE commands. Higher
levels like LIO will work with IMG_REQ_CMP_AND_WRITE requests, but
rbd breaks them up into CMPEXT and WRITE Ceph requests.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 182 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 162 insertions(+), 20 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 425c3d8..b6d7f33 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -216,6 +216,7 @@ enum obj_operation_type {
 	OBJ_OP_WRITE,
 	OBJ_OP_READ,
 	OBJ_OP_DISCARD,
+	OBJ_OP_CMP_AND_WRITE,
 };
 
 enum obj_req_flags {
@@ -289,6 +290,7 @@ enum img_req_flags {
 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
 	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
+	IMG_REQ_CMP_AND_WRITE,	/* normal = 0, compare and write request = 1 */
 };
 
 struct rbd_img_request {
@@ -296,10 +298,9 @@ struct rbd_img_request {
 	u64			offset;	/* starting image byte offset */
 	u64			length;	/* byte count from offset */
 	unsigned long		flags;
-	union {
-		u64			snap_id;	/* for reads */
-		struct ceph_snap_context *snapc;	/* for writes */
-	};
+
+	u64			snap_id;	/* for reads */
+	struct ceph_snap_context *snapc;	/* for writes */
 
 	struct request		*rq;		/* block request */
 	struct rbd_obj_request	*obj_request;	/* obj req initiator */
@@ -818,6 +819,8 @@ static int obj_num_ops(enum obj_operation_type op_type)
 	switch (op_type) {
 	case OBJ_OP_WRITE:
 		return 2;
+	case OBJ_OP_CMP_AND_WRITE:
+		return 3;
 	default:
 		return 1;
 	}
@@ -832,6 +835,8 @@ static char* obj_op_name(enum obj_operation_type op_type)
 		return "write";
 	case OBJ_OP_DISCARD:
 		return "discard";
+	case OBJ_OP_CMP_AND_WRITE:
+		return "compare-and-write";
 	default:
 		return "???";
 	}
@@ -1749,10 +1754,23 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
+static void img_request_cmp_and_write_set(struct rbd_img_request *img_request)
+{
+	set_bit(IMG_REQ_CMP_AND_WRITE, &img_request->flags);
+	smp_mb();
+}
+
+static bool img_request_cmp_and_write_test(struct rbd_img_request *img_request)
+{
+	smp_mb();
+	return test_bit(IMG_REQ_CMP_AND_WRITE, &img_request->flags) != 0;
+}
+
 static bool img_request_is_write_type_test(struct rbd_img_request *img_request)
 {
 	return img_request_write_test(img_request) ||
-	       img_request_discard_test(img_request);
+	       img_request_discard_test(img_request) ||
+	       img_request_cmp_and_write_test(img_request);
 }
 
 static enum obj_operation_type
@@ -1762,6 +1780,8 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
 		return OBJ_OP_WRITE;
 	else if (img_request_discard_test(img_request))
 		return OBJ_OP_DISCARD;
+	else if (img_request_cmp_and_write_test(img_request))
+		return OBJ_OP_CMP_AND_WRITE;
 	else
 		return OBJ_OP_READ;
 }
@@ -1856,6 +1876,23 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 	obj_request_done_set(obj_request);
 }
 
+static void rbd_osd_cmpext_callback(struct rbd_obj_request *obj_request,
+				    struct ceph_osd_request *osd_req)
+{
+	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+		obj_request->result, obj_request->length);
+
+	if (obj_request->result == -EILSEQ)
+		/*
+		 * on mismatch reply buf will contain offset and mismatched
+		 * data
+		 */
+		obj_request->xferred = osd_req->r_reply_op_len[1];
+	else
+		obj_request->xferred = obj_request->length;
+	obj_request_done_set(obj_request);
+}
+
 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
 {
 	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
@@ -1915,11 +1952,19 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_osd_read_callback(obj_request);
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
-		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
-		/* fall through */
+		if (osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE)
+			rbd_osd_write_callback(obj_request);
+		else if (osd_req->r_ops[1].op == CEPH_OSD_OP_CMPEXT)
+			rbd_osd_cmpext_callback(obj_request, osd_req);
+		else
+			rbd_assert(0);
+		break;
 	case CEPH_OSD_OP_WRITE:
 		rbd_osd_write_callback(obj_request);
 		break;
+	case CEPH_OSD_OP_CMPEXT:
+		rbd_osd_cmpext_callback(obj_request, osd_req);
+		break;
 	case CEPH_OSD_OP_STAT:
 		rbd_osd_stat_callback(obj_request);
 		break;
@@ -1943,6 +1988,22 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_obj_request_complete(obj_request);
 }
 
+static void rbd_osd_req_format_rw(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request = obj_request->img_request;
+	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct ceph_snap_context *snapc;
+	struct timespec mtime = CURRENT_TIME;
+	u64 snap_id;
+
+	rbd_assert(osd_req != NULL);
+
+	snapc = img_request ? img_request->snapc : NULL;
+	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
+	ceph_osdc_build_request(osd_req, obj_request->offset,
+				snapc, snap_id, &mtime);
+}
+
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request = obj_request->img_request;
@@ -1975,6 +2036,7 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
  * A write request has either one (watch) or two (hint+write) osd ops.
  * (All rbd data writes are prefixed with an allocation hint op, but
  * technically osd watch is a write request, hence this distinction.)
+ * An extent cmp has three ops (hint+cmp+write).
  */
 static struct ceph_osd_request *rbd_osd_req_create(
 					struct rbd_device *rbd_dev,
@@ -1987,12 +2049,15 @@ static struct ceph_osd_request *rbd_osd_req_create(
 	struct ceph_osd_request *osd_req;
 
 	if (obj_request_img_data_test(obj_request) &&
-		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
+		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE ||
+		 op_type == OBJ_OP_CMP_AND_WRITE)) {
 		struct rbd_img_request *img_request = obj_request->img_request;
 		if (op_type == OBJ_OP_WRITE) {
 			rbd_assert(img_request_write_test(img_request));
-		} else {
+		} else if (op_type == OBJ_OP_DISCARD) {
 			rbd_assert(img_request_discard_test(img_request));
+		} else if (op_type == OBJ_OP_CMP_AND_WRITE) {
+			rbd_assert(img_request_cmp_and_write_test(img_request));
 		}
 		snapc = img_request->snapc;
 	}
@@ -2007,7 +2072,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
 	if (!osd_req)
 		return NULL;	/* ENOMEM */
 
-	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
+	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD ||
+	    op_type == OBJ_OP_CMP_AND_WRITE)
 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
 	else
 		osd_req->r_flags = CEPH_OSD_FLAG_READ;
@@ -2236,6 +2302,10 @@ static struct rbd_img_request *rbd_img_request_create(
 	} else if (op_type == OBJ_OP_WRITE) {
 		img_request_write_set(img_request);
 		img_request->snapc = snapc;
+	} else if (op_type == OBJ_OP_CMP_AND_WRITE) {
+		img_request_cmp_and_write_set(img_request);
+		img_request->snapc = snapc;
+		img_request->snap_id = rbd_dev->spec->snap_id;
 	} else {
 		img_request->snap_id = rbd_dev->spec->snap_id;
 	}
@@ -2332,18 +2402,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 	result = obj_request->result;
 	if (result) {
 		struct rbd_device *rbd_dev = img_request->rbd_dev;
-		enum obj_operation_type op_type;
-
-		if (img_request_discard_test(img_request))
-			op_type = OBJ_OP_DISCARD;
-		else if (img_request_write_test(img_request))
-			op_type = OBJ_OP_WRITE;
-		else
-			op_type = OBJ_OP_READ;
 
 		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
-			obj_op_name(op_type), obj_request->length,
-			obj_request->img_offset, obj_request->offset);
+			obj_op_name(rbd_img_request_op_type(img_request)),
+			obj_request->length, obj_request->img_offset,
+			obj_request->offset);
 		rbd_warn(rbd_dev, "  result %d xferred %x",
 			result, xferred);
 		if (!img_request->result)
@@ -2624,6 +2687,85 @@ out_unwind:
 	return -ENOMEM;
 }
 
+int rbd_img_cmp_and_write_request_fill(struct rbd_img_request *img_request,
+				       struct scatterlist *cmp_sgl,
+				       u64 cmp_length,
+				       struct scatterlist *write_sgl,
+				       u64 write_length,
+				       struct page **response_pages,
+				       u64 response_length)
+{
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_request *osd_req;
+	const char *object_name;
+	int num_ops = 0;
+	u64 img_offset;
+	u64 offset;
+
+	img_offset = img_request->offset;
+	offset = rbd_segment_offset(rbd_dev, img_offset);
+
+	/*
+	 * LIO currently only supports 1 sector reqs and we assume the req
+	 * will not span segments.
+	 */
+	if (rbd_segment_length(rbd_dev, offset, cmp_length) != cmp_length)
+		return -EOPNOTSUPP;
+
+	object_name = rbd_segment_name(rbd_dev, img_offset);
+	if (!object_name)
+		return -EINVAL;
+
+	obj_request = rbd_obj_request_create(object_name, offset,
+					     cmp_length, OBJ_REQUEST_SG);
+	/* object request has its own copy of the object name */
+	rbd_segment_name_free(object_name);
+	if (!obj_request)
+		return -ENOMEM;
+
+	rbd_img_obj_request_add(img_request, obj_request);
+
+	obj_request->pages = response_pages;
+	obj_request->page_count = calc_pages_for(0, response_length);
+
+	osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_CMP_AND_WRITE, 3,
+				     obj_request);
+	if (!osd_req)
+		goto del_obj_req;
+
+	obj_request->osd_req = osd_req;
+	obj_request->callback = rbd_img_obj_callback;
+	obj_request->img_offset = img_offset;
+
+	osd_req_op_alloc_hint_init(osd_req, num_ops, object_size, object_size);
+
+	num_ops++;
+	osd_req_op_extent_init(osd_req, num_ops, CEPH_OSD_OP_CMPEXT, offset,
+			       cmp_length, 0, 0);
+	osd_req_op_extent_osd_data_sg(osd_req, num_ops, cmp_sgl, 0, cmp_length);
+	osd_req_op_extent_osd_data_pages(osd_req, num_ops, obj_request->pages,
+					 response_length, 0,
+					 obj_request->page_count, false);
+
+	num_ops++;
+	osd_req_op_extent_init(osd_req, num_ops, CEPH_OSD_OP_WRITE, offset,
+			       write_length, 0, 0);
+	osd_req_op_extent_osd_data_sg(osd_req, num_ops, write_sgl, 0,
+				      write_length);
+
+	rbd_osd_req_format_rw(obj_request);
+
+	rbd_img_request_get(img_request);
+	return 0;
+
+del_obj_req:
+	rbd_img_obj_request_del(img_request, obj_request);
+	return -ENOMEM;
+}
+EXPORT_SYMBOL(rbd_img_cmp_and_write_request_fill);
+
 static void
 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 09/18] libceph: add support for write same requests
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (6 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 08/18] rbd: add support for COMPARE_AND_WRITE/CMPEXT mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 10/18] rbd: add support for writesame requests mchristi
                   ` (11 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds a new ceph request writesame. Write a buffer of length
writesame.data_length bytes at writesame.offset over writesame.length
bytes.

This command maps to SCSI's WRITE SAME request. In the next patches
rbd and lio will hook in to this support.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/osd_client.h | 15 +++++++++++++++
 include/linux/ceph/rados.h      |  6 ++++++
 net/ceph/osd_client.c           | 40 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e737173..55c1442 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -128,6 +128,12 @@ struct ceph_osd_req_op {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
+		struct {
+			u64 offset;
+			u64 length;
+			u64 data_length;
+			struct ceph_osd_data request_data;
+		} writesame;
 	};
 };
 
@@ -293,6 +299,10 @@ extern void osd_req_op_raw_data_in_pages(struct ceph_osd_request *,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
 
+extern void osd_req_op_writesame_init(struct ceph_osd_request *osd_req,
+					unsigned int which, u16 opcode,
+					u64 offset, u64 length,
+					u64 data_length);
 extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					u64 offset, u64 length,
@@ -325,6 +335,11 @@ extern void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *,
 					struct scatterlist *sgl,
 					unsigned int init_sg_offset,
 					u64 length);
+extern void osd_req_op_writesame_osd_data_sg(struct ceph_osd_request *,
+					unsigned int which,
+					struct scatterlist *sgl,
+					unsigned int init_sg_offset,
+					u64 length);
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index a07da93..6f79f29 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -220,6 +220,7 @@ extern const char *ceph_osd_state_name(int s);
 	f(TRUNCATE,	__CEPH_OSD_OP(WR, DATA, 3),	"truncate")	    \
 	f(ZERO,		__CEPH_OSD_OP(WR, DATA, 4),	"zero")		    \
 	f(DELETE,	__CEPH_OSD_OP(WR, DATA, 5),	"delete")	    \
+	f(WRITESAME,	__CEPH_OSD_OP(WR, DATA, 36),	"write-same")	    \
 									    \
 	/* fancy write */						    \
 	f(APPEND,	__CEPH_OSD_OP(WR, DATA, 6),	"append")	    \
@@ -477,6 +478,11 @@ struct ceph_osd_op {
 			__le64 expected_object_size;
 			__le64 expected_write_size;
 		} __attribute__ ((packed)) alloc_hint;
+		struct {
+			__le64 offset;
+			__le64 length;
+			__le64 data_length;
+		} __attribute__ ((packed)) writesame;
 	};
 	__le32 payload_len;
 } __attribute__ ((packed));
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 8b44b67..10d27d7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -384,6 +384,17 @@ void osd_req_op_list_watchers_response_data_pages(
 }
 EXPORT_SYMBOL(osd_req_op_list_watchers_response_data_pages);
 
+void osd_req_op_writesame_osd_data_sg(struct ceph_osd_request *osd_req,
+			unsigned int which, struct scatterlist *sgl,
+			unsigned int init_sg_offset, u64 length)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, writesame, request_data);
+	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+}
+EXPORT_SYMBOL(osd_req_op_writesame_osd_data_sg);
+
 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 {
 	switch (osd_data->type) {
@@ -432,6 +443,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_WRITE:
 		ceph_osd_data_release(&op->extent.request_data);
 		break;
+	case CEPH_OSD_OP_WRITESAME:
+		ceph_osd_data_release(&op->writesame.request_data);
+		break;
 	case CEPH_OSD_OP_CMPEXT:
 		ceph_osd_data_release(&op->extent.response_data);
 		ceph_osd_data_release(&op->extent.request_data);
@@ -630,6 +644,22 @@ void osd_req_op_init(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_init);
 
+void osd_req_op_writesame_init(struct ceph_osd_request *osd_req,
+			       unsigned int which, u16 opcode,
+			       u64 offset, u64 length, u64 data_length)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode,
+						      0);
+
+	BUG_ON(opcode != CEPH_OSD_OP_WRITESAME);
+
+	op->writesame.offset = offset;
+	op->writesame.length = length;
+	op->writesame.data_length = data_length;
+	op->payload_len = data_length;
+}
+EXPORT_SYMBOL(osd_req_op_writesame_init);
+
 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 				unsigned int which, u16 opcode,
 				u64 offset, u64 length,
@@ -946,6 +976,16 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
 		break;
+	case CEPH_OSD_OP_WRITESAME:
+		osd_data = &src->writesame.request_data;
+		ceph_osdc_msg_data_add(req->r_request, osd_data);
+
+		dst->writesame.offset = cpu_to_le64(src->writesame.offset);
+		dst->writesame.length = cpu_to_le64(src->writesame.length);
+		dst->writesame.data_length =
+				cpu_to_le64(src->writesame.data_length);
+		request_data_len = src->writesame.data_length;
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 			ceph_osd_op_name(src->op));
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 10/18] rbd: add support for writesame requests
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (7 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 09/18] libceph: add support for write same requests mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 11/18] target: add compare and write callback mchristi
                   ` (10 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for ceph writesame requests.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 59 +++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 51 insertions(+), 8 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b6d7f33..931d2d7 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -217,6 +217,7 @@ enum obj_operation_type {
 	OBJ_OP_READ,
 	OBJ_OP_DISCARD,
 	OBJ_OP_CMP_AND_WRITE,
+	OBJ_OP_WRITESAME,
 };
 
 enum obj_req_flags {
@@ -291,6 +292,7 @@ enum img_req_flags {
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
 	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
 	IMG_REQ_CMP_AND_WRITE,	/* normal = 0, compare and write request = 1 */
+	IMG_REQ_WRITESAME,	/* normal = 0, write same = 1 */
 };
 
 struct rbd_img_request {
@@ -818,6 +820,7 @@ static int obj_num_ops(enum obj_operation_type op_type)
 {
 	switch (op_type) {
 	case OBJ_OP_WRITE:
+	case OBJ_OP_WRITESAME:
 		return 2;
 	case OBJ_OP_CMP_AND_WRITE:
 		return 3;
@@ -1703,6 +1706,17 @@ static bool img_request_write_test(struct rbd_img_request *img_request)
 	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
 }
 
+static void img_request_writesame_set(struct rbd_img_request *img_request)
+{
+	set_bit(IMG_REQ_WRITESAME, &img_request->flags);
+	smp_mb();
+}
+
+static bool img_request_writesame_test(struct rbd_img_request *img_request)
+{
+	smp_mb();
+	return test_bit(IMG_REQ_WRITESAME, &img_request->flags) != 0;
+}
 /*
  * Set the discard flag when the img_request is an discard request
  */
@@ -1769,6 +1783,7 @@ static bool img_request_cmp_and_write_test(struct rbd_img_request *img_request)
 static bool img_request_is_write_type_test(struct rbd_img_request *img_request)
 {
 	return img_request_write_test(img_request) ||
+	       img_request_writesame_test(img_request) ||
 	       img_request_discard_test(img_request) ||
 	       img_request_cmp_and_write_test(img_request);
 }
@@ -1782,6 +1797,8 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
 		return OBJ_OP_DISCARD;
 	else if (img_request_cmp_and_write_test(img_request))
 		return OBJ_OP_CMP_AND_WRITE;
+	else if (img_request_writesame_test(img_request))
+		return OBJ_OP_WRITESAME;
 	else
 		return OBJ_OP_READ;
 }
@@ -1952,7 +1969,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_osd_read_callback(obj_request);
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
-		if (osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE)
+		if (osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
+		    osd_req->r_ops[1].op == CEPH_OSD_OP_WRITESAME)
 			rbd_osd_write_callback(obj_request);
 		else if (osd_req->r_ops[1].op == CEPH_OSD_OP_CMPEXT)
 			rbd_osd_cmpext_callback(obj_request, osd_req);
@@ -1960,6 +1978,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 			rbd_assert(0);
 		break;
 	case CEPH_OSD_OP_WRITE:
+	case CEPH_OSD_OP_WRITESAME:
 		rbd_osd_write_callback(obj_request);
 		break;
 	case CEPH_OSD_OP_CMPEXT:
@@ -2050,7 +2069,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
 
 	if (obj_request_img_data_test(obj_request) &&
 		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE ||
-		 op_type == OBJ_OP_CMP_AND_WRITE)) {
+		 op_type == OBJ_OP_CMP_AND_WRITE ||
+		 op_type == OBJ_OP_WRITESAME)) {
 		struct rbd_img_request *img_request = obj_request->img_request;
 		if (op_type == OBJ_OP_WRITE) {
 			rbd_assert(img_request_write_test(img_request));
@@ -2058,6 +2078,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
 			rbd_assert(img_request_discard_test(img_request));
 		} else if (op_type == OBJ_OP_CMP_AND_WRITE) {
 			rbd_assert(img_request_cmp_and_write_test(img_request));
+		} else if (op_type == OBJ_OP_WRITESAME) {
+			rbd_assert(img_request_writesame_test(img_request));
 		}
 		snapc = img_request->snapc;
 	}
@@ -2073,7 +2095,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
 		return NULL;	/* ENOMEM */
 
 	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD ||
-	    op_type == OBJ_OP_CMP_AND_WRITE)
+	    op_type == OBJ_OP_CMP_AND_WRITE || op_type == OBJ_OP_WRITESAME)
 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
 	else
 		osd_req->r_flags = CEPH_OSD_FLAG_READ;
@@ -2302,6 +2324,9 @@ static struct rbd_img_request *rbd_img_request_create(
 	} else if (op_type == OBJ_OP_WRITE) {
 		img_request_write_set(img_request);
 		img_request->snapc = snapc;
+	} else if (op_type == OBJ_OP_WRITESAME) {
+		img_request_writesame_set(img_request);
+		img_request->snapc = snapc;
 	} else if (op_type == OBJ_OP_CMP_AND_WRITE) {
 		img_request_cmp_and_write_set(img_request);
 		img_request->snapc = snapc;
@@ -2522,12 +2547,21 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 		osd_req_op_alloc_hint_init(osd_request, num_ops,
 					object_size, object_size);
 		num_ops++;
+	} else if (op_type == OBJ_OP_WRITESAME) {
+		opcode = CEPH_OSD_OP_WRITESAME;
+		osd_req_op_alloc_hint_init(osd_request, num_ops,
+					   object_size, object_size);
+		num_ops++;
 	} else {
 		opcode = CEPH_OSD_OP_READ;
 	}
 
 	if (opcode == CEPH_OSD_OP_DELETE)
 		osd_req_op_init(osd_request, num_ops, opcode, 0);
+	else if (opcode == CEPH_OSD_OP_WRITESAME)
+		osd_req_op_writesame_init(osd_request, num_ops, opcode,
+					  offset, length, min_t(u64, length,
+					  obj_request->sg->length));
 	else
 		osd_req_op_extent_init(osd_request, num_ops, opcode,
 				       offset, length, 0, 0);
@@ -2539,13 +2573,22 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
 					obj_request->pages, length,
 					offset & ~PAGE_MASK, false, false);
-	else if (obj_request->type == OBJ_REQUEST_SG)
-		osd_req_op_extent_osd_data_sg(osd_request, num_ops,
-					obj_request->sg,
-					obj_request->init_sg_offset, length);
+	else if (obj_request->type == OBJ_REQUEST_SG) {
+		if (op_type == OBJ_OP_WRITESAME)
+			osd_req_op_writesame_osd_data_sg(osd_request, num_ops,
+						obj_request->sg,
+						obj_request->init_sg_offset,
+						min_t(u64, length,
+						obj_request->sg->length));
+		else
+			osd_req_op_extent_osd_data_sg(osd_request, num_ops,
+						obj_request->sg,
+						obj_request->init_sg_offset,
+						length);
+	}
 
 	/* Discards are also writes */
-	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
+	if (img_request_is_write_type_test(img_request))
 		rbd_osd_req_format_write(obj_request);
 	else
 		rbd_osd_req_format_read(obj_request);
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 11/18] target: add compare and write callback
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (8 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 10/18] rbd: add support for writesame requests mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 12/18] target: compare and write backend driver sense handling mchristi
                   ` (9 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

Add a sbc ops callout for compare and write commands, so backends
like rbd which support that command can execute it natively.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/target/target_core_sbc.c     | 9 +++++++--
 include/target/target_core_backend.h | 1 +
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/drivers/target/target_core_sbc.c b/drivers/target/target_core_sbc.c
index e318ddb..9a001e6 100644
--- a/drivers/target/target_core_sbc.c
+++ b/drivers/target/target_core_sbc.c
@@ -968,8 +968,13 @@ sbc_parse_cdb(struct se_cmd *cmd, struct sbc_ops *ops)
 		cmd->t_task_lba = get_unaligned_be64(&cdb[2]);
 		cmd->t_task_nolb = sectors;
 		cmd->se_cmd_flags |= SCF_SCSI_DATA_CDB | SCF_COMPARE_AND_WRITE;
-		cmd->execute_cmd = sbc_compare_and_write;
-		cmd->transport_complete_callback = compare_and_write_callback;
+		if (ops->execute_compare_and_write) {
+			cmd->execute_cmd = ops->execute_compare_and_write;
+		} else {
+			cmd->execute_cmd = sbc_compare_and_write;
+			cmd->transport_complete_callback =
+						compare_and_write_callback;
+		}
 		break;
 	case READ_CAPACITY:
 		size = READ_CAP_LEN;
diff --git a/include/target/target_core_backend.h b/include/target/target_core_backend.h
index 1e5c8f9..ec5a09f 100644
--- a/include/target/target_core_backend.h
+++ b/include/target/target_core_backend.h
@@ -51,6 +51,7 @@ struct sbc_ops {
 	sense_reason_t (*execute_write_same)(struct se_cmd *cmd);
 	sense_reason_t (*execute_unmap)(struct se_cmd *cmd,
 				sector_t lba, sector_t nolb);
+	sense_reason_t (*execute_compare_and_write)(struct se_cmd *cmd);
 };
 
 int	transport_backend_register(const struct target_backend_ops *);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 12/18] target: compare and write backend driver sense handling
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (9 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 11/18] target: add compare and write callback mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-09-04 19:41   ` Mike Christie
  2015-09-06  7:12   ` Christoph Hellwig
  2015-07-29  9:23 ` [PATCH 13/18] target: add COMPARE_AND_WRITE sg creation helper mchristi
                   ` (8 subsequent siblings)
  19 siblings, 2 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

Currently, backend drivers seem to only fail IO with
SAM_STAT_CHECK_CONDITION which gets us
TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE.
For compare and write support we will want to be able to fail with
TCM_MISCOMPARE_VERIFY. This patch adds a new helper that allows backend
drivers to fail with specific sense codes.

It also allows the backend driver to set the miscompare offset.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/target/target_core_transport.c | 33 ++++++++++++++++++++++++++++-----
 include/target/target_core_backend.h   |  1 +
 include/target/target_core_base.h      |  5 ++++-
 3 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/drivers/target/target_core_transport.c b/drivers/target/target_core_transport.c
index ce8574b..f9b0527 100644
--- a/drivers/target/target_core_transport.c
+++ b/drivers/target/target_core_transport.c
@@ -639,8 +639,7 @@ static void target_complete_failure_work(struct work_struct *work)
 {
 	struct se_cmd *cmd = container_of(work, struct se_cmd, work);
 
-	transport_generic_request_failure(cmd,
-			TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE);
+	transport_generic_request_failure(cmd, cmd->sense_reason);
 }
 
 /*
@@ -666,14 +665,15 @@ static unsigned char *transport_get_sense_buffer(struct se_cmd *cmd)
 	return cmd->sense_buffer;
 }
 
-void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
+static void __target_complete_cmd(struct se_cmd *cmd, u8 scsi_status,
+				  sense_reason_t sense_reason)
 {
 	struct se_device *dev = cmd->se_dev;
 	int success = scsi_status == GOOD;
 	unsigned long flags;
 
 	cmd->scsi_status = scsi_status;
-
+	cmd->sense_reason = sense_reason;
 
 	spin_lock_irqsave(&cmd->t_state_lock, flags);
 	cmd->transport_state &= ~CMD_T_BUSY;
@@ -716,8 +716,22 @@ void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
 
 	queue_work(target_completion_wq, &cmd->work);
 }
+
+void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
+{
+	__target_complete_cmd(cmd, scsi_status, scsi_status ?
+			     TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE :
+			     TCM_NO_SENSE);
+}
 EXPORT_SYMBOL(target_complete_cmd);
 
+void target_complete_cmd_with_sense(struct se_cmd *cmd,
+				    sense_reason_t sense_reason)
+{
+	__target_complete_cmd(cmd, SAM_STAT_CHECK_CONDITION, sense_reason);
+}
+EXPORT_SYMBOL(target_complete_cmd_with_sense);
+
 void target_complete_cmd_with_length(struct se_cmd *cmd, u8 scsi_status, int length)
 {
 	if (scsi_status == SAM_STAT_GOOD && length < cmd->data_length) {
@@ -1650,6 +1664,7 @@ void transport_generic_request_failure(struct se_cmd *cmd,
 	case TCM_LOGICAL_BLOCK_GUARD_CHECK_FAILED:
 	case TCM_LOGICAL_BLOCK_APP_TAG_CHECK_FAILED:
 	case TCM_LOGICAL_BLOCK_REF_TAG_CHECK_FAILED:
+	case TCM_MISCOMPARE_VERIFY:
 		break;
 	case TCM_OUT_OF_RESOURCES:
 		sense_reason = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
@@ -2633,12 +2648,19 @@ void transport_err_sector_info(unsigned char *buffer, sector_t bad_sector)
 	buffer[SPC_ADD_SENSE_LEN_OFFSET] = 0xc;
 	buffer[SPC_DESC_TYPE_OFFSET] = 0; /* Information */
 	buffer[SPC_ADDITIONAL_DESC_LEN_OFFSET] = 0xa;
-	buffer[SPC_VALIDITY_OFFSET] = 0x80;
+	buffer[SPC_CMD_INFO_VALIDITY_OFFSET] = 0x80;
 
 	/* Descriptor Information: failing sector */
 	put_unaligned_be64(bad_sector, &buffer[12]);
 }
 
+static void transport_err_sense_info(unsigned char *buffer, u32 info)
+{
+	buffer[SPC_INFO_VALIDITY_OFFSET] |= 0x80;
+	/* Sense Information */
+	put_unaligned_be32(info, &buffer[3]);
+}
+
 int
 transport_send_check_condition_and_sense(struct se_cmd *cmd,
 		sense_reason_t reason, int from_transport)
@@ -2831,6 +2853,7 @@ transport_send_check_condition_and_sense(struct se_cmd *cmd,
 		/* MISCOMPARE DURING VERIFY OPERATION */
 		buffer[SPC_ASC_KEY_OFFSET] = 0x1d;
 		buffer[SPC_ASCQ_KEY_OFFSET] = 0x00;
+		transport_err_sense_info(buffer, cmd->sense_info);
 		break;
 	case TCM_LOGICAL_BLOCK_GUARD_CHECK_FAILED:
 		/* CURRENT ERROR */
diff --git a/include/target/target_core_backend.h b/include/target/target_core_backend.h
index ec5a09f..c98f6e6 100644
--- a/include/target/target_core_backend.h
+++ b/include/target/target_core_backend.h
@@ -58,6 +58,7 @@ int	transport_backend_register(const struct target_backend_ops *);
 void	target_backend_unregister(const struct target_backend_ops *);
 
 void	target_complete_cmd(struct se_cmd *, u8);
+void	target_complete_cmd_with_sense(struct se_cmd *, sense_reason_t);
 void	target_complete_cmd_with_length(struct se_cmd *, u8, int);
 
 sense_reason_t	spc_parse_cdb(struct se_cmd *cmd, unsigned int *size);
diff --git a/include/target/target_core_base.h b/include/target/target_core_base.h
index 17ae2d6..b83a8ec 100644
--- a/include/target/target_core_base.h
+++ b/include/target/target_core_base.h
@@ -22,11 +22,12 @@
  */
 #define TRANSPORT_SENSE_BUFFER			96
 /* Used by transport_send_check_condition_and_sense() */
+#define SPC_INFO_VALIDITY_OFFSET		0
 #define SPC_SENSE_KEY_OFFSET			2
 #define SPC_ADD_SENSE_LEN_OFFSET		7
 #define SPC_DESC_TYPE_OFFSET			8
 #define SPC_ADDITIONAL_DESC_LEN_OFFSET		9
-#define SPC_VALIDITY_OFFSET			10
+#define SPC_CMD_INFO_VALIDITY_OFFSET		10
 #define SPC_ASC_KEY_OFFSET			12
 #define SPC_ASCQ_KEY_OFFSET			13
 #define TRANSPORT_IQN_LEN			224
@@ -439,6 +440,8 @@ struct se_dif_v1_tuple {
 #define TCM_ACA_TAG	0x24
 
 struct se_cmd {
+	sense_reason_t		sense_reason;
+	u32			sense_info;
 	/* SAM response code being sent to initiator */
 	u8			scsi_status;
 	u8			scsi_asc;
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 13/18] target: add COMPARE_AND_WRITE sg creation helper
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (10 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 12/18] target: compare and write backend driver sense handling mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 14/18] libceph: fix pr_fmt compile issues mchristi
                   ` (7 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The sbc core and the rbd backend driver want separate scatterlists
for the write phase of COMPARE_AND_WRITE. This moves the sbc
code to a helper function.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/target/target_core_sbc.c     | 73 ++++++++++++++++++++++++------------
 include/target/target_core_backend.h |  1 +
 2 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/drivers/target/target_core_sbc.c b/drivers/target/target_core_sbc.c
index 9a001e6..f803068 100644
--- a/drivers/target/target_core_sbc.c
+++ b/drivers/target/target_core_sbc.c
@@ -428,12 +428,58 @@ static sense_reason_t compare_and_write_post(struct se_cmd *cmd, bool success)
 	return TCM_NO_SENSE;
 }
 
+/**
+ * sbc_create_compare_and_write_sg - alloc and prep a sg for the write phase
+ * @cmd: se_cmd to copy scatterlist from.
+ *
+ * Takes the cmd's scatterlist and creates a new sg with only the write
+ * portion.
+*/
+struct scatterlist *sbc_create_compare_and_write_sg(struct se_cmd *cmd)
+{
+	struct se_device *dev = cmd->se_dev;
+	unsigned int block_size = dev->dev_attrib.block_size;
+	unsigned int len = cmd->t_task_nolb * block_size;
+	struct scatterlist *write_sg;
+	struct sg_mapping_iter m;
+	int i = 0;
+
+	write_sg = kmalloc(sizeof(struct scatterlist) * cmd->t_data_nents,
+			   GFP_KERNEL);
+	if (!write_sg) {
+		pr_err("Unable to allocate compare_and_write sg\n");
+		return NULL;
+	}
+	sg_init_table(write_sg, cmd->t_data_nents);
+
+	sg_miter_start(&m, cmd->t_data_sg, cmd->t_data_nents, SG_MITER_TO_SG);
+	/*
+	 * Currently assumes NoLB=1 and SGLs are PAGE_SIZE..
+	 */
+	while (len) {
+		sg_miter_next(&m);
+
+		if (block_size < PAGE_SIZE) {
+			sg_set_page(&write_sg[i], m.page, block_size,
+				    block_size);
+		} else {
+			sg_miter_next(&m);
+			sg_set_page(&write_sg[i], m.page, block_size, 0);
+		}
+		len -= block_size;
+		i++;
+	}
+	sg_miter_stop(&m);
+
+	return write_sg;
+}
+EXPORT_SYMBOL(sbc_create_compare_and_write_sg);
+
 static sense_reason_t compare_and_write_callback(struct se_cmd *cmd, bool success)
 {
 	struct se_device *dev = cmd->se_dev;
 	struct scatterlist *write_sg = NULL, *sg;
 	unsigned char *buf = NULL, *addr;
-	struct sg_mapping_iter m;
 	unsigned int offset = 0, len;
 	unsigned int nlbas = cmd->t_task_nolb;
 	unsigned int block_size = dev->dev_attrib.block_size;
@@ -469,14 +515,12 @@ static sense_reason_t compare_and_write_callback(struct se_cmd *cmd, bool succes
 		goto out;
 	}
 
-	write_sg = kmalloc(sizeof(struct scatterlist) * cmd->t_data_nents,
-			   GFP_KERNEL);
+	write_sg = sbc_create_compare_and_write_sg(cmd);
 	if (!write_sg) {
 		pr_err("Unable to allocate compare_and_write sg\n");
 		ret = TCM_OUT_OF_RESOURCES;
 		goto out;
 	}
-	sg_init_table(write_sg, cmd->t_data_nents);
 	/*
 	 * Setup verify and write data payloads from total NumberLBAs.
 	 */
@@ -513,27 +557,6 @@ static sense_reason_t compare_and_write_callback(struct se_cmd *cmd, bool succes
 			break;
 	}
 
-	i = 0;
-	len = cmd->t_task_nolb * block_size;
-	sg_miter_start(&m, cmd->t_data_sg, cmd->t_data_nents, SG_MITER_TO_SG);
-	/*
-	 * Currently assumes NoLB=1 and SGLs are PAGE_SIZE..
-	 */
-	while (len) {
-		sg_miter_next(&m);
-
-		if (block_size < PAGE_SIZE) {
-			sg_set_page(&write_sg[i], m.page, block_size,
-				    block_size);
-		} else {
-			sg_miter_next(&m);
-			sg_set_page(&write_sg[i], m.page, block_size,
-				    0);
-		}
-		len -= block_size;
-		i++;
-	}
-	sg_miter_stop(&m);
 	/*
 	 * Save the original SGL + nents values before updating to new
 	 * assignments, to be released in transport_free_pages() ->
diff --git a/include/target/target_core_backend.h b/include/target/target_core_backend.h
index c98f6e6..d8895e2 100644
--- a/include/target/target_core_backend.h
+++ b/include/target/target_core_backend.h
@@ -61,6 +61,7 @@ void	target_complete_cmd(struct se_cmd *, u8);
 void	target_complete_cmd_with_sense(struct se_cmd *, sense_reason_t);
 void	target_complete_cmd_with_length(struct se_cmd *, u8, int);
 
+struct scatterlist *sbc_create_compare_and_write_sg(struct se_cmd *);
 sense_reason_t	spc_parse_cdb(struct se_cmd *cmd, unsigned int *size);
 sense_reason_t	spc_emulate_report_luns(struct se_cmd *cmd);
 sense_reason_t	spc_emulate_inquiry_std(struct se_cmd *, unsigned char *);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 14/18] libceph: fix pr_fmt compile issues
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (11 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 13/18] target: add COMPARE_AND_WRITE sg creation helper mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 15/18] rbd: export some functions used by lio rbd backend mchristi
                   ` (6 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

When using ceph from other modules like the target ones, pr_fmt
might already be defined. This just ifndefs it.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/ceph_debug.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/include/linux/ceph/ceph_debug.h b/include/linux/ceph/ceph_debug.h
index aa2e191..05711eb 100644
--- a/include/linux/ceph/ceph_debug.h
+++ b/include/linux/ceph/ceph_debug.h
@@ -1,7 +1,9 @@
 #ifndef _FS_CEPH_DEBUG_H
 #define _FS_CEPH_DEBUG_H
 
+#ifndef pr_fmt
 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+#endif
 
 #ifdef CONFIG_CEPH_LIB_PRETTYDEBUG
 
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 15/18] rbd: export some functions used by lio rbd backend
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (12 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 14/18] libceph: fix pr_fmt compile issues mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 16/18] rbd: move structs used by lio rbd to new header mchristi
                   ` (5 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The lio rbd backend will make img_request rbd calls, so this
patch exports the functions it uses.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 931d2d7..d89749c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1552,7 +1552,8 @@ static void rbd_img_request_get(struct rbd_img_request *img_request)
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
-static void rbd_img_request_put(struct rbd_img_request *img_request)
+
+void rbd_img_request_put(struct rbd_img_request *img_request)
 {
 	rbd_assert(img_request != NULL);
 	dout("%s: img %p (was %d)\n", __func__, img_request,
@@ -1562,6 +1563,7 @@ static void rbd_img_request_put(struct rbd_img_request *img_request)
 	else
 		kref_put(&img_request->kref, rbd_img_request_destroy);
 }
+EXPORT_SYMBOL(rbd_img_request_put);
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 					struct rbd_obj_request *obj_request)
@@ -2300,11 +2302,10 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
  * that comprises the image request, and the Linux request pointer
  * (if there is one).
  */
-static struct rbd_img_request *rbd_img_request_create(
-					struct rbd_device *rbd_dev,
-					u64 offset, u64 length,
-					enum obj_operation_type op_type,
-					struct ceph_snap_context *snapc)
+struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
+					       u64 offset, u64 length,
+					       enum obj_operation_type op_type,
+					       struct ceph_snap_context *snapc)
 {
 	struct rbd_img_request *img_request;
 
@@ -2349,6 +2350,7 @@ static struct rbd_img_request *rbd_img_request_create(
 
 	return img_request;
 }
+EXPORT_SYMBOL(rbd_img_request_create);
 
 static void rbd_img_request_destroy(struct kref *kref)
 {
@@ -2602,9 +2604,8 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
  * function assumes data_desc describes memory sufficient to hold
  * all data described by the image request.
  */
-static int rbd_img_request_fill(struct rbd_img_request *img_request,
-					enum obj_request_type type,
-					void *data_desc)
+int rbd_img_request_fill(struct rbd_img_request *img_request,
+			 enum obj_request_type type, void *data_desc)
 {
 	struct rbd_device *rbd_dev = img_request->rbd_dev;
 	struct rbd_obj_request *obj_request = NULL;
@@ -2729,6 +2730,7 @@ out_unwind:
 
 	return -ENOMEM;
 }
+EXPORT_SYMBOL(rbd_img_request_fill);
 
 int rbd_img_cmp_and_write_request_fill(struct rbd_img_request *img_request,
 				       struct scatterlist *cmp_sgl,
@@ -3234,7 +3236,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 	return rbd_img_obj_exists_submit(obj_request);
 }
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request)
+int rbd_img_request_submit(struct rbd_img_request *img_request)
 {
 	struct rbd_obj_request *obj_request;
 	struct rbd_obj_request *next_obj_request;
@@ -3250,6 +3252,7 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
 
 	return 0;
 }
+EXPORT_SYMBOL(rbd_img_request_submit);
 
 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 {
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 16/18] rbd: move structs used by lio rbd to new header
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (13 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 15/18] rbd: export some functions used by lio rbd backend mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29  9:23 ` [PATCH 17/18] target: add rbd backend mchristi
                   ` (4 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This moves structs and other definitions needed by the lio rbd
backend module to a header.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c         | 146 +------------------------------------
 include/linux/ceph/librbd.h | 174 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+), 145 deletions(-)
 create mode 100644 include/linux/ceph/librbd.h

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index d89749c..248af2b 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -32,6 +32,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
+#include <linux/ceph/librbd.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
 
@@ -51,15 +52,6 @@
 #define RBD_DEBUG	/* Activate rbd_assert() calls */
 
 /*
- * The basic unit of block I/O is a sector.  It is interpreted in a
- * number of contexts in Linux (blk, bio, genhd), but the default is
- * universally 512 bytes.  These symbols are just slightly more
- * meaningful than the bare numbers they represent.
- */
-#define	SECTOR_SHIFT	9
-#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
-
-/*
  * Increment the given counter and return its updated value.
  * If the counter is already 0 it will not be incremented.
  * If the counter is already at its maximum value returns
@@ -92,8 +84,6 @@ static int atomic_dec_return_safe(atomic_t *v)
 	return -EINVAL;
 }
 
-#define RBD_DRV_NAME "rbd"
-
 #define RBD_MINORS_PER_MAJOR		256
 #define RBD_SINGLE_MAJOR_PART_SHIFT	4
 
@@ -125,35 +115,6 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
 
 /*
- * An RBD device name will be "rbd#", where the "rbd" comes from
- * RBD_DRV_NAME above, and # is a unique integer identifier.
- * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
- * enough to hold all possible device names.
- */
-#define DEV_NAME_LEN		32
-#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
-
-/*
- * block device image metadata (in-memory version)
- */
-struct rbd_image_header {
-	/* These six fields never change for a given rbd image */
-	char *object_prefix;
-	__u8 obj_order;
-	__u8 crypt_type;
-	__u8 comp_type;
-	u64 stripe_unit;
-	u64 stripe_count;
-	u64 features;		/* Might be changeable someday? */
-
-	/* The remaining fields need to be updated occasionally */
-	u64 image_size;
-	struct ceph_snap_context *snapc;
-	char *snap_names;	/* format 1 only */
-	u64 *snap_sizes;	/* format 1 only */
-};
-
-/*
  * An rbd image specification.
  *
  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
@@ -200,26 +161,11 @@ struct rbd_client {
 	struct list_head	node;
 };
 
-struct rbd_img_request;
-typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
-
 #define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
 
 struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
-enum obj_request_type {
-	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES, OBJ_REQUEST_SG,
-};
-
-enum obj_operation_type {
-	OBJ_OP_WRITE,
-	OBJ_OP_READ,
-	OBJ_OP_DISCARD,
-	OBJ_OP_CMP_AND_WRITE,
-	OBJ_OP_WRITESAME,
-};
-
 enum obj_req_flags {
 	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
 	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
@@ -295,39 +241,6 @@ enum img_req_flags {
 	IMG_REQ_WRITESAME,	/* normal = 0, write same = 1 */
 };
 
-struct rbd_img_request {
-	struct rbd_device	*rbd_dev;
-	u64			offset;	/* starting image byte offset */
-	u64			length;	/* byte count from offset */
-	unsigned long		flags;
-
-	u64			snap_id;	/* for reads */
-	struct ceph_snap_context *snapc;	/* for writes */
-
-	struct request		*rq;		/* block request */
-	struct rbd_obj_request	*obj_request;	/* obj req initiator */
-	void			*lio_cmd_data;	/* lio specific data */
-
-	struct page		**copyup_pages;
-	u32			copyup_page_count;
-	spinlock_t		completion_lock;/* protects next_completion */
-	u32			next_completion;
-	rbd_img_callback_t	callback;
-	/*
-	 * xferred is the bytes that have successfully been transferred.
-	 * completed is the bytes that have been accounted for and includes
-	 * both failed and successfully transffered bytes.
-	 */
-	u64			xferred;/* aggregate bytes transferred */
-	u64			completed;
-	int			result;	/* first nonzero obj_request result */
-
-	u32			obj_request_count;
-	struct list_head	obj_requests;	/* rbd_obj_request structs */
-
-	struct kref		kref;
-};
-
 #define for_each_obj_request(ireq, oreq) \
 	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
 #define for_each_obj_request_from(ireq, oreq) \
@@ -335,61 +248,6 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
-struct rbd_mapping {
-	u64                     size;
-	u64                     features;
-	bool			read_only;
-};
-
-/*
- * a single device
- */
-struct rbd_device {
-	int			dev_id;		/* blkdev unique id */
-
-	int			major;		/* blkdev assigned major */
-	int			minor;
-	struct gendisk		*disk;		/* blkdev's gendisk and rq */
-
-	u32			image_format;	/* Either 1 or 2 */
-	struct rbd_client	*rbd_client;
-
-	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
-
-	spinlock_t		lock;		/* queue, flags, open_count */
-
-	struct rbd_image_header	header;
-	unsigned long		flags;		/* possibly lock protected */
-	struct rbd_spec		*spec;
-	struct rbd_options	*opts;
-
-	char			*header_name;
-
-	struct ceph_file_layout	layout;
-
-	struct ceph_osd_event   *watch_event;
-	struct rbd_obj_request	*watch_request;
-
-	struct rbd_spec		*parent_spec;
-	u64			parent_overlap;
-	atomic_t		parent_ref;
-	struct rbd_device	*parent;
-
-	/* Block layer tags. */
-	struct blk_mq_tag_set	tag_set;
-
-	/* protects updating the header */
-	struct rw_semaphore     header_rwsem;
-
-	struct rbd_mapping	mapping;
-
-	struct list_head	node;
-
-	/* sysfs related */
-	struct device		dev;
-	unsigned long		open_count;	/* protected by lock */
-};
-
 /*
  * Flag bits for rbd_dev->flags.  If atomicity is required,
  * rbd_dev->lock is used to protect access.
@@ -429,8 +287,6 @@ static bool single_major = false;
 module_param(single_major, bool, S_IRUGO);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
-
 static void rbd_dev_device_release(struct device *dev);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
diff --git a/include/linux/ceph/librbd.h b/include/linux/ceph/librbd.h
new file mode 100644
index 0000000..6e25dfd
--- /dev/null
+++ b/include/linux/ceph/librbd.h
@@ -0,0 +1,174 @@
+#ifndef _LIBRBD_H
+#define _LIBRBD_H
+
+#include <linux/blk-mq.h>
+#include <linux/device.h>
+
+/*
+ * The basic unit of block I/O is a sector.  It is interpreted in a
+ * number of contexts in Linux (blk, bio, genhd), but the default is
+ * universally 512 bytes.  These symbols are just slightly more
+ * meaningful than the bare numbers they represent.
+ */
+#define	SECTOR_SHIFT	9
+#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
+
+#define RBD_DRV_NAME "rbd"
+
+/*
+ * An RBD device name will be "rbd#", where the "rbd" comes from
+ * RBD_DRV_NAME above, and # is a unique integer identifier.
+ * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
+ * enough to hold all possible device names.
+ */
+#define DEV_NAME_LEN		32
+#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
+
+/*
+ * block device image metadata (in-memory version)
+ */
+struct rbd_image_header {
+	/* These six fields never change for a given rbd image */
+	char *object_prefix;
+	__u8 obj_order;
+	__u8 crypt_type;
+	__u8 comp_type;
+	u64 stripe_unit;
+	u64 stripe_count;
+	u64 features;		/* Might be changeable someday? */
+
+	/* The remaining fields need to be updated occasionally */
+	u64 image_size;
+	struct ceph_snap_context *snapc;
+	char *snap_names;	/* format 1 only */
+	u64 *snap_sizes;	/* format 1 only */
+};
+
+enum obj_request_type {
+	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES, OBJ_REQUEST_SG,
+};
+
+enum obj_operation_type {
+	OBJ_OP_WRITE,
+	OBJ_OP_READ,
+	OBJ_OP_DISCARD,
+	OBJ_OP_CMP_AND_WRITE,
+	OBJ_OP_WRITESAME,
+};
+
+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+struct rbd_obj_request;
+
+struct rbd_img_request {
+	struct rbd_device	*rbd_dev;
+	u64			offset;	/* starting image byte offset */
+	u64			length;	/* byte count from offset */
+	unsigned long		flags;
+
+	u64			snap_id;	/* for reads */
+	struct ceph_snap_context *snapc;	/* for writes */
+
+	struct request		*rq;		/* block request */
+	struct rbd_obj_request	*obj_request;	/* obj req initiator */
+	void			*lio_cmd_data;	/* lio specific data */
+
+	struct page		**copyup_pages;
+	u32			copyup_page_count;
+	spinlock_t		completion_lock;/* protects next_completion */
+	u32			next_completion;
+	rbd_img_callback_t	callback;
+        /*
+	 * xferred is the bytes that have successfully been transferred.
+	 * completed is the bytes that have been accounted for and includes
+	 * failures.
+	 */
+	u64			xferred;/* aggregate bytes transferred */
+	u64			completed;/* aggregate bytes completed */
+	int			result;	/* first nonzero obj_request result */
+
+	u32			obj_request_count;
+	struct list_head	obj_requests;	/* rbd_obj_request structs */
+
+	struct kref		kref;
+};
+
+struct rbd_mapping {
+	u64                     size;
+	u64                     features;
+	bool			read_only;
+};
+
+struct rbd_client;
+struct rbd_spec;
+struct rbd_options;
+
+/*
+ * a single device
+ */
+struct rbd_device {
+	int			dev_id;		/* blkdev unique id */
+
+	int			major;		/* blkdev assigned major */
+	int			minor;
+	struct gendisk		*disk;		/* blkdev's gendisk and rq */
+
+	u32			image_format;	/* Either 1 or 2 */
+	struct rbd_client	*rbd_client;
+
+	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
+
+	spinlock_t		lock;		/* queue, flags, open_count */
+
+	struct rbd_image_header	header;
+	unsigned long		flags;		/* possibly lock protected */
+	struct rbd_spec		*spec;
+	struct rbd_options	*opts;
+
+	char			*header_name;
+
+	struct ceph_file_layout	layout;
+
+	struct ceph_osd_event   *watch_event;
+	struct rbd_obj_request	*watch_request;
+
+	struct rbd_spec		*parent_spec;
+	u64			parent_overlap;
+	atomic_t		parent_ref;
+	struct rbd_device	*parent;
+
+	/* Block layer tags. */
+	struct blk_mq_tag_set	tag_set;
+
+	/* protects updating the header */
+	struct rw_semaphore     header_rwsem;
+
+	struct rbd_mapping	mapping;
+
+	struct list_head	node;
+
+	/* sysfs related */
+	struct device		dev;
+	unsigned long		open_count;	/* protected by lock */
+};
+
+extern struct rbd_img_request *rbd_img_request_create(
+					struct rbd_device *rbd_dev,
+					u64 offset, u64 length,
+					enum obj_operation_type op_type,
+					struct ceph_snap_context *snapc);
+extern int rbd_img_cmp_and_write_request_fill(
+					struct rbd_img_request *img_request,
+					struct scatterlist *cmp_sgl,
+					u64 cmp_length,
+					struct scatterlist *write_sgl,
+					u64 write_length,
+					struct page **response_pages,
+					u64 response_length);
+extern int rbd_img_request_fill(struct rbd_img_request *img_request,
+				enum obj_request_type type, void *data_desc);
+extern int rbd_img_request_submit(struct rbd_img_request *img_request);
+extern void rbd_img_request_put(struct rbd_img_request *img_request);
+
+#endif
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 17/18] target: add rbd backend
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (14 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 16/18] rbd: move structs used by lio rbd to new header mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29 14:27   ` Bart Van Assche
  2015-07-29  9:23 ` [PATCH 18/18] target: add lio rbd to makefile/Kconfig mchristi
                   ` (3 subsequent siblings)
  19 siblings, 1 reply; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds a lio rbd backend. It translates scsi commands to ceph/rados
calls directly instead of going through the block layer (se_cmd -> bio ->
request -> ceph/rados).

It currently still uses the request_queue created by rbd for some setup
in tcm_rbd_configure_device and for matching a rbd device to a lio
se_device in tcm_rbd_set_configfs_dev_params.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/target/target_core_rbd.c | 733 +++++++++++++++++++++++++++++++++++++++
 drivers/target/target_core_rbd.h |  23 ++
 2 files changed, 756 insertions(+)
 create mode 100644 drivers/target/target_core_rbd.c
 create mode 100644 drivers/target/target_core_rbd.h

diff --git a/drivers/target/target_core_rbd.c b/drivers/target/target_core_rbd.c
new file mode 100644
index 0000000..c181ee8
--- /dev/null
+++ b/drivers/target/target_core_rbd.c
@@ -0,0 +1,733 @@
+/*******************************************************************************
+ * Filename:  target_core_rbd.c
+ *
+ * This file contains the Storage Engine  <-> Ceph RBD transport
+ * specific functions.
+ *
+ * [Was based off of target_core_iblock.c from
+ *  Nicholas A. Bellinger <nab@kernel.org>]
+ *
+ * (c) Copyright 2003-2013 Datera, Inc.
+ * (c) Copyright 2015 Red Hat, Inc
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ ******************************************************************************/
+
+#include <linux/string.h>
+#include <linux/parser.h>
+#include <linux/blkdev.h>
+#include <linux/spinlock.h>
+#include <linux/genhd.h>
+#include <linux/module.h>
+#include <asm/unaligned.h>
+
+#include <scsi/scsi_proto.h>
+
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+#include <linux/ceph/mon_client.h>
+#include <linux/ceph/librbd.h>
+
+#include <target/target_core_base.h>
+#include <target/target_core_backend.h>
+#include <target/target_core_fabric.h>
+
+#include "target_core_rbd.h"
+
+static inline struct tcm_rbd_dev *TCM_RBD_DEV(struct se_device *dev)
+{
+	return container_of(dev, struct tcm_rbd_dev, dev);
+}
+
+static int tcm_rbd_attach_hba(struct se_hba *hba, u32 host_id)
+{
+	pr_debug("CORE_HBA[%d] - TCM RBD HBA Driver %s on"
+		" Generic Target Core Stack %s\n", hba->hba_id,
+		TCM_RBD_VERSION, TARGET_CORE_VERSION);
+	return 0;
+}
+
+static void tcm_rbd_detach_hba(struct se_hba *hba)
+{
+}
+
+static struct se_device *tcm_rbd_alloc_device(struct se_hba *hba,
+					      const char *name)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev;
+
+	tcm_rbd_dev = kzalloc(sizeof(struct tcm_rbd_dev), GFP_KERNEL);
+	if (!tcm_rbd_dev) {
+		pr_err("Unable to allocate struct tcm_rbd_dev\n");
+		return NULL;
+	}
+
+	pr_debug( "TCM RBD: Allocated tcm_rbd_dev for %s\n", name);
+
+	return &tcm_rbd_dev->dev;
+}
+
+static int tcm_rbd_configure_device(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct request_queue *q;
+	struct block_device *bd = NULL;
+	fmode_t mode;
+
+	if (!(tcm_rbd_dev->bd_flags & TCM_RBD_HAS_UDEV_PATH)) {
+		pr_err("Missing udev_path= parameters for TCM RBD\n");
+		return -EINVAL;
+	}
+
+	pr_debug( "TCM RBD: Claiming struct block_device: %s\n",
+		 tcm_rbd_dev->bd_udev_path);
+
+	mode = FMODE_READ|FMODE_EXCL;
+	if (!tcm_rbd_dev->bd_readonly)
+		mode |= FMODE_WRITE;
+
+	bd = blkdev_get_by_path(tcm_rbd_dev->bd_udev_path, mode, tcm_rbd_dev);
+	if (IS_ERR(bd))
+		return PTR_ERR(bd);
+
+	tcm_rbd_dev->bd = bd;
+
+	q = bdev_get_queue(bd);
+	tcm_rbd_dev->rbd_dev = q->queuedata;
+
+	dev->dev_attrib.hw_block_size = bdev_logical_block_size(bd);
+	dev->dev_attrib.hw_max_sectors = queue_max_hw_sectors(q);
+	dev->dev_attrib.hw_queue_depth = q->nr_requests;
+
+	/*
+	 * Check if the underlying struct block_device request_queue supports
+	 * the QUEUE_FLAG_DISCARD bit for UNMAP/WRITE_SAME in SCSI + TRIM
+	 * in ATA and we need to set TPE=1
+	 */
+	if (blk_queue_discard(q)) {
+		dev->dev_attrib.max_unmap_lba_count =
+				q->limits.max_discard_sectors;
+
+		/*
+		 * Currently hardcoded to 1 in Linux/SCSI code..
+		 */
+		dev->dev_attrib.max_unmap_block_desc_count = 1;
+		dev->dev_attrib.unmap_granularity =
+				q->limits.discard_granularity >> 9;
+		dev->dev_attrib.unmap_granularity_alignment =
+				q->limits.discard_alignment;
+
+		pr_debug("TCM RBD: BLOCK Discard support available disabled by default\n");
+	}
+	/*
+	 * Enable write same emulation for RBD and use 0xFFFF as
+	 * the smaller WRITE_SAME(10) only has a two-byte block count.
+	 */
+	dev->dev_attrib.max_write_same_len = 0xFFFF;
+	dev->dev_attrib.is_nonrot = 1;
+	return 0;
+}
+
+static void tcm_rbd_free_device(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+
+	if (tcm_rbd_dev->bd != NULL)
+		blkdev_put(tcm_rbd_dev->bd, FMODE_WRITE|FMODE_READ|FMODE_EXCL);
+
+	kfree(tcm_rbd_dev);
+}
+
+static sector_t tcm_rbd_get_blocks(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	sector_t blocks_long = tcm_rbd_dev->rbd_dev->mapping.size >>
+								SECTOR_SHIFT;
+
+	if (SECTOR_SIZE == dev->dev_attrib.block_size)
+		return blocks_long;
+
+	switch (SECTOR_SIZE) {
+	case 4096:
+		switch (dev->dev_attrib.block_size) {
+		case 2048:
+			blocks_long <<= 1;
+			break;
+		case 1024:
+			blocks_long <<= 2;
+			break;
+		case 512:
+			blocks_long <<= 3;
+		default:
+			break;
+		}
+		break;
+	case 2048:
+		switch (dev->dev_attrib.block_size) {
+		case 4096:
+			blocks_long >>= 1;
+			break;
+		case 1024:
+			blocks_long <<= 1;
+			break;
+		case 512:
+			blocks_long <<= 2;
+			break;
+		default:
+			break;
+		}
+		break;
+	case 1024:
+		switch (dev->dev_attrib.block_size) {
+		case 4096:
+			blocks_long >>= 2;
+			break;
+		case 2048:
+			blocks_long >>= 1;
+			break;
+		case 512:
+			blocks_long <<= 1;
+			break;
+		default:
+			break;
+		}
+		break;
+	case 512:
+		switch (dev->dev_attrib.block_size) {
+		case 4096:
+			blocks_long >>= 3;
+			break;
+		case 2048:
+			blocks_long >>= 2;
+			break;
+		case 1024:
+			blocks_long >>= 1;
+			break;
+		default:
+			break;
+		}
+		break;
+	default:
+		break;
+	}
+
+	return blocks_long;
+}
+
+static void rbd_complete_cmd(struct se_cmd *cmd)
+{
+	struct rbd_img_request *img_request = cmd->priv;
+	u8 status = SAM_STAT_GOOD;
+
+	if (img_request && img_request->result)
+		status = SAM_STAT_CHECK_CONDITION;
+	else
+		status = SAM_STAT_GOOD;
+
+	target_complete_cmd(cmd, status);
+	if (img_request)
+		rbd_img_request_put(img_request);
+}
+
+static sense_reason_t tcm_rbd_execute_sync_cache(struct se_cmd *cmd)
+{
+	/* Ceph/Rados supports flush, but kRBD does not yet */
+	target_complete_cmd(cmd, SAM_STAT_GOOD);
+	return 0;
+}
+
+/*
+ * Convert the blocksize advertised to the initiator to the RBD offset.
+ */
+static u64 rbd_lba_shift(struct se_device *dev, unsigned long long task_lba)
+{
+	sector_t block_lba;
+
+	/* convert to linux block which uses 512 byte sectors */
+	if (dev->dev_attrib.block_size == 4096)
+		block_lba = task_lba << 3;
+	else if (dev->dev_attrib.block_size == 2048)
+		block_lba = task_lba << 2;
+	else if (dev->dev_attrib.block_size == 1024)
+		block_lba = task_lba << 1;
+	else
+		block_lba = task_lba;
+
+	/* convert to RBD offset */
+	return block_lba << SECTOR_SHIFT;
+}
+
+static void tcm_rbd_async_callback(struct rbd_img_request *img_request)
+{
+	rbd_complete_cmd(img_request->lio_cmd_data);
+}
+
+static void tcm_rbd_sync_callback(struct rbd_img_request *img_request)
+{
+	struct completion *waiting = img_request->lio_cmd_data;
+
+	complete(waiting);
+}
+
+static sense_reason_t
+tcm_rbd_execute_cmd(struct se_cmd *cmd, struct rbd_device *rbd_dev,
+		    struct scatterlist *sgl, enum obj_operation_type op_type,
+		    u64 offset, u64 length, bool sync)
+{
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
+	DECLARE_COMPLETION_ONSTACK(wait);
+	sense_reason_t sense = TCM_NO_SENSE;
+	int ret;
+
+	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_WRITESAME) {
+		down_read(&rbd_dev->header_rwsem);
+		snapc = rbd_dev->header.snapc;
+		ceph_get_snap_context(snapc);
+		up_read(&rbd_dev->header_rwsem);
+	}
+
+	img_request = rbd_img_request_create(rbd_dev, offset, length,
+					     op_type, snapc);
+	if (!img_request) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_snapc;
+	}
+
+	ret = rbd_img_request_fill(img_request,
+				   sgl ? OBJ_REQUEST_SG : OBJ_REQUEST_NODATA,
+				   sgl);
+	if (ret) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_req;
+	}
+
+	if (sync) {
+		img_request->lio_cmd_data = &wait;
+		img_request->callback = tcm_rbd_sync_callback;
+	} else {
+		img_request->lio_cmd_data = cmd;
+		img_request->callback = tcm_rbd_async_callback;
+	}
+	cmd->priv = img_request;
+
+	ret = rbd_img_request_submit(img_request);
+	if (ret == -ENOMEM) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_req;
+	} else if (ret) {
+		sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto free_req;
+	}
+
+	if (sync) {
+		wait_for_completion(&wait);
+		if (img_request->result)
+			sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		rbd_img_request_put(img_request);
+	}
+
+	return sense;
+
+free_req:
+	rbd_img_request_put(img_request);
+free_snapc:
+	ceph_put_snap_context(snapc);
+	return sense;
+}
+
+static sense_reason_t tcm_rbd_execute_unmap(struct se_cmd *cmd,
+					    sector_t lba, sector_t nolb)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(cmd->se_dev);
+	struct rbd_device *rbd_dev = tcm_rbd_dev->rbd_dev;
+
+	return tcm_rbd_execute_cmd(cmd, rbd_dev, NULL, OBJ_OP_DISCARD,
+				   lba << SECTOR_SHIFT, nolb << SECTOR_SHIFT,
+				   true);
+}
+
+static sense_reason_t tcm_rbd_execute_write_same(struct se_cmd *cmd)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct rbd_device *rbd_dev = tcm_rbd_dev->rbd_dev;
+	sector_t sectors = sbc_get_write_same_sectors(cmd);
+	u64 length = rbd_lba_shift(dev, sectors);
+	struct scatterlist *sg;
+
+	if (cmd->prot_op) {
+		pr_err("WRITE_SAME: Protection information with IBLOCK"
+		       " backends not supported\n");
+		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+	}
+	sg = &cmd->t_data_sg[0];
+
+	if (cmd->t_data_nents > 1 ||
+	    sg->length != cmd->se_dev->dev_attrib.block_size) {
+		pr_err("WRITE_SAME: Illegal SGL t_data_nents: %u length: %u"
+			" block_size: %u\n", cmd->t_data_nents, sg->length,
+			cmd->se_dev->dev_attrib.block_size);
+		return TCM_INVALID_CDB_FIELD;
+	}
+
+	return tcm_rbd_execute_cmd(cmd, rbd_dev, sg, OBJ_OP_WRITESAME,
+				   rbd_lba_shift(dev, cmd->t_task_lba), length,
+				   false);
+}
+
+static void tcm_rbd_cmp_and_write_callback(struct rbd_img_request *img_request)
+{
+	struct se_cmd *cmd = img_request->lio_cmd_data;
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	sense_reason_t sense_reason = TCM_NO_SENSE;
+	u64 miscompare_off;
+
+	if (img_request->result == -EILSEQ) {
+		ceph_copy_from_page_vector(tcm_rbd_dev->cmp_and_write_pages,
+					   &miscompare_off, 0,
+					   sizeof(miscompare_off));
+		cmd->sense_info = (u32)le64_to_cpu(miscompare_off);
+		pr_err("COMPARE_AND_WRITE: miscompare at offset %llu\n",
+		       (unsigned long long)cmd->bad_sector);
+		sense_reason = TCM_MISCOMPARE_VERIFY;
+	}
+	kfree(tcm_rbd_dev->cmp_and_write_sg);
+	tcm_rbd_dev->cmp_and_write_sg = NULL;
+	up(&dev->caw_sem);
+
+	if (sense_reason != TCM_NO_SENSE) {
+		/* TODO pass miscompare offset */
+		target_complete_cmd_with_sense(cmd, sense_reason);
+	} else if (img_request->result) {
+		target_complete_cmd(cmd, SAM_STAT_CHECK_CONDITION);
+	} else {
+		target_complete_cmd(cmd, SAM_STAT_GOOD);
+	}
+	rbd_img_request_put(img_request);
+}
+
+static sense_reason_t tcm_rbd_execute_cmp_and_write(struct se_cmd *cmd)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct rbd_device *rbd_dev = tcm_rbd_dev->rbd_dev;
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc;
+	sense_reason_t sense = TCM_NO_SENSE;
+	unsigned int len = cmd->t_task_nolb * dev->dev_attrib.block_size;
+	u64 response_len = len + sizeof(u64);
+	struct page **pages;
+	u32 page_count;
+	int ret;
+
+	down_read(&rbd_dev->header_rwsem);
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	up_read(&rbd_dev->header_rwsem);
+
+	img_request = rbd_img_request_create(rbd_dev,
+					     rbd_lba_shift(dev, cmd->t_task_lba),
+					     len, OBJ_OP_CMP_AND_WRITE, snapc);
+	if (!img_request) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_snapc;
+	}
+
+	ret = down_interruptible(&dev->caw_sem);
+	if (ret != 0 || signal_pending(current)) {
+		sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto free_req;
+	}
+
+	tcm_rbd_dev->cmp_and_write_sg = sbc_create_compare_and_write_sg(cmd);
+	if (!tcm_rbd_dev->cmp_and_write_sg)
+		goto rel_caw_sem;
+
+	page_count = (u32)calc_pages_for(0, len + sizeof(u64));
+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(pages))
+		goto free_write_sg;
+	tcm_rbd_dev->cmp_and_write_pages = pages;
+
+	ret = rbd_img_cmp_and_write_request_fill(img_request, cmd->t_data_sg,
+						 len,
+						 tcm_rbd_dev->cmp_and_write_sg,
+						 len, pages, response_len);
+	if (ret == -EOPNOTSUPP) {
+		sense = TCM_INVALID_CDB_FIELD;
+		goto free_pages;
+	} else if (ret) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_pages;
+	}
+
+	cmd->priv = img_request;
+	img_request->lio_cmd_data = cmd;
+	img_request->callback = tcm_rbd_cmp_and_write_callback;
+
+	ret = rbd_img_request_submit(img_request);
+
+	if (ret == -ENOMEM) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto free_write_sg;
+	} else if (ret) {
+		sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto free_write_sg;
+	}
+	return 0;
+
+free_pages:
+	ceph_release_page_vector(pages, page_count);
+free_write_sg:
+	kfree(tcm_rbd_dev->cmp_and_write_sg);
+	tcm_rbd_dev->cmp_and_write_sg = NULL;
+rel_caw_sem:
+	up(&dev->caw_sem);
+free_req:
+	rbd_img_request_put(img_request);
+free_snapc:
+	ceph_put_snap_context(snapc);
+	return sense;
+}
+
+enum {
+	Opt_udev_path, Opt_readonly, Opt_force, Opt_err
+};
+
+static match_table_t tokens = {
+	{Opt_udev_path, "udev_path=%s"},
+	{Opt_readonly, "readonly=%d"},
+	{Opt_force, "force=%d"},
+	{Opt_err, NULL}
+};
+
+static ssize_t
+tcm_rbd_set_configfs_dev_params(struct se_device *dev, const char *page,
+				ssize_t count)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	char *orig, *ptr, *arg_p, *opts;
+	substring_t args[MAX_OPT_ARGS];
+	int ret = 0, token;
+	unsigned long tmp_readonly;
+
+	opts = kstrdup(page, GFP_KERNEL);
+	if (!opts)
+		return -ENOMEM;
+
+	orig = opts;
+
+	while ((ptr = strsep(&opts, ",\n")) != NULL) {
+		if (!*ptr)
+			continue;
+
+		token = match_token(ptr, tokens, args);
+		switch (token) {
+		case Opt_udev_path:
+			if (tcm_rbd_dev->bd) {
+				pr_err("Unable to set udev_path= while"
+					" tcm_rbd_dev->bd exists\n");
+				ret = -EEXIST;
+				goto out;
+			}
+			if (match_strlcpy(tcm_rbd_dev->bd_udev_path, &args[0],
+				SE_UDEV_PATH_LEN) == 0) {
+				ret = -EINVAL;
+				break;
+			}
+			pr_debug("TCM RBD: Referencing UDEV path: %s\n",
+				 tcm_rbd_dev->bd_udev_path);
+			tcm_rbd_dev->bd_flags |= TCM_RBD_HAS_UDEV_PATH;
+			break;
+		case Opt_readonly:
+			arg_p = match_strdup(&args[0]);
+			if (!arg_p) {
+				ret = -ENOMEM;
+				break;
+			}
+			ret = kstrtoul(arg_p, 0, &tmp_readonly);
+			kfree(arg_p);
+			if (ret < 0) {
+				pr_err("kstrtoul() failed for readonly=\n");
+				goto out;
+			}
+			tcm_rbd_dev->bd_readonly = tmp_readonly;
+			pr_debug("TCM RBD: readonly: %d\n",
+				 tcm_rbd_dev->bd_readonly);
+			break;
+		case Opt_force:
+			break;
+		default:
+			break;
+		}
+	}
+
+out:
+	kfree(orig);
+	return (!ret) ? count : ret;
+}
+
+static ssize_t tcm_rbd_show_configfs_dev_params(struct se_device *dev, char *b)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct block_device *bd = tcm_rbd_dev->bd;
+	char buf[BDEVNAME_SIZE];
+	ssize_t bl = 0;
+
+	if (bd)
+		bl += sprintf(b + bl, "rbd device: %s", bdevname(bd, buf));
+	if (tcm_rbd_dev->bd_flags & TCM_RBD_HAS_UDEV_PATH)
+		bl += sprintf(b + bl, "  UDEV PATH: %s",
+			      tcm_rbd_dev->bd_udev_path);
+	bl += sprintf(b + bl, "  readonly: %d\n", tcm_rbd_dev->bd_readonly);
+
+	bl += sprintf(b + bl, "        ");
+	if (bd) {
+		bl += sprintf(b + bl, "Major: %d Minor: %d  %s\n",
+			      MAJOR(bd->bd_dev), MINOR(bd->bd_dev),
+			      (!bd->bd_contains) ?
+			      "" : (bd->bd_holder == tcm_rbd_dev) ?
+			      "CLAIMED: RBD" : "CLAIMED: OS");
+	} else {
+		bl += sprintf(b + bl, "Major: 0 Minor: 0\n");
+	}
+
+	return bl;
+}
+
+static sense_reason_t
+tcm_rbd_execute_rw(struct se_cmd *cmd, struct scatterlist *sgl, u32 sgl_nents,
+		   enum dma_data_direction data_direction)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct rbd_device *rbd_dev = tcm_rbd_dev->rbd_dev;
+	enum obj_operation_type op_type;
+
+	if (!sgl_nents) {
+		rbd_complete_cmd(cmd);
+		return 0;
+	}
+
+	if (data_direction == DMA_FROM_DEVICE) {
+		op_type = OBJ_OP_READ;
+	} else {
+		op_type = OBJ_OP_WRITE;
+	}
+
+	return tcm_rbd_execute_cmd(cmd, rbd_dev, sgl, op_type,
+				   rbd_lba_shift(dev, cmd->t_task_lba),
+				   cmd->data_length, false);
+}
+
+static sector_t tcm_rbd_get_alignment_offset_lbas(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct block_device *bd = tcm_rbd_dev->bd;
+	int ret;
+
+	ret = bdev_alignment_offset(bd);
+	if (ret == -1)
+		return 0;
+
+	/* convert offset-bytes to offset-lbas */
+	return ret / bdev_logical_block_size(bd);
+}
+
+static unsigned int tcm_rbd_get_lbppbe(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct block_device *bd = tcm_rbd_dev->bd;
+	int logs_per_phys = bdev_physical_block_size(bd) / bdev_logical_block_size(bd);
+
+	return ilog2(logs_per_phys);
+}
+
+static unsigned int tcm_rbd_get_io_min(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct block_device *bd = tcm_rbd_dev->bd;
+
+	return bdev_io_min(bd);
+}
+
+static unsigned int tcm_rbd_get_io_opt(struct se_device *dev)
+{
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct block_device *bd = tcm_rbd_dev->bd;
+
+	return bdev_io_opt(bd);
+}
+
+static struct sbc_ops tcm_rbd_sbc_ops = {
+	.execute_rw		= tcm_rbd_execute_rw,
+	.execute_sync_cache	= tcm_rbd_execute_sync_cache,
+	.execute_write_same	= tcm_rbd_execute_write_same,
+	.execute_unmap		= tcm_rbd_execute_unmap,
+	.execute_compare_and_write = tcm_rbd_execute_cmp_and_write,
+};
+
+static sense_reason_t tcm_rbd_parse_cdb(struct se_cmd *cmd)
+{
+	return sbc_parse_cdb(cmd, &tcm_rbd_sbc_ops);
+}
+
+static bool tcm_rbd_get_write_cache(struct se_device *dev)
+{
+	return false;
+}
+
+static const struct target_backend_ops tcm_rbd_ops = {
+	.name				= "rbd",
+	.inquiry_prod			= "RBD",
+	.inquiry_rev			= TCM_RBD_VERSION,
+	.owner				= THIS_MODULE,
+	.attach_hba			= tcm_rbd_attach_hba,
+	.detach_hba			= tcm_rbd_detach_hba,
+	.alloc_device			= tcm_rbd_alloc_device,
+	.configure_device		= tcm_rbd_configure_device,
+	.free_device			= tcm_rbd_free_device,
+	.parse_cdb			= tcm_rbd_parse_cdb,
+	.set_configfs_dev_params	= tcm_rbd_set_configfs_dev_params,
+	.show_configfs_dev_params	= tcm_rbd_show_configfs_dev_params,
+	.get_device_type		= sbc_get_device_type,
+	.get_blocks			= tcm_rbd_get_blocks,
+	.get_alignment_offset_lbas	= tcm_rbd_get_alignment_offset_lbas,
+	.get_lbppbe			= tcm_rbd_get_lbppbe,
+	.get_io_min			= tcm_rbd_get_io_min,
+	.get_io_opt			= tcm_rbd_get_io_opt,
+	.get_write_cache		= tcm_rbd_get_write_cache,
+	.tb_dev_attrib_attrs		= sbc_attrib_attrs,
+};
+
+static int __init tcm_rbd_module_init(void)
+{
+	return transport_backend_register(&tcm_rbd_ops);
+}
+
+static void __exit tcm_rbd_module_exit(void)
+{
+	target_backend_unregister(&tcm_rbd_ops);
+}
+
+MODULE_DESCRIPTION("TCM Ceph RBD subsystem plugin");
+MODULE_AUTHOR("Mike Christie");
+MODULE_LICENSE("GPL");
+
+module_init(tcm_rbd_module_init);
+module_exit(tcm_rbd_module_exit);
diff --git a/drivers/target/target_core_rbd.h b/drivers/target/target_core_rbd.h
new file mode 100644
index 0000000..2859101
--- /dev/null
+++ b/drivers/target/target_core_rbd.h
@@ -0,0 +1,23 @@
+#ifndef TARGET_CORE_TCM_RBD_H
+#define TARGET_CORE_TCM_RBD_H
+
+#define TCM_RBD_VERSION		"4.0"
+
+#define TCM_RBD_HAS_UDEV_PATH		0x01
+
+struct scatterlist;
+
+struct tcm_rbd_dev {
+	struct se_device dev;
+	struct rbd_device *rbd_dev;
+
+	struct scatterlist *cmp_and_write_sg;
+	struct page **cmp_and_write_pages;
+
+	unsigned char bd_udev_path[SE_UDEV_PATH_LEN];
+	u32 bd_flags;
+	struct block_device *bd;
+	bool bd_readonly;
+} ____cacheline_aligned;
+
+#endif /* TARGET_CORE_IBLOCK_H */
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH 18/18] target: add lio rbd to makefile/Kconfig
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (15 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 17/18] target: add rbd backend mchristi
@ 2015-07-29  9:23 ` mchristi
  2015-07-29 13:34 ` [PATCH 01/18] libceph: add scatterlist messenger data type Alex Elder
                   ` (2 subsequent siblings)
  19 siblings, 0 replies; 33+ messages in thread
From: mchristi @ 2015-07-29  9:23 UTC (permalink / raw)
  To: ceph-devel, target-devel

From: Mike Christie <michaelc@cs.wisc.edu>

Add lio rbd backend module to target Makefile and Kconfig.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/target/Kconfig  | 6 ++++++
 drivers/target/Makefile | 1 +
 2 files changed, 7 insertions(+)

diff --git a/drivers/target/Kconfig b/drivers/target/Kconfig
index 2573612..29c51c2 100644
--- a/drivers/target/Kconfig
+++ b/drivers/target/Kconfig
@@ -39,6 +39,12 @@ config TCM_USER2
 	process to handle requests. This is version 2 of the ABI; version 1
 	is obsolete.
 
+config TCM_RBD
+	tristate "TCM/RBD Subsystem Plugin for Linux/RBD"
+	help
+	Say Y here to enable the TCM/RBD subsystem plugin for Ceph RBD
+	access.
+
 source "drivers/target/loopback/Kconfig"
 source "drivers/target/tcm_fc/Kconfig"
 source "drivers/target/iscsi/Kconfig"
diff --git a/drivers/target/Makefile b/drivers/target/Makefile
index e619c02..6011d05 100644
--- a/drivers/target/Makefile
+++ b/drivers/target/Makefile
@@ -23,6 +23,7 @@ obj-$(CONFIG_TCM_IBLOCK)	+= target_core_iblock.o
 obj-$(CONFIG_TCM_FILEIO)	+= target_core_file.o
 obj-$(CONFIG_TCM_PSCSI)		+= target_core_pscsi.o
 obj-$(CONFIG_TCM_USER2)		+= target_core_user.o
+obj-$(CONFIG_TCM_RBD)		+= target_core_rbd.o
 
 # Fabric modules
 obj-$(CONFIG_LOOPBACK_TARGET)	+= loopback/
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (16 preceding siblings ...)
  2015-07-29  9:23 ` [PATCH 18/18] target: add lio rbd to makefile/Kconfig mchristi
@ 2015-07-29 13:34 ` Alex Elder
  2015-07-29 17:49   ` Mike Christie
  2015-07-29 17:55 ` Christoph Hellwig
  2016-02-04 10:33 ` David Disseldorp
  19 siblings, 1 reply; 33+ messages in thread
From: Alex Elder @ 2015-07-29 13:34 UTC (permalink / raw)
  To: mchristi, ceph-devel, target-devel

On 07/29/2015 04:23 AM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> LIO uses scatterlist for its page/data management. This patch
> adds a scatterlist messenger data type, so LIO can pass its sg
> down directly to rbd.
> 
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>

I'm not going to be able to review all of these, and this
isn't even a complete review.  But it's something...

You're clearly on the right track, but I want to provide
a meaningful review for correctness and design so I'm
looking for a bit more information.

> ---
>  include/linux/ceph/messenger.h  | 13 ++++++
>  include/linux/ceph/osd_client.h | 12 +++++-
>  net/ceph/messenger.c            | 96 +++++++++++++++++++++++++++++++++++++++++
>  net/ceph/osd_client.c           | 26 +++++++++++
>  4 files changed, 146 insertions(+), 1 deletion(-)
> 
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 3775327..bc1bde8 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -79,6 +79,7 @@ enum ceph_msg_data_type {
>  #ifdef CONFIG_BLOCK
>  	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
>  #endif /* CONFIG_BLOCK */
> +	CEPH_MSG_DATA_SG,	/* data source/destination is a scatterlist */
>  };
>  
>  static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
> @@ -90,6 +91,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
>  #ifdef CONFIG_BLOCK
>  	case CEPH_MSG_DATA_BIO:
>  #endif /* CONFIG_BLOCK */
> +	case CEPH_MSG_DATA_SG:
>  		return true;
>  	default:
>  		return false;
> @@ -112,6 +114,11 @@ struct ceph_msg_data {
>  			unsigned int	alignment;	/* first page */
>  		};
>  		struct ceph_pagelist	*pagelist;
> +		struct {
> +			struct scatterlist *sgl;
> +			unsigned int	sgl_init_offset;
> +			u64		sgl_length;
> +		};

Can you supply a short explanation of what these fields
represent?  It seems sgl_init_offset is the offset of the
starting byte in the sgl, but is the purpose of that for
page offset calculation, or does it represent an offset
into the total length of the sgl?  Or put another way,
does sgl_init_offset represent some portion of the
sgl_length that has been consumed already (and so the
initial residual length is sgl_length - sgl_init_offset)?

>  	};
>  };
>  
> @@ -139,6 +146,10 @@ struct ceph_msg_data_cursor {
>  			struct page	*page;		/* page from list */
>  			size_t		offset;		/* bytes from list */
>  		};
> +		struct {
> +			struct scatterlist	*sg;		/* curr sg */

							/* current sg */

> +			unsigned int		sg_consumed;

Here too, what does sg_consumed represent with respect to the
initial offset and the length?

I guess I'm going to stop with that.  It'll be a lot easier
for me to review this if I'm sure I understand what
these represent.  Thanks.

					-Alex

> +		};
>  	};
>  };
>  

. . .

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 17/18] target: add rbd backend
  2015-07-29  9:23 ` [PATCH 17/18] target: add rbd backend mchristi
@ 2015-07-29 14:27   ` Bart Van Assche
  2015-07-29 17:07     ` Mike Christie
  0 siblings, 1 reply; 33+ messages in thread
From: Bart Van Assche @ 2015-07-29 14:27 UTC (permalink / raw)
  To: mchristi, ceph-devel, target-devel

On 07/29/15 02:23, mchristi@redhat.com wrote:
> +static sector_t tcm_rbd_get_blocks(struct se_device *dev)
> +{
> +	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
> +	sector_t blocks_long = tcm_rbd_dev->rbd_dev->mapping.size >>
> +								SECTOR_SHIFT;
> +
> +	if (SECTOR_SIZE == dev->dev_attrib.block_size)
> +		return blocks_long;
> +
> +	switch (SECTOR_SIZE) {
> +	case 4096:
> +		switch (dev->dev_attrib.block_size) {
> +		case 2048:
> +			blocks_long <<= 1;
> +			break;
> +		case 1024:
> +			blocks_long <<= 2;
> +			break;
> +		case 512:
> +			blocks_long <<= 3;
> +		default:
> +			break;
> +		}
> +		break;
> +	case 2048:
> +		switch (dev->dev_attrib.block_size) {
> +		case 4096:
> +			blocks_long >>= 1;
> +			break;
> +		case 1024:
> +			blocks_long <<= 1;
> +			break;
> +		case 512:
> +			blocks_long <<= 2;
> +			break;
> +		default:
> +			break;
> +		}
> +		break;
> +	case 1024:
> +		switch (dev->dev_attrib.block_size) {
> +		case 4096:
> +			blocks_long >>= 2;
> +			break;
> +		case 2048:
> +			blocks_long >>= 1;
> +			break;
> +		case 512:
> +			blocks_long <<= 1;
> +			break;
> +		default:
> +			break;
> +		}
> +		break;
> +	case 512:
> +		switch (dev->dev_attrib.block_size) {
> +		case 4096:
> +			blocks_long >>= 3;
> +			break;
> +		case 2048:
> +			blocks_long >>= 2;
> +			break;
> +		case 1024:
> +			blocks_long >>= 1;
> +			break;
> +		default:
> +			break;
> +		}
> +		break;
> +	default:
> +		break;
> +	}
> +
> +	return blocks_long;
> +}

Hello Mike,

Had you already considered to replace the above switch / case statement 
by something like the following ?

static sector_t tcm_rbd_get_blocks(struct se_device *dev)
{
	return TCM_RBD_DEV(dev)->rbd_dev->mapping.size >>
				ilog2(dev->dev_attrib.block_size);
}

Thanks,

Bart.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 17/18] target: add rbd backend
  2015-07-29 14:27   ` Bart Van Assche
@ 2015-07-29 17:07     ` Mike Christie
  0 siblings, 0 replies; 33+ messages in thread
From: Mike Christie @ 2015-07-29 17:07 UTC (permalink / raw)
  To: Bart Van Assche, ceph-devel, target-devel

On 07/29/2015 09:27 AM, Bart Van Assche wrote:
> On 07/29/15 02:23, mchristi@redhat.com wrote:
>> +static sector_t tcm_rbd_get_blocks(struct se_device *dev)
>> +{
>> +    struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
>> +    sector_t blocks_long = tcm_rbd_dev->rbd_dev->mapping.size >>
>> +                                SECTOR_SHIFT;
>> +
>> +    if (SECTOR_SIZE == dev->dev_attrib.block_size)
>> +        return blocks_long;
>> +
>> +    switch (SECTOR_SIZE) {
>> +    case 4096:
>> +        switch (dev->dev_attrib.block_size) {
>> +        case 2048:
>> +            blocks_long <<= 1;
>> +            break;
>> +        case 1024:
>> +            blocks_long <<= 2;
>> +            break;
>> +        case 512:
>> +            blocks_long <<= 3;
>> +        default:
>> +            break;
>> +        }
>> +        break;
>> +    case 2048:
>> +        switch (dev->dev_attrib.block_size) {
>> +        case 4096:
>> +            blocks_long >>= 1;
>> +            break;
>> +        case 1024:
>> +            blocks_long <<= 1;
>> +            break;
>> +        case 512:
>> +            blocks_long <<= 2;
>> +            break;
>> +        default:
>> +            break;
>> +        }
>> +        break;
>> +    case 1024:
>> +        switch (dev->dev_attrib.block_size) {
>> +        case 4096:
>> +            blocks_long >>= 2;
>> +            break;
>> +        case 2048:
>> +            blocks_long >>= 1;
>> +            break;
>> +        case 512:
>> +            blocks_long <<= 1;
>> +            break;
>> +        default:
>> +            break;
>> +        }
>> +        break;
>> +    case 512:
>> +        switch (dev->dev_attrib.block_size) {
>> +        case 4096:
>> +            blocks_long >>= 3;
>> +            break;
>> +        case 2048:
>> +            blocks_long >>= 2;
>> +            break;
>> +        case 1024:
>> +            blocks_long >>= 1;
>> +            break;
>> +        default:
>> +            break;
>> +        }
>> +        break;
>> +    default:
>> +        break;
>> +    }
>> +
>> +    return blocks_long;
>> +}
> 
> Hello Mike,
> 
> Had you already considered to replace the above switch / case statement
> by something like the following ?
> 
> static sector_t tcm_rbd_get_blocks(struct se_device *dev)
> {
>     return TCM_RBD_DEV(dev)->rbd_dev->mapping.size >>
>                 ilog2(dev->dev_attrib.block_size);
> }
> 

Thanks. That code was copy and pasted and slightly modified from iblock.
I will code up your suggestion in the next posting.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29 13:34 ` [PATCH 01/18] libceph: add scatterlist messenger data type Alex Elder
@ 2015-07-29 17:49   ` Mike Christie
  0 siblings, 0 replies; 33+ messages in thread
From: Mike Christie @ 2015-07-29 17:49 UTC (permalink / raw)
  To: Alex Elder, ceph-devel, target-devel

On 07/29/2015 08:34 AM, Alex Elder wrote:
> On 07/29/2015 04:23 AM, mchristi@redhat.com wrote:
>> From: Mike Christie <michaelc@cs.wisc.edu>
>>
>> LIO uses scatterlist for its page/data management. This patch
>> adds a scatterlist messenger data type, so LIO can pass its sg
>> down directly to rbd.
>>
>> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> 
> I'm not going to be able to review all of these, and this
> isn't even a complete review.  But it's something...

No problem. Thanks for any comments.

> 
> You're clearly on the right track, but I want to provide
> a meaningful review for correctness and design so I'm
> looking for a bit more information.
> 
>> ---
>>  include/linux/ceph/messenger.h  | 13 ++++++
>>  include/linux/ceph/osd_client.h | 12 +++++-
>>  net/ceph/messenger.c            | 96 +++++++++++++++++++++++++++++++++++++++++
>>  net/ceph/osd_client.c           | 26 +++++++++++
>>  4 files changed, 146 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
>> index 3775327..bc1bde8 100644
>> --- a/include/linux/ceph/messenger.h
>> +++ b/include/linux/ceph/messenger.h
>> @@ -79,6 +79,7 @@ enum ceph_msg_data_type {
>>  #ifdef CONFIG_BLOCK
>>  	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
>>  #endif /* CONFIG_BLOCK */
>> +	CEPH_MSG_DATA_SG,	/* data source/destination is a scatterlist */
>>  };
>>  
>>  static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
>> @@ -90,6 +91,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
>>  #ifdef CONFIG_BLOCK
>>  	case CEPH_MSG_DATA_BIO:
>>  #endif /* CONFIG_BLOCK */
>> +	case CEPH_MSG_DATA_SG:
>>  		return true;
>>  	default:
>>  		return false;
>> @@ -112,6 +114,11 @@ struct ceph_msg_data {
>>  			unsigned int	alignment;	/* first page */
>>  		};
>>  		struct ceph_pagelist	*pagelist;
>> +		struct {
>> +			struct scatterlist *sgl;
>> +			unsigned int	sgl_init_offset;
>> +			u64		sgl_length;
>> +		};
> 
> Can you supply a short explanation of what these fields
> represent?  It seems sgl_init_offset is the offset of the
> starting byte in the sgl, but is the purpose of that for
> page offset calculation, or does it represent an offset
> into the total length of the sgl?  Or put another way,
> does sgl_init_offset represent some portion of the
> sgl_length that has been consumed already (and so the
> initial residual length is sgl_length - sgl_init_offset)?


sgl - starting scatterlist entry we are going to send/receive to/from.

sgl_init_offset - byte offset in the sgl above we will start executing
from. It is for cases like where a LIO command crossed segment/object
boundaries, so we had to break it up, and the first obj request ended up
in the middle of a scatterlist entry. For the second obj request we set
the sgl to the sg we ended on in the first request, and then set the
sgl_init_offset to where we left off in the first request.

So it basically allows me to not have to clone the list similar to how
the bio code does it. However, if we did clone it then I could just
manipulate the cloned sg's sg->offset instead of adding the
sgl_init_offset field.

sgl_length - number of bytes in the sgl we are going to send/receive.
This also is for the case where we broke up the LIO command into
multiple obj requests.


> 
>>  	};
>>  };
>>  
>> @@ -139,6 +146,10 @@ struct ceph_msg_data_cursor {
>>  			struct page	*page;		/* page from list */
>>  			size_t		offset;		/* bytes from list */
>>  		};
>> +		struct {
>> +			struct scatterlist	*sg;		/* curr sg */
> 
> 							/* current sg */
> 
>> +			unsigned int		sg_consumed;
> 
> Here too, what does sg_consumed represent with respect to the
> initial offset and the length?

It is the number of bytes in the sgl we have sent/received. It is used by
the messenger advance code to track if we need to advance to the next sg
or if we still have data left from the current one.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (17 preceding siblings ...)
  2015-07-29 13:34 ` [PATCH 01/18] libceph: add scatterlist messenger data type Alex Elder
@ 2015-07-29 17:55 ` Christoph Hellwig
  2015-07-29 22:59   ` Mike Christie
  2016-02-04 10:33 ` David Disseldorp
  19 siblings, 1 reply; 33+ messages in thread
From: Christoph Hellwig @ 2015-07-29 17:55 UTC (permalink / raw)
  To: mchristi; +Cc: ceph-devel, target-devel

On Wed, Jul 29, 2015 at 04:23:38AM -0500, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> LIO uses scatterlist for its page/data management. This patch
> adds a scatterlist messenger data type, so LIO can pass its sg
> down directly to rbd.

Just as I mentioned for David's patches this is the wrong way to attack
your problem.  The block layer already supports WRITE SAME, and COMPARE
and WRITE needs to be supported at that level too instead of creating
artificial bypasses.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29 17:55 ` Christoph Hellwig
@ 2015-07-29 22:59   ` Mike Christie
  2015-07-29 23:40     ` Mike Christie
  0 siblings, 1 reply; 33+ messages in thread
From: Mike Christie @ 2015-07-29 22:59 UTC (permalink / raw)
  To: Christoph Hellwig; +Cc: ceph-devel, target-devel

On 07/29/2015 12:55 PM, Christoph Hellwig wrote:
> On Wed, Jul 29, 2015 at 04:23:38AM -0500, mchristi@redhat.com wrote:
>> From: Mike Christie <michaelc@cs.wisc.edu>
>>
>> LIO uses scatterlist for its page/data management. This patch
>> adds a scatterlist messenger data type, so LIO can pass its sg
>> down directly to rbd.
> 
> Just as I mentioned for David's patches this is the wrong way to attack
> your problem.  The block layer already supports WRITE SAME, and COMPARE
> and WRITE needs to be supported at that level too instead of creating
> artificial bypasses.

Why do I have to use the block layer? I just want to map the se_cmd to a
ceph request and then put it back on the wire. I don't think I need any
of the block layer services. We will do things like io scheduling on the
OSD side. We just want to use LIO as more of a passthrough.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29 22:59   ` Mike Christie
@ 2015-07-29 23:40     ` Mike Christie
  2015-07-30  7:34       ` Nicholas A. Bellinger
  2015-07-30 14:55       ` Christoph Hellwig
  0 siblings, 2 replies; 33+ messages in thread
From: Mike Christie @ 2015-07-29 23:40 UTC (permalink / raw)
  To: Christoph Hellwig; +Cc: ceph-devel, target-devel

On 07/29/2015 05:59 PM, Mike Christie wrote:
> On 07/29/2015 12:55 PM, Christoph Hellwig wrote:
>> On Wed, Jul 29, 2015 at 04:23:38AM -0500, mchristi@redhat.com wrote:
>>> From: Mike Christie <michaelc@cs.wisc.edu>
>>>
>>> LIO uses scatterlist for its page/data management. This patch
>>> adds a scatterlist messenger data type, so LIO can pass its sg
>>> down directly to rbd.
>>
>> Just as I mentioned for David's patches this is the wrong way to attack
>> your problem.  The block layer already supports WRITE SAME, and COMPARE
>> and WRITE needs to be supported at that level too instead of creating
>> artificial bypasses.
> 
> Why do I have to use the block layer? I just want to map the se_cmd to a
> ceph request and then put it back on the wire. I don't think I need any
> of the block layer services. We will do things like io scheduling on the
> OSD side. We just want to use LIO as more of a passthrough.

Maybe I misunderstood you.

I guess I was viewing this similar to cephfs where it does not use rbd
and the block layer. It just makes ceph/rados calls directly using
libceph. I am using rbd.c for its helper/wrapper functions around the
libceph ones, but I could just make libceph calls directly too.

Were you saying because for lio support we need to do more block
layer'ish operations like write same, compare and write, etc than
cephfs, then I should not do the lio backend and we should always go
through rbd for lio support?

Is that for all operations? For distributed TMFs and PRs then are you
thinking I should make those more block layer based (some sort of queue
or block device callouts or REQ_ types), or should those still have some
sort of lio callouts which could call different locking/cluster APIs
like libceph?

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29 23:40     ` Mike Christie
@ 2015-07-30  7:34       ` Nicholas A. Bellinger
  2015-07-30 14:55       ` Christoph Hellwig
  1 sibling, 0 replies; 33+ messages in thread
From: Nicholas A. Bellinger @ 2015-07-30  7:34 UTC (permalink / raw)
  To: Mike Christie; +Cc: Christoph Hellwig, ceph-devel, target-devel

Hey Mike & HCH,

On Wed, 2015-07-29 at 18:40 -0500, Mike Christie wrote:
> On 07/29/2015 05:59 PM, Mike Christie wrote:
> > On 07/29/2015 12:55 PM, Christoph Hellwig wrote:
> >> On Wed, Jul 29, 2015 at 04:23:38AM -0500, mchristi@redhat.com wrote:
> >>> From: Mike Christie <michaelc@cs.wisc.edu>
> >>>
> >>> LIO uses scatterlist for its page/data management. This patch
> >>> adds a scatterlist messenger data type, so LIO can pass its sg
> >>> down directly to rbd.
> >>
> >> Just as I mentioned for David's patches this is the wrong way to attack
> >> your problem.  The block layer already supports WRITE SAME, and COMPARE
> >> and WRITE needs to be supported at that level too instead of creating
> >> artificial bypasses.
> > 
> > Why do I have to use the block layer? I just want to map the se_cmd to a
> > ceph request and then put it back on the wire. I don't think I need any
> > of the block layer services. We will do things like io scheduling on the
> > OSD side. We just want to use LIO as more of a passthrough.
> 
> Maybe I misunderstood you.
> 
> I guess I was viewing this similar to cephfs where it does not use rbd
> and the block layer. It just makes ceph/rados calls directly using
> libceph. I am using rbd.c for its helper/wrapper functions around the
> libceph ones, but I could just make libceph calls directly too.
> 
> Were you saying because for lio support we need to do more block
> layer'ish operations like write same, compare and write, etc than
> cephfs, then I should not do the lio backend and we should always go
> through rbd for lio support?

If we're using common request_queue function pointers it would avoid the
need to maintain an extra backend driver, and be a generic offload
interface for other make_request_fn() based block drivers using
target-core to utilize.

In the WRITE_SAME + COMPARE_AND_WRITE pass-through cases, IBLOCK se_cmd
pass-through should be invoking the driver provided VAAI callbacks
directly if they exist, and disabling local target-core CDB emulation.

For EXTENDED_COPY pass-through, target-core copy-manager still needs to
be responsible for config_group dependencies across multiple se_device
backends, with a IBLOCK pass-through providing both block_device *src_bd
+ *dst_bd pointers into a request_queue callback for different PUSH/PULL
offload models.

It should also be able to fall back to local copy if source +
destination devices do not both support the same type copy-offload
pass-through.

> 
> Is that for all operations? For distributed TMFs and PRs then are you
> thinking I should make those more block layer based (some sort of queue
> or block device callouts or REQ_ types), or should those still have some
> sort of lio callouts which could call different locking/cluster APIs
> like libceph?

The PR-OUT logic for REGISTER w/ remote I_PORT and remote PREEMPT-*
currently obtains the necessary config_group dependencies, and would
need to be considered for request_queue based PR pass-through too.

Exposing target TMFs pass-through into request_queue is a different
beast..


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29 23:40     ` Mike Christie
  2015-07-30  7:34       ` Nicholas A. Bellinger
@ 2015-07-30 14:55       ` Christoph Hellwig
  1 sibling, 0 replies; 33+ messages in thread
From: Christoph Hellwig @ 2015-07-30 14:55 UTC (permalink / raw)
  To: Mike Christie; +Cc: Christoph Hellwig, ceph-devel, target-devel

On Wed, Jul 29, 2015 at 06:40:01PM -0500, Mike Christie wrote:
> I guess I was viewing this similar to cephfs where it does not use rbd
> and the block layer. It just makes ceph/rados calls directly using
> libceph. I am using rbd.c for its helper/wrapper functions around the
> libceph ones, but I could just make libceph calls directly too.
> 
> Were you saying because for lio support we need to do more block
> layer'ish operations like write same, compare and write, etc than
> cephfs, then I should not do the lio backend and we should always go
> through rbd for lio support?

I'd really prefer that.  We have other users for these facilities as
well, and I'd much prefer having block layer support rather than working
around it.

> Is that for all operations? For distributed TMFs and PRs then are you
> thinking I should make those more block layer based (some sort of queue
> or block device callouts or REQ_ types), or should those still have some
> sort of lio callouts which could call different locking/cluster APIs
> like libceph?

Yes.  FYI, I've pushed out my WIP work for PRs here:

http://git.infradead.org/users/hch/scsi.git/shortlog/refs/heads/pr-api

TMFs are a bit of a borderline case, but instead of needing special
bypasses I'd rather find a way to add them.  For example we already
have TMF ioctls for SCSI, so we might as well pull this up to the block
layer.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 12/18] target: compare and write backend driver sense handling
  2015-07-29  9:23 ` [PATCH 12/18] target: compare and write backend driver sense handling mchristi
@ 2015-09-04 19:41   ` Mike Christie
  2015-09-04 22:34     ` Andy Grover
  2015-09-06  6:38     ` Sagi Grimberg
  2015-09-06  7:12   ` Christoph Hellwig
  1 sibling, 2 replies; 33+ messages in thread
From: Mike Christie @ 2015-09-04 19:41 UTC (permalink / raw)
  To: ceph-devel, target-devel

Hey Nick and Christoph,

For the target_core_iblock + rbd/COMPARE_AND_WRITE support approach, I
still need a patch like below to be able to allow target_core_iblock.c
to be able to specify specific codes like TCM_MISCOMPARE_VERIFY.

Is the patch below ok? I made it work similar to how we have other
completion functions, target_complete_cmd_with_*, that take in extra
amounts of info like length or sense.

Instead, I can modify target_complete_cmd and all the callers so they
just take/pass the sense and info if needed:

void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status,
			 sense_reason_t sense, u32 info)

On 07/29/2015 04:23 AM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> Currently, backend drivers seem to only fail IO with
> SAM_STAT_CHECK_CONDITION which gets us
> TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE.
> For compare and write support we will want to be able to fail with
> TCM_MISCOMPARE_VERIFY. This patch adds a new helper that allows backend
> drivers to fail with specific sense codes.
> 
> It also allows the backend driver to set the miscompare offset.
> 
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>  drivers/target/target_core_transport.c | 33 ++++++++++++++++++++++++++++-----
>  include/target/target_core_backend.h   |  1 +
>  include/target/target_core_base.h      |  5 ++++-
>  3 files changed, 33 insertions(+), 6 deletions(-)
> 
> diff --git a/drivers/target/target_core_transport.c b/drivers/target/target_core_transport.c
> index ce8574b..f9b0527 100644
> --- a/drivers/target/target_core_transport.c
> +++ b/drivers/target/target_core_transport.c
> @@ -639,8 +639,7 @@ static void target_complete_failure_work(struct work_struct *work)
>  {
>  	struct se_cmd *cmd = container_of(work, struct se_cmd, work);
>  
> -	transport_generic_request_failure(cmd,
> -			TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE);
> +	transport_generic_request_failure(cmd, cmd->sense_reason);
>  }
>  
>  /*
> @@ -666,14 +665,15 @@ static unsigned char *transport_get_sense_buffer(struct se_cmd *cmd)
>  	return cmd->sense_buffer;
>  }
>  
> -void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
> +static void __target_complete_cmd(struct se_cmd *cmd, u8 scsi_status,
> +				  sense_reason_t sense_reason)
>  {
>  	struct se_device *dev = cmd->se_dev;
>  	int success = scsi_status == GOOD;
>  	unsigned long flags;
>  
>  	cmd->scsi_status = scsi_status;
> -
> +	cmd->sense_reason = sense_reason;
>  
>  	spin_lock_irqsave(&cmd->t_state_lock, flags);
>  	cmd->transport_state &= ~CMD_T_BUSY;
> @@ -716,8 +716,22 @@ void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
>  
>  	queue_work(target_completion_wq, &cmd->work);
>  }
> +
> +void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status)
> +{
> +	__target_complete_cmd(cmd, scsi_status, scsi_status ?
> +			     TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE :
> +			     TCM_NO_SENSE);
> +}
>  EXPORT_SYMBOL(target_complete_cmd);
>  
> +void target_complete_cmd_with_sense(struct se_cmd *cmd,
> +				    sense_reason_t sense_reason)
> +{
> +	__target_complete_cmd(cmd, SAM_STAT_CHECK_CONDITION, sense_reason);
> +}
> +EXPORT_SYMBOL(target_complete_cmd_with_sense);
> +
>  void target_complete_cmd_with_length(struct se_cmd *cmd, u8 scsi_status, int length)
>  {
>  	if (scsi_status == SAM_STAT_GOOD && length < cmd->data_length) {
> @@ -1650,6 +1664,7 @@ void transport_generic_request_failure(struct se_cmd *cmd,
>  	case TCM_LOGICAL_BLOCK_GUARD_CHECK_FAILED:
>  	case TCM_LOGICAL_BLOCK_APP_TAG_CHECK_FAILED:
>  	case TCM_LOGICAL_BLOCK_REF_TAG_CHECK_FAILED:
> +	case TCM_MISCOMPARE_VERIFY:
>  		break;
>  	case TCM_OUT_OF_RESOURCES:
>  		sense_reason = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
> @@ -2633,12 +2648,19 @@ void transport_err_sector_info(unsigned char *buffer, sector_t bad_sector)
>  	buffer[SPC_ADD_SENSE_LEN_OFFSET] = 0xc;
>  	buffer[SPC_DESC_TYPE_OFFSET] = 0; /* Information */
>  	buffer[SPC_ADDITIONAL_DESC_LEN_OFFSET] = 0xa;
> -	buffer[SPC_VALIDITY_OFFSET] = 0x80;
> +	buffer[SPC_CMD_INFO_VALIDITY_OFFSET] = 0x80;
>  
>  	/* Descriptor Information: failing sector */
>  	put_unaligned_be64(bad_sector, &buffer[12]);
>  }
>  
> +static void transport_err_sense_info(unsigned char *buffer, u32 info)
> +{
> +	buffer[SPC_INFO_VALIDITY_OFFSET] |= 0x80;
> +	/* Sense Information */
> +	put_unaligned_be32(info, &buffer[3]);
> +}
> +
>  int
>  transport_send_check_condition_and_sense(struct se_cmd *cmd,
>  		sense_reason_t reason, int from_transport)
> @@ -2831,6 +2853,7 @@ transport_send_check_condition_and_sense(struct se_cmd *cmd,
>  		/* MISCOMPARE DURING VERIFY OPERATION */
>  		buffer[SPC_ASC_KEY_OFFSET] = 0x1d;
>  		buffer[SPC_ASCQ_KEY_OFFSET] = 0x00;
> +		transport_err_sense_info(buffer, cmd->sense_info);
>  		break;
>  	case TCM_LOGICAL_BLOCK_GUARD_CHECK_FAILED:
>  		/* CURRENT ERROR */
> diff --git a/include/target/target_core_backend.h b/include/target/target_core_backend.h
> index ec5a09f..c98f6e6 100644
> --- a/include/target/target_core_backend.h
> +++ b/include/target/target_core_backend.h
> @@ -58,6 +58,7 @@ int	transport_backend_register(const struct target_backend_ops *);
>  void	target_backend_unregister(const struct target_backend_ops *);
>  
>  void	target_complete_cmd(struct se_cmd *, u8);
> +void	target_complete_cmd_with_sense(struct se_cmd *, sense_reason_t);
>  void	target_complete_cmd_with_length(struct se_cmd *, u8, int);
>  
>  sense_reason_t	spc_parse_cdb(struct se_cmd *cmd, unsigned int *size);
> diff --git a/include/target/target_core_base.h b/include/target/target_core_base.h
> index 17ae2d6..b83a8ec 100644
> --- a/include/target/target_core_base.h
> +++ b/include/target/target_core_base.h
> @@ -22,11 +22,12 @@
>   */
>  #define TRANSPORT_SENSE_BUFFER			96
>  /* Used by transport_send_check_condition_and_sense() */
> +#define SPC_INFO_VALIDITY_OFFSET		0
>  #define SPC_SENSE_KEY_OFFSET			2
>  #define SPC_ADD_SENSE_LEN_OFFSET		7
>  #define SPC_DESC_TYPE_OFFSET			8
>  #define SPC_ADDITIONAL_DESC_LEN_OFFSET		9
> -#define SPC_VALIDITY_OFFSET			10
> +#define SPC_CMD_INFO_VALIDITY_OFFSET		10
>  #define SPC_ASC_KEY_OFFSET			12
>  #define SPC_ASCQ_KEY_OFFSET			13
>  #define TRANSPORT_IQN_LEN			224
> @@ -439,6 +440,8 @@ struct se_dif_v1_tuple {
>  #define TCM_ACA_TAG	0x24
>  
>  struct se_cmd {
> +	sense_reason_t		sense_reason;
> +	u32			sense_info;
>  	/* SAM response code being sent to initiator */
>  	u8			scsi_status;
>  	u8			scsi_asc;
> 

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 12/18] target: compare and write backend driver sense handling
  2015-09-04 19:41   ` Mike Christie
@ 2015-09-04 22:34     ` Andy Grover
  2015-09-06  6:38     ` Sagi Grimberg
  1 sibling, 0 replies; 33+ messages in thread
From: Andy Grover @ 2015-09-04 22:34 UTC (permalink / raw)
  To: Mike Christie, ceph-devel, target-devel

On 09/04/2015 12:41 PM, Mike Christie wrote:
> Hey Nick and Christoph,
>
> For the target_core_iblock + rbd/COMPARE_AND_WRITE support approach, I
> still need a patch like below to be able to allow target_core_iblock.c
> to be able to specify specific codes like TCM_MISCOMPARE_VERIFY.
>
> Is the patch below ok? I made it work similar to how we have other
> completion functions, target_complete_cmd_with_*, that take in extra
> amounts of info like length or sense.
>
> Instead, I can modify target_complete_cmd and all the callers so they
> just take/pass the sense and info if needed:
>
> void target_complete_cmd(struct se_cmd *cmd, u8 scsi_status,
> 			 sense_reason_t sense, u32 info)

We also want user/pscsi passthrough to be able to return correct sense 
info, and your patch would let them do this.

-- Andy

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 12/18] target: compare and write backend driver sense handling
  2015-09-04 19:41   ` Mike Christie
  2015-09-04 22:34     ` Andy Grover
@ 2015-09-06  6:38     ` Sagi Grimberg
  1 sibling, 0 replies; 33+ messages in thread
From: Sagi Grimberg @ 2015-09-06  6:38 UTC (permalink / raw)
  To: Mike Christie, ceph-devel, target-devel

On 9/4/2015 10:41 PM, Mike Christie wrote:
> Hey Nick and Christoph,

Hi Mike,

>
> For the target_core_iblock + rbd/COMPARE_AND_WRITE support approach, I
> still need a patch like below to be able to allow target_core_iblock.c
> to be able to specify specific codes like TCM_MISCOMPARE_VERIFY.

I just posted a couple of patches to use scsi helpers to set the
sense buffer with/without information (see translate_sense_reason).
You can just add TCM_MISCOMPARE_VERIFY entry to sense_info_table with
.add_sector_info = true.

Moreover, we have a field in the command structure to keep the error
sector (used in DIF). So you should go ahead and use that.

Sagi.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 12/18] target: compare and write backend driver sense handling
  2015-07-29  9:23 ` [PATCH 12/18] target: compare and write backend driver sense handling mchristi
  2015-09-04 19:41   ` Mike Christie
@ 2015-09-06  7:12   ` Christoph Hellwig
  1 sibling, 0 replies; 33+ messages in thread
From: Christoph Hellwig @ 2015-09-06  7:12 UTC (permalink / raw)
  To: mchristi; +Cc: ceph-devel, target-devel

On Wed, Jul 29, 2015 at 04:23:49AM -0500, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> Currently, backend drivers seem to only fail IO with
> SAM_STAT_CHECK_CONDITION which gets us
> TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE.
> For compare and write support we will want to be able to fail with
> TCM_MISCOMPARE_VERIFY. This patch adds a new helper that allows backend
> drivers to fail with specific sense codes.
> 
> It also allows the backend driver to set the miscompare offset.

I agree that we should allow for better passing of sense data, but I
also think we need to redo the sense handling instead of adding more
warts.

One premise is that with various updates to the standards it will become
more common to generate sense data even if we did not fail the whole
command, so this might be a good opportunity to prepare for that.

> diff --git a/drivers/target/target_core_transport.c b/drivers/target/target_core_transport.c
> index ce8574b..f9b0527 100644
> --- a/drivers/target/target_core_transport.c
> +++ b/drivers/target/target_core_transport.c
> @@ -639,8 +639,7 @@ static void target_complete_failure_work(struct work_struct *work)
>  {
>  	struct se_cmd *cmd = container_of(work, struct se_cmd, work);
>  
> -	transport_generic_request_failure(cmd,
> -			TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE);
> +	transport_generic_request_failure(cmd, cmd->sense_reason);
>  }

So I think we should merge target_complete_failure_work and
target_complete_ok_work as a first step.

Then as a second do away with transport_generic_request_failure and just
have single target_complete_cmd that will return success or error based
on the scsi_status field an generate sense if cmd->sense_reason is set.

Third we should replace SCF_TRANSPORT_TASK_SENSE and
SCF_EMULATED_TASK_SENSE with a single driver visible flag and instead
have a new TCM_PASSTHROUGH_SENSE sense code to not generate new sense
data if pscsi passed on sense data.

>  struct se_cmd {
> +	sense_reason_t		sense_reason;

At this point you should probably also remove the sense_reason from the
iscsi_cmd now that it's in the generic CMD.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 04/18] libceph: support bidirectional requests
  2015-07-29  9:23 ` [PATCH 04/18] libceph: support bidirectional requests mchristi
@ 2015-11-21 23:32   ` Goldwyn Rodrigues
  0 siblings, 0 replies; 33+ messages in thread
From: Goldwyn Rodrigues @ 2015-11-21 23:32 UTC (permalink / raw)
  To: mchristi, ceph-devel, target-devel

Hi Mike,

On 07/29/2015 04:23 AM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> The next patch will add support for SCSI's compare and write
> command. This command sends N bytes, compares them to N bytes on disk,
> then returns success or the offset in the buffer where a miscompare
> occured. For Ceph support, I implemented this as a multiple op request:
>
> 1. a new CMPEXT (compare extent) operation that compare N bytes
> and if a miscompare occurred then returns the offset it miscompared
> and also returns the buffer.
> 2. a write request. If the CMPEXT succeeds then this will be executed.
>
> This patch modifies libceph so it can support both a request buffer
> and response buffer for extent based IO, so the CMPEXT command can
> send its comparison buffer and also receive the failed buffer if needed.
>
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   fs/ceph/addr.c                  |   4 +-
>   include/linux/ceph/osd_client.h |   3 +-
>   net/ceph/osd_client.c           | 109 +++++++++++++++++++++++++++++++---------
>   3 files changed, 89 insertions(+), 27 deletions(-)
>
> diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> index 890c509..0360b44 100644
> --- a/fs/ceph/addr.c
> +++ b/fs/ceph/addr.c
> @@ -269,7 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
>   	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
>
>   	/* unlock all pages, zeroing any data we didn't read */
> -	osd_data = osd_req_op_extent_osd_data(req, 0);
> +	osd_data = osd_req_op_extent_osd_response_data(req, 0);
>   	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
>   	num_pages = calc_pages_for((u64)osd_data->alignment,
>   					(u64)osd_data->length);
> @@ -618,7 +618,7 @@ static void writepages_finish(struct ceph_osd_request *req,
>   	long writeback_stat;
>   	unsigned issued = ceph_caps_issued(ci);
>
> -	osd_data = osd_req_op_extent_osd_data(req, 0);
> +	osd_data = osd_req_op_extent_osd_request_data(req, 0);

Both these functions should be added in include/linux/ceph/osd_client.h 
so that this (cephfs) compiles. And, osd_req_op_extent_osd_data should 
be removed.

-- 
Goldwyn

>   	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
>   	num_pages = calc_pages_for((u64)osd_data->alignment,
>   					(u64)osd_data->length);
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 2152f06..e737173 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -90,7 +90,8 @@ struct ceph_osd_req_op {
>   			u64 offset, length;
>   			u64 truncate_size;
>   			u32 truncate_seq;
> -			struct ceph_osd_data osd_data;
> +			struct ceph_osd_data request_data;
> +			struct ceph_osd_data response_data;
>   		} extent;
>   		struct {
>   			u32 name_len;
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index fd0a52e..3bf0849 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -153,12 +153,20 @@ osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
>   }
>
>   struct ceph_osd_data *
> -osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
> -			unsigned int which)
> +osd_req_op_extent_osd_request_data(struct ceph_osd_request *osd_req,
> +				   unsigned int which)
>   {
> -	return osd_req_op_data(osd_req, which, extent, osd_data);
> +	return osd_req_op_data(osd_req, which, extent, request_data);
>   }
> -EXPORT_SYMBOL(osd_req_op_extent_osd_data);
> +EXPORT_SYMBOL(osd_req_op_extent_osd_request_data);
> +
> +struct ceph_osd_data *
> +osd_req_op_extent_osd_response_data(struct ceph_osd_request *osd_req,
> +				    unsigned int which)
> +{
> +	return osd_req_op_data(osd_req, which, extent, response_data);
> +}
> +EXPORT_SYMBOL(osd_req_op_extent_osd_response_data);
>
>   struct ceph_osd_data *
>   osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
> @@ -186,21 +194,46 @@ void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
>   			u64 length, u32 alignment,
>   			bool pages_from_pool, bool own_pages)
>   {
> -	struct ceph_osd_data *osd_data;
> +	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
>
> -	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
> -	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
> -				pages_from_pool, own_pages);
> +	switch (op->op) {
> +	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_ZERO:
> +	case CEPH_OSD_OP_TRUNCATE:
> +		ceph_osd_data_pages_init(&op->extent.response_data, pages,
> +					 length, alignment, pages_from_pool,
> +					 own_pages);
> +		break;
> +	case CEPH_OSD_OP_WRITE:
> +		ceph_osd_data_pages_init(&op->extent.request_data, pages,
> +					  length, alignment, pages_from_pool,
> +					  own_pages);
> +		break;
> +	default:
> +		BUG();
> +	}
>   }
>   EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
>
>   void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
>   			unsigned int which, struct ceph_pagelist *pagelist)
>   {
> -	struct ceph_osd_data *osd_data;
> +	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
>
> -	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
> -	ceph_osd_data_pagelist_init(osd_data, pagelist);
> +	switch (op->op) {
> +	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_ZERO:
> +	case CEPH_OSD_OP_TRUNCATE:
> +		ceph_osd_data_pagelist_init(&op->extent.response_data,
> +					    pagelist);
> +		break;
> +	case CEPH_OSD_OP_WRITE:
> +		ceph_osd_data_pagelist_init(&op->extent.request_data,
> +					    pagelist);
> +		break;
> +	default:
> +		BUG();
> +	}
>   }
>   EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
>
> @@ -208,10 +241,22 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
>   void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
>   			unsigned int which, struct bio *bio, size_t bio_length)
>   {
> -	struct ceph_osd_data *osd_data;
> +	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
>
> -	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
> -	ceph_osd_data_bio_init(osd_data, bio, bio_length);
> +	switch (op->op) {
> +	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_ZERO:
> +	case CEPH_OSD_OP_TRUNCATE:
> +		ceph_osd_data_bio_init(&op->extent.response_data, bio,
> +				       bio_length);
> +		break;
> +	case CEPH_OSD_OP_WRITE:
> +		ceph_osd_data_bio_init(&op->extent.request_data, bio,
> +				       bio_length);
> +		break;
> +	default:
> +		BUG();
> +	}
>   }
>   EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
>   #endif /* CONFIG_BLOCK */
> @@ -220,10 +265,22 @@ void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
>   			unsigned int which, struct scatterlist *sgl,
>   			unsigned int init_sg_offset, u64 length)
>   {
> -	struct ceph_osd_data *osd_data;
> +	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
>
> -	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
> -	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
> +	switch (op->op) {
> +	case CEPH_OSD_OP_READ:
> +	case CEPH_OSD_OP_ZERO:
> +	case CEPH_OSD_OP_TRUNCATE:
> +		ceph_osd_data_sg_init(&op->extent.response_data,
> +				      sgl, init_sg_offset, length);
> +		break;
> +	case CEPH_OSD_OP_WRITE:
> +		ceph_osd_data_sg_init(&op->extent.request_data,
> +				      sgl, init_sg_offset, length);
> +		break;
> +	default:
> +		BUG();
> +	}
>   }
>   EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
>
> @@ -368,8 +425,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>
>   	switch (op->op) {
>   	case CEPH_OSD_OP_READ:
> +		ceph_osd_data_release(&op->extent.response_data);
> +		break;
>   	case CEPH_OSD_OP_WRITE:
> -		ceph_osd_data_release(&op->extent.osd_data);
> +		ceph_osd_data_release(&op->extent.request_data);
>   		break;
>   	case CEPH_OSD_OP_CALL:
>   		ceph_osd_data_release(&op->cls.request_info);
> @@ -783,19 +842,21 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
>   	case CEPH_OSD_OP_WRITE:
>   	case CEPH_OSD_OP_ZERO:
>   	case CEPH_OSD_OP_TRUNCATE:
> -		if (src->op == CEPH_OSD_OP_WRITE)
> -			request_data_len = src->extent.length;
>   		dst->extent.offset = cpu_to_le64(src->extent.offset);
>   		dst->extent.length = cpu_to_le64(src->extent.length);
>   		dst->extent.truncate_size =
>   			cpu_to_le64(src->extent.truncate_size);
>   		dst->extent.truncate_seq =
>   			cpu_to_le32(src->extent.truncate_seq);
> -		osd_data = &src->extent.osd_data;
> -		if (src->op == CEPH_OSD_OP_WRITE)
> +		if (src->op == CEPH_OSD_OP_WRITE) {
> +			osd_data = &src->extent.request_data;
>   			ceph_osdc_msg_data_add(req->r_request, osd_data);
> -		else
> +
> +			request_data_len = src->extent.length;
> +		} else {
> +			osd_data = &src->extent.response_data;
>   			ceph_osdc_msg_data_add(req->r_reply, osd_data);
> +		}
>   		break;
>   	case CEPH_OSD_OP_CALL:
>   		dst->cls.class_len = src->cls.class_len;
> @@ -3326,7 +3387,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
>   		 * XXX page data.  Probably OK for reads, but this
>   		 * XXX ought to be done more generally.
>   		 */
> -		osd_data = osd_req_op_extent_osd_data(req, 0);
> +		osd_data = osd_req_op_extent_osd_response_data(req, 0);
>   		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
>   			if (osd_data->pages &&
>   				unlikely(osd_data->length < data_len)) {
>

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH 01/18] libceph: add scatterlist messenger data type
  2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
                   ` (18 preceding siblings ...)
  2015-07-29 17:55 ` Christoph Hellwig
@ 2016-02-04 10:33 ` David Disseldorp
  19 siblings, 0 replies; 33+ messages in thread
From: David Disseldorp @ 2016-02-04 10:33 UTC (permalink / raw)
  To: mchristi; +Cc: ceph-devel

On Wed, 29 Jul 2015 04:23:38 -0500, mchristi@redhat.com wrote:

> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> LIO uses scatterlist for its page/data management. This patch
> adds a scatterlist messenger data type, so LIO can pass its sg
> down directly to rbd.
> 
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>

...

>  /*
> + * For an sg data item, a piece is whatever remains of the
> + * current sg entry, or the first entry in the next
> + * sg in the list.
> + */
> +static void ceph_msg_data_sg_cursor_init(struct ceph_msg_data_cursor *cursor,
> +					 size_t length)
> +{
> +	struct ceph_msg_data *data = cursor->data;
> +	struct scatterlist *sg;
> +
> +	BUG_ON(data->type != CEPH_MSG_DATA_SG);
> +
> +	sg = data->sgl;
> +	BUG_ON(!sg);
> +
> +	cursor->resid = min_t(u64, length, data->sgl_length);
> +	cursor->sg = sg;
> +	cursor->sg_consumed = data->sgl_init_offset;
> +	cursor->last_piece = cursor->resid <= sg->length;
> +}

Just in case the CEPH_MSG_DATA_SG changes are picked up, the
cursor->last_piece calculation here needs to take into account the
data->sgl_init_offset:
	if (cursor->resid <= (sg->length - data->sgl_init_offset))
		cursor->last_piece = true;
	...

Cheers, David

^ permalink raw reply	[flat|nested] 33+ messages in thread

end of thread, other threads:[~2016-02-04 10:33 UTC | newest]

Thread overview: 33+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-07-29  9:23 [PATCH 01/18] libceph: add scatterlist messenger data type mchristi
2015-07-29  9:23 ` [PATCH 02/18] rbd: add support for scatterlist obj_request_type mchristi
2015-07-29  9:23 ` [PATCH 03/18] rbd: add lio specific data area mchristi
2015-07-29  9:23 ` [PATCH 04/18] libceph: support bidirectional requests mchristi
2015-11-21 23:32   ` Goldwyn Rodrigues
2015-07-29  9:23 ` [PATCH 05/18] libceph: add support for CMPEXT compare extent requests mchristi
2015-07-29  9:23 ` [PATCH 06/18] rbd: add write test helper mchristi
2015-07-29  9:23 ` [PATCH 07/18] rbd: add num ops calculator helper mchristi
2015-07-29  9:23 ` [PATCH 08/18] rbd: add support for COMPARE_AND_WRITE/CMPEXT mchristi
2015-07-29  9:23 ` [PATCH 09/18] libceph: add support for write same requests mchristi
2015-07-29  9:23 ` [PATCH 10/18] rbd: add support for writesame requests mchristi
2015-07-29  9:23 ` [PATCH 11/18] target: add compare and write callback mchristi
2015-07-29  9:23 ` [PATCH 12/18] target: compare and write backend driver sense handling mchristi
2015-09-04 19:41   ` Mike Christie
2015-09-04 22:34     ` Andy Grover
2015-09-06  6:38     ` Sagi Grimberg
2015-09-06  7:12   ` Christoph Hellwig
2015-07-29  9:23 ` [PATCH 13/18] target: add COMPARE_AND_WRITE sg creation helper mchristi
2015-07-29  9:23 ` [PATCH 14/18] libceph: fix pr_fmt compile issues mchristi
2015-07-29  9:23 ` [PATCH 15/18] rbd: export some functions used by lio rbd backend mchristi
2015-07-29  9:23 ` [PATCH 16/18] rbd: move structs used by lio rbd to new header mchristi
2015-07-29  9:23 ` [PATCH 17/18] target: add rbd backend mchristi
2015-07-29 14:27   ` Bart Van Assche
2015-07-29 17:07     ` Mike Christie
2015-07-29  9:23 ` [PATCH 18/18] target: add lio rbd to makefile/Kconfig mchristi
2015-07-29 13:34 ` [PATCH 01/18] libceph: add scatterlist messenger data type Alex Elder
2015-07-29 17:49   ` Mike Christie
2015-07-29 17:55 ` Christoph Hellwig
2015-07-29 22:59   ` Mike Christie
2015-07-29 23:40     ` Mike Christie
2015-07-30  7:34       ` Nicholas A. Bellinger
2015-07-30 14:55       ` Christoph Hellwig
2016-02-04 10:33 ` David Disseldorp

This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.