* [PATCH 00/10] ceph/rbd: initial support for lio ha
@ 2015-04-28 22:05 mchristi
  2015-04-28 22:05 ` [PATCH 01/10] rbd: add obj request execution helper mchristi
                   ` (9 more replies)
  0 siblings, 10 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

Sorry for possibly duplicate emails. I think git-send-email hit an
error in the middle of sending the patchset, so it looks like the
first attempt did not make it.

The following patches, made over v4.0 of the upstream Linux kernel, add
the functionality needed to support HA LIO using RBD as the backing store.
More info can be found here:
https://wiki.ceph.com/Planning/Blueprints/Hammer/Clustered_SCSI_target_using_RBD

These patches add only the ceph/rbd code. I was hoping to also post
the LIO pieces today, but I am hitting some bugs, and Doug might need
some of this to implement notify2/watch2 support.

I will try to post at least the TMF handling code that uses these
functions in a couple of days.



^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH 01/10] rbd: add obj request execution helper
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-29 21:33   ` Alex Elder
  2015-04-28 22:05 ` [PATCH 02/10] ceph: add start/finish encoding helpers mchristi
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch breaks the code that allocates buffers and executes the
request out of rbd_obj_method_sync, so that later functions in this
patchset can reuse it.

It also adds support for OBJ_OP_WRITE requests, which is needed by the
locking functions added in subsequent patches.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 156 ++++++++++++++++++++++++++++++++--------------------
 1 file changed, 95 insertions(+), 61 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b40af32..fafe558 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3224,89 +3224,123 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 }
 
 /*
- * Synchronous osd object method call.  Returns the number of bytes
- * returned in the outbound buffer, or a negative error code.
+ * Synchronous osd object op call.  Returns the number of bytes
+ * returned in the inbound buffer, or a negative error code.
  */
-static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
-			     const char *object_name,
-			     const char *class_name,
-			     const char *method_name,
-			     const void *outbound,
-			     size_t outbound_size,
-			     void *inbound,
-			     size_t inbound_size)
+static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
+				struct rbd_obj_request *obj_request,
+				const void *outbound, size_t outbound_size,
+				void *inbound, size_t inbound_size)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	struct rbd_obj_request *obj_request;
-	struct page **pages;
-	u32 page_count;
-	int ret;
-
-	/*
-	 * Method calls are ultimately read operations.  The result
-	 * should placed into the inbound buffer provided.  They
-	 * also supply outbound data--parameters for the object
-	 * method.  Currently if this is present it will be a
-	 * snapshot id.
-	 */
-	page_count = (u32)calc_pages_for(0, inbound_size);
-	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
-
-	ret = -ENOMEM;
-	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
-							OBJ_REQUEST_PAGES);
-	if (!obj_request)
-		goto out;
-
-	obj_request->pages = pages;
-	obj_request->page_count = page_count;
-
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-						  obj_request);
-	if (!obj_request->osd_req)
-		goto out;
+	struct page **pages = NULL;
+	u32 page_count = 0;
+	int ret = -ENOMEM;
+	u16 op = obj_request->osd_req->r_ops[0].op;
+	struct ceph_pagelist *pagelist;
+
+	if (inbound_size) {
+		page_count = (u32)calc_pages_for(0, inbound_size);
+		pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
+		if (IS_ERR(pages))
+			return PTR_ERR(pages);
+
+		obj_request->pages = pages;
+		obj_request->page_count = page_count;
+
+		switch (op) {
+		case CEPH_OSD_OP_CALL:
+			osd_req_op_cls_response_data_pages(obj_request->osd_req,
+							   0, pages,
+							   inbound_size,
+							   0, false, false);
+			break;
+		default:
+			BUG();
+		}
+	}
 
-	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
-					class_name, method_name);
 	if (outbound_size) {
-		struct ceph_pagelist *pagelist;
-
-		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+		pagelist = kmalloc(sizeof (*pagelist), GFP_NOIO);
 		if (!pagelist)
-			goto out;
+			goto free_pages;
 
 		ceph_pagelist_init(pagelist);
 		ceph_pagelist_append(pagelist, outbound, outbound_size);
-		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
-						pagelist);
+
+		switch (op) {
+		case CEPH_OSD_OP_CALL:
+			osd_req_op_cls_request_data_pagelist(
+							obj_request->osd_req, 0,
+							pagelist);
+			break;
+		default:
+			BUG();
+		}
 	}
-	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
-					obj_request->pages, inbound_size,
-					0, false, false);
-	rbd_osd_req_format_read(obj_request);
+
+	if (inbound_size)
+		rbd_osd_req_format_read(obj_request);
+	else
+		rbd_osd_req_format_write(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
 	if (ret)
-		goto out;
+		goto done;
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
-		goto out;
+		goto done;
 
 	ret = obj_request->result;
 	if (ret < 0)
-		goto out;
+		goto done;
 
 	rbd_assert(obj_request->xferred < (u64)INT_MAX);
 	ret = (int)obj_request->xferred;
-	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
-out:
-	if (obj_request)
-		rbd_obj_request_put(obj_request);
-	else
-		ceph_release_page_vector(pages, page_count);
+	if (inbound_size)
+		ceph_copy_from_page_vector(pages, inbound, 0,
+					   obj_request->xferred);
+done:
+	return ret;
+
+free_pages:
+	ceph_release_page_vector(pages, page_count);
+	return ret;
+}
 
+/*
+ * Synchronous osd object method call.  Returns the number of bytes
+ * returned in the inbound buffer, or a negative error code.
+ */
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
+			       const char *object_name,
+			       const char *class_name,
+			       const char *method_name,
+			       const void *outbound,
+			       size_t outbound_size,
+			       void *inbound,
+			       size_t inbound_size)
+{
+	struct rbd_obj_request *obj_request;
+	int ret = -ENOMEM;
+
+	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
+					     OBJ_REQUEST_PAGES);
+	if (!obj_request)
+		return -ENOMEM;
+
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev,
+					inbound ? OBJ_OP_READ : OBJ_OP_WRITE,
+					1, obj_request);
+	if (!obj_request->osd_req)
+		goto out;
+
+	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
+			    class_name, method_name);
+	ret = rbd_obj_request_sync(rbd_dev, obj_request, outbound, outbound_size,
+				   inbound, inbound_size);
+out:
+	rbd_obj_request_put(obj_request);
 	return ret;
 }
 
-- 
1.8.3.1



* [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
  2015-04-28 22:05 ` [PATCH 01/10] rbd: add obj request execution helper mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-29 21:54   ` Alex Elder
  2015-04-30 12:22   ` Alex Elder
  2015-04-28 22:05 ` [PATCH 03/10] ceph: export ceph_entity_type_name mchristi
                   ` (7 subsequent siblings)
  9 siblings, 2 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds helpers to encode/decode the versioned start block
used by the locking code. They are the equivalents of ENCODE_START and
DECODE_START_LEGACY_COMPAT_LEN in the userspace Ceph code.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/decode.h | 55 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index a6ef9cc..96ec43d 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -217,6 +217,61 @@ static inline void ceph_encode_string(void **p, void *end,
 	*p += len;
 }
 
+/*
+ * version and length starting block encoders/decoders
+ */
+
+/* current code version (u8) + compat code version (u8) + len of struct (u32) */
+#define CEPH_ENCODING_START_BLK_LEN 6
+
+/**
+ * ceph_start_encoding - start encoding block
+ * @p: buffer to encode data in
+ * @curr_ver: current (code) version of the encoding
+ * @compat_ver: oldest code version that can decode it
+ * @len: length of data that will be encoded in buffer
+ */
+static inline void ceph_start_encoding(void **p, u8 curr_ver, u8 compat_ver,
+				       u32 len)
+{
+	ceph_encode_8(p, curr_ver);
+	ceph_encode_8(p, compat_ver);
+	ceph_encode_32(p, len);
+}
+
+/**
+ * ceph_start_decoding_compat - decode block with legacy support for older schemes
+ * @p: buffer to decode
+ * @end: end of decode buffer
+ * @curr_ver: current version of the encoding that the code supports/encodes
+ * @compat_ver: oldest version that includes a __u8 compat version field
+ * @len_ver: oldest version that includes a __u32 length wrapper
+ * @len: buffer to return len of data in buffer
+ */
+static inline int ceph_start_decoding_compat(void **p, void *end, u8 curr_ver,
+					     u8 compat_ver, u8 len_ver, u32 *len)
+{
+	u8 struct_ver, struct_compat;
+
+	ceph_decode_8_safe(p, end, struct_ver, fail);
+	if (struct_ver >= curr_ver) {
+		ceph_decode_8_safe(p, end, struct_compat, fail);
+		if (curr_ver < struct_compat)
+			return -EINVAL;
+	}
+
+	if (struct_ver >= len_ver) {
+		ceph_decode_32_safe(p, end, *len, fail);
+	} else {
+		*len = 0;
+	}
+
+	return 0;
+fail:
+	return -ERANGE;
+}
+
+
 #define ceph_encode_need(p, end, n, bad)			\
 	do {							\
 		if (!likely(ceph_has_room(p, end, n)))		\
-- 
1.8.3.1



* [PATCH 03/10] ceph: export ceph_entity_type_name
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
  2015-04-28 22:05 ` [PATCH 01/10] rbd: add obj request execution helper mchristi
  2015-04-28 22:05 ` [PATCH 02/10] ceph: add start/finish encoding helpers mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 04/10] ceph/rbd: add support for watch notify payloads mchristi
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

The lock info code wants to print this in its debug output, so this
patch just exports it.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 net/ceph/ceph_strings.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 139a9cb..5e5ba46 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type)
 	default: return "unknown";
 	}
 }
+EXPORT_SYMBOL(ceph_entity_type_name);
 
 const char *ceph_osd_op_name(int op)
 {
-- 
1.8.3.1



* [PATCH 04/10] ceph/rbd: add support for watch notify payloads
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (2 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 03/10] ceph: export ceph_entity_type_name mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 05/10] ceph: decode start helper mchristi
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support for protocol version 1 of watch-notify, so
drivers like rbd can be passed a buffer with information such as the
notify operation being performed.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             |  3 ++-
 include/linux/ceph/osd_client.h |  7 +++++--
 net/ceph/osd_client.c           | 21 ++++++++++++++++-----
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fafe558..bb4a0fe 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3081,7 +3081,8 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
+			 void *payload, int payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 61b19c4..2e9f89e 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *);
+	void (*cb)(u64, u64, u8, void *, void *, int);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,6 +197,8 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	void *payload;
+	int payload_len;
 };
 
 struct ceph_osd_client {
@@ -369,7 +371,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *),
+				  void (*event_cb)(u64, u64, u8, void *, void *,
+						   int),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 41a4abc..305a0b9 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2262,7 +2262,7 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *),
+			   void (*event_cb)(u64, u64, u8, void *, void *, int),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2314,7 +2314,8 @@ static void do_event_work(struct work_struct *work)
 	u8 opcode = event_work->opcode;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data);
+	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
+		  event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2327,10 +2328,11 @@ static void do_event_work(struct work_struct *work)
 static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
-	void *p, *end;
+	void *p, *end, *payload = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id;
 	u8 opcode;
+	u32 payload_len = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2343,6 +2345,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	ceph_decode_64_safe(&p, end, ver, bad);
 	ceph_decode_64_safe(&p, end, notify_id, bad);
 
+	if (proto_ver >= 1) {
+		ceph_decode_32_safe(&p, end, payload_len, bad);
+		if (end - p < payload_len)
+			goto bad;
+		payload = p;
+	}
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2350,8 +2359,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
-	     cookie, ver, event);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
+	     cookie, ver, event, notify_id, payload_len);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2364,6 +2373,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->payload = payload;
+		event_work->payload_len = payload_len;
 
 		queue_work(osdc->notify_wq, &event_work->work);
 	}
-- 
1.8.3.1



* [PATCH 05/10] ceph: decode start helper
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (3 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 04/10] ceph/rbd: add support for watch notify payloads mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 06/10] ceph/rbd: add support for header version 2 and 3 mchristi
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

Add the kernel equivalent of the userspace Ceph DECODE_START macro.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 include/linux/ceph/decode.h | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 96ec43d..4f551b8 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -271,6 +271,29 @@ fail:
 	return -ERANGE;
 }
 
+/**
+ * ceph_start_decoding - start a decoding block
+ * @p: buffer to decode
+ * @end: end of buffer
+ * @curr_ver: current version of the encoding that the code supports/encodes
+ * @len: buffer to return len of data in buffer
+ */
+static inline int ceph_start_decoding(void **p, void *end, u8 curr_ver,
+				      u32 *len)
+{
+	u8 struct_ver, struct_compat;
+
+	ceph_decode_8_safe(p, end, struct_ver, fail);
+	ceph_decode_8_safe(p, end, struct_compat, fail);
+
+	if (curr_ver < struct_compat)
+		return -EINVAL;
+
+	ceph_decode_32_safe(p, end, *len, fail);
+	return 0;
+fail:
+	return -ERANGE;
+}
 
 #define ceph_encode_need(p, end, n, bad)			\
 	do {							\
-- 
1.8.3.1



* [PATCH 06/10] ceph/rbd: add support for header version 2 and 3
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (4 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 05/10] ceph: decode start helper mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 07/10] ceph/rbd: update watch-notify ceph_osd_op mchristi
                   ` (3 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for watch-notify message header versions 2 and 3,
so we can get a return_code from those operations.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             |  5 +++--
 include/linux/ceph/osd_client.h | 10 ++++++----
 net/ceph/osd_client.c           | 25 +++++++++++++++++++------
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb4a0fe..8caf345 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3081,8 +3081,9 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
-			 void *payload, int payload_len)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
+			 u64 notifier_gid, void *data, void *payload,
+			 u32 payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2e9f89e..f180883 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *, void *, int);
+	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,8 +197,10 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	s32 return_code;
+	u64 notifier_gid;
 	void *payload;
-	int payload_len;
+	u32 payload_len;
 };
 
 struct ceph_osd_client {
@@ -371,8 +373,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *, void *,
-						   int),
+				  void (*event_cb)(u64, u64, u8, s32, u64,
+						   void *, void *, u32),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 305a0b9..0ce860d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2262,7 +2262,8 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *, void *, int),
+			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
+					    void *, u32),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2312,10 +2313,12 @@ static void do_event_work(struct work_struct *work)
 	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
+	s32 return_code = event_work->return_code;
+	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
-		  event_work->payload_len);
+	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
+		  event->data, event_work->payload, event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2330,9 +2333,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 {
 	void *p, *end, *payload = NULL;
 	u8 proto_ver;
-	u64 cookie, ver, notify_id;
+	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
+	s32 return_code = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2350,8 +2354,15 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		if (end - p < payload_len)
 			goto bad;
 		payload = p;
+		p += payload_len;
 	}
 
+	if (msg->hdr.version >= 2)
+		ceph_decode_32_safe(&p, end, return_code, bad);
+
+	if (msg->hdr.version >= 3)
+		ceph_decode_64_safe(&p, end, notifier_gid, bad);
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2359,8 +2370,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
-	     cookie, ver, event, notify_id, payload_len);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
+	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2373,6 +2384,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->return_code = return_code;
+		event_work->notifier_gid = notifier_gid;
 		event_work->payload = payload;
 		event_work->payload_len = payload_len;
 
-- 
1.8.3.1



* [PATCH 07/10] ceph/rbd: update watch-notify ceph_osd_op
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (5 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 06/10] ceph/rbd: add support for header version 2 and 3 mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 08/10] ceph/rbd: add notify support mchristi
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This syncs the ceph_osd_op struct with the current version of ceph
where the watch struct has been updated to support more ops and
the notify-ack support has been broken out of the watch struct.

Ceph commits
1a82cc3926fc7bc4cfbdd2fd4dfee8660d5107a1
2288f318e1b1f6a1c42b185fc1b4c41f23995247
73720130c34424bf1fe36058ebe8da66976f40fb

It still has us use the legacy watch op for now; I will add support
for the new ops later. This is mostly a preparation patch for more
advanced notify support.

Questions:

1. Should linger also be set for CEPH_OSD_WATCH_OP_RECONNECT?
2. I am not sure what watch.gen is used for. Is it for our internal
use, or does the osd do something with it?

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             | 19 +++++++++++--------
 include/linux/ceph/osd_client.h |  7 +++----
 include/linux/ceph/rados.h      | 15 +++++++++++++--
 net/ceph/osd_client.c           | 14 +++++++-------
 4 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8caf345..a70447c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3067,8 +3067,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
 	if (!obj_request->osd_req)
 		goto out;
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-					notify_id, 0, 0);
+	osd_req_op_watch_init(obj_request->osd_req, 0,
+			      CEPH_OSD_OP_NOTIFY_ACK, 0, notify_id);
 	rbd_osd_req_format_read(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3116,7 +3116,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
  */
 static struct rbd_obj_request *rbd_obj_watch_request_helper(
 						struct rbd_device *rbd_dev,
-						bool watch)
+						u8 watch_opcode)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
@@ -3135,10 +3135,11 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 	}
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, watch);
+			      watch_opcode, rbd_dev->watch_event->cookie);
 	rbd_osd_req_format_write(obj_request);
 
-	if (watch)
+	if (watch_opcode == CEPH_OSD_WATCH_OP_LEGACY_WATCH ||
+	    watch_opcode == CEPH_OSD_WATCH_OP_WATCH)
 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3151,7 +3152,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 
 	ret = obj_request->result;
 	if (ret) {
-		if (watch)
+		if (watch_opcode != CEPH_OSD_WATCH_OP_UNWATCH)
 			rbd_obj_request_end(obj_request);
 		goto out;
 	}
@@ -3180,7 +3181,8 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	if (ret < 0)
 		return ret;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
@@ -3214,7 +3216,8 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						   CEPH_OSD_WATCH_OP_UNWATCH);
 	if (!IS_ERR(obj_request))
 		rbd_obj_request_put(obj_request);
 	else
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index f180883..8c4ba9a 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -106,9 +106,8 @@ struct ceph_osd_req_op {
 		struct {
 			u64 cookie;
 			u64 ver;
-			u32 prot_ver;
-			u32 timeout;
-			__u8 flag;
+			__u8 op;
+			u32 gen;
 		} watch;
 		struct {
 			u64 expected_object_size;
@@ -311,7 +310,7 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
 				 size_t size, u8 cmp_op, u8 cmp_mode);
 extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
-					u64 cookie, u64 version, int flag);
+					u8 watch_opcode, u64 cookie);
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 2f822dc..7d3721f 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -417,6 +417,16 @@ enum {
 
 #define RADOS_NOTIFY_VER	1
 
+enum {
+	CEPH_OSD_WATCH_OP_UNWATCH = 0,
+	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
+	/* note: use only ODD ids to prevent pre-giant code from
+	 * interpreting the op as UNWATCH */
+	CEPH_OSD_WATCH_OP_WATCH = 3,
+	CEPH_OSD_WATCH_OP_RECONNECT = 5,
+	CEPH_OSD_WATCH_OP_PING = 7,
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -450,8 +460,9 @@ struct ceph_osd_op {
 	        } __attribute__ ((packed)) snap;
 		struct {
 			__le64 cookie;
-			__le64 ver;
-			__u8 flag;	/* 0 = unwatch, 1 = watch */
+			__le64 ver;	/* no longer used */
+			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
+			__u32 gen;	/* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
 			__le64 offset, length;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 0ce860d..cfdb6aa 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -581,18 +581,17 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
-				unsigned int which, u16 opcode,
-				u64 cookie, u64 version, int flag)
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
+			   u16 opcode, u8 watch_opcode, u64 cookie)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 
 	op->watch.cookie = cookie;
-	op->watch.ver = version;
-	if (opcode == CEPH_OSD_OP_WATCH && flag)
-		op->watch.flag = (u8)1;
+	op->watch.ver = 0;
+	op->watch.op = watch_opcode;
+	op->watch.gen = 0;
 }
 EXPORT_SYMBOL(osd_req_op_watch_init);
 
@@ -703,7 +702,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
-		dst->watch.flag = src->watch.flag;
+		dst->watch.op = src->watch.op;
+		dst->watch.gen = cpu_to_le32(src->watch.gen);
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
-- 
1.8.3.1



* [PATCH 08/10] ceph/rbd: add notify support
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (6 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 07/10] ceph/rbd: update watch-notify ceph_osd_op mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-30  0:50   ` Jason Dillaman
  2015-04-28 22:05 ` [PATCH 09/10] rbd: add rados locking mchristi
  2015-04-28 22:05 ` [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls mchristi
  9 siblings, 1 reply; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for the rados notify call. It is used to notify
SCSI PR and TMF watchers that the SCSI PR info has changed, or that
we want to sync up on TMF execution (currently only LUN_RESET).

I did not add support for the notify2 receive buffer since I am not
using it yet. Currently, this results in log messages like:

kernel: libceph: read_partial_message skipping long message (48 > 0)

This commit message used to say that I was going to add it later, since
I need it to be able to send SCSI sense codes, but I guess Doug is going
to do that now. Thanks Doug!

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             | 118 ++++++++++++++++++++++++++++++++++++----
 include/linux/ceph/osd_client.h |  16 +++++-
 include/linux/ceph/rados.h      |   9 +++
 net/ceph/osd_client.c           |  51 +++++++++++++++++
 4 files changed, 182 insertions(+), 12 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a70447c..aed38c0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -132,6 +132,21 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define DEV_NAME_LEN		32
 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
+enum rbd_notify_op {
+	RBD_NOTIFY_OP_ACQUIRED_LOCK	= 0,
+	RBD_NOTIFY_OP_RELEASED_LOCK	= 1,
+	RBD_NOTIFY_OP_REQUEST_LOCK	= 2,
+	RBD_NOTIFY_OP_HEADER_UPDATE	= 3,
+	RBD_NOTIFY_OP_ASYNC_PROGRESS	= 4,
+	RBD_NOTIFY_OP_ASYNC_COMPLETE	= 5,
+	RBD_NOTIFY_OP_FLATTEN		= 6,
+	RBD_NOTIFY_OP_RESIZE		= 7,
+	RBD_NOTIFY_OP_SNAP_CREATE	= 8,
+	RBD_NOTIFY_OP_SCSI_PR_UPDATE	= 9,
+	RBD_NOTIFY_OP_SCSI_LUN_RESET_START	= 10,
+	RBD_NOTIFY_OP_SCSI_LUN_RESET_COMPLETE	= 11,
+};
+
 /*
  * block device image metadata (in-memory version)
  */
@@ -1847,6 +1862,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_CALL:
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
+	case CEPH_OSD_OP_NOTIFY:
 		rbd_osd_trivial_callback(obj_request);
 		break;
 	default:
@@ -3087,27 +3103,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
+	u32 len, notify_op = -1;
+	void *p = payload, *end = p + payload_len;
 
 	if (!rbd_dev)
 		return;
 
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
+	dout("%s: \"%s\" notify_id %llu opcode %u rc %d bl len %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+		(unsigned int)opcode, return_code, payload_len);
 
-	/*
-	 * Until adequate refresh error handling is in place, there is
-	 * not much we can do here, except warn.
-	 *
-	 * See http://tracker.ceph.com/issues/5040
-	 */
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+	if (payload_len) {
+		if (ceph_start_decoding(&p, end, 1, &len))
+			goto decode_fail;
+		ceph_decode_32_safe(&p, end, notify_op, decode_fail);
+	}
+
+	if (opcode == CEPH_WATCH_EVENT_DISCONNECT)
+		return;
+
+	dout("%s: \"%s\" RBD notify op %u\n", __func__, rbd_dev->header_name,
+	     notify_op);
+
+	switch (notify_op) {
+	case RBD_NOTIFY_OP_SCSI_PR_UPDATE:
+		break;
+	default:
+		/*
+		 * Until adequate refresh error handling is in place, there is
+		 * not much we can do here, except warn.
+		 *
+		 * See http://tracker.ceph.com/issues/5040
+		 */
+		ret = rbd_dev_refresh(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "refresh failed: %d", ret);
+	}
 
 	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
 	if (ret)
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+	return;
+
+decode_fail:
+	rbd_warn(rbd_dev, "Invalid op/notify_op %u/%u", (unsigned int)opcode,
+		 notify_op);
 }
 
 /*
@@ -3260,6 +3300,12 @@ static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
 							   inbound_size,
 							   0, false, false);
 			break;
+		case CEPH_OSD_OP_NOTIFY:
+			osd_req_op_notify_response_data_pages(
+							obj_request->osd_req,
+							0, pages, inbound_size,
+							0, false, false);
+			break;
 		default:
 			BUG();
 		}
@@ -3279,6 +3325,11 @@ static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
 							obj_request->osd_req, 0,
 							pagelist);
 			break;
+		case CEPH_OSD_OP_NOTIFY:
+			osd_req_op_notify_request_data_pagelist(
+							obj_request->osd_req, 0,
+							pagelist);
+			break;
 		default:
 			BUG();
 		}
@@ -3349,6 +3400,51 @@ out:
 	return ret;
 }
 
+static int rbd_obj_notify_scsi_event_sync(struct rbd_device *rbd_dev,
+					  u32 notify_op,
+					  u32 notify_timeout)
+{
+	struct rbd_obj_request *obj_request;
+	int ret = -ENOMEM;
+	struct {
+		__le32 version;
+		__le32 timeout;
+		__le32 buf_len;
+		/* payload only supports basic ops where we just send the op */
+		u8 curr_ver;
+		u8 compat_ver;
+		__le32 len;
+		__le32 notify_op;
+	} __attribute__ ((packed)) notify_buf = { 0 };
+
+	notify_buf.version = cpu_to_le32(0);
+	notify_buf.timeout = cpu_to_le32(notify_timeout);
+	notify_buf.buf_len = cpu_to_le32(10);
+	notify_buf.curr_ver = 2;
+	notify_buf.compat_ver = 1;
+	notify_buf.len = cpu_to_le32(sizeof(__le32));
+	notify_buf.notify_op = cpu_to_le32(notify_op);
+
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+					     OBJ_REQUEST_PAGES);
+	if (!obj_request)
+		return -ENOMEM;
+
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE,
+						  1, obj_request);
+	if (!obj_request->osd_req)
+		goto out;
+
+	osd_req_op_notify_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY,
+			       rbd_dev->watch_event->cookie);
+
+	ret = rbd_obj_request_sync(rbd_dev, obj_request, &notify_buf,
+				   sizeof(notify_buf), NULL, 0);
+out:
+	rbd_obj_request_put(obj_request);
+	return ret;
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 8c4ba9a..d512dfa 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -110,6 +110,11 @@ struct ceph_osd_req_op {
 			u32 gen;
 		} watch;
 		struct {
+			u64 cookie;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
+		} notify;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -301,7 +306,16 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
+extern void osd_req_op_notify_request_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+extern void osd_req_op_notify_response_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+				   unsigned int which, u16 opcode, u64 cookie);
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 7d3721f..cae82b36 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -427,6 +427,12 @@ enum {
 	CEPH_OSD_WATCH_OP_PING = 7,
 };
 
+enum {
+	CEPH_WATCH_EVENT_NOTIFY			= 1, /* notifying watcher */
+	CEPH_WATCH_EVENT_NOTIFY_COMPLETE	= 2, /* notifier notified when done */
+	CEPH_WATCH_EVENT_DISCONNECT		= 3, /* we were disconnected */
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -465,6 +471,9 @@ struct ceph_osd_op {
 			__u32 gen;	/* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
+			__le64 cookie;
+		} __attribute__ ((packed)) notify;
+		struct {
 			__le64 offset, length;
 			__le64 src_offset;
 		} __attribute__ ((packed)) clonerange;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index cfdb6aa..8e90ee3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -243,6 +243,29 @@ void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 
+void osd_req_op_notify_response_data_pages(struct ceph_osd_request *osd_req,
+			unsigned int which, struct page **pages, u64 length,
+			u32 alignment, bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, response_data);
+	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+				pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_notify_response_data_pages);
+
+void osd_req_op_notify_request_data_pagelist(
+			struct ceph_osd_request *osd_req,
+			unsigned int which, struct ceph_pagelist *pagelist)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
+
 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 {
 	switch (osd_data->type) {
@@ -292,6 +315,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->cls.request_data);
 		ceph_osd_data_release(&op->cls.response_data);
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		ceph_osd_data_release(&op->notify.request_data);
+		ceph_osd_data_release(&op->notify.response_data);
+		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
 		ceph_osd_data_release(&op->xattr.osd_data);
@@ -581,6 +608,16 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+			    u16 opcode, u64 cookie)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+	op->notify.cookie = cookie;
+}
+EXPORT_SYMBOL(osd_req_op_notify_init);
+
 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
 			   u16 opcode, u8 watch_opcode, u64 cookie)
 {
@@ -698,6 +735,20 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+
+		osd_data = &src->notify.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		osd_data = &src->notify.response_data;
+		ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH 09/10] rbd: add rados locking
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (7 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 08/10] ceph/rbd: add notify support mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-28 22:05 ` [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls mchristi
  9 siblings, 0 replies; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support for the rados lock, unlock and break-lock
operations. These will be used to serialize SCSI PR info manipulation
and TMF execution.

It also adds support for listing locks and getting lock info, but
those and the sysfs support are only for debugging. I do not think
we want the sysfs interface in the final version and will remove it
in the final patchset. I just kept it in case people wanted to test
with it. Do we want it in debugfs though?

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 478 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 478 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index aed38c0..f4e7b0f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -32,6 +32,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
+#include <linux/ceph/msgr.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
 
@@ -44,6 +45,7 @@
 #include <linux/slab.h>
 #include <linux/idr.h>
 #include <linux/workqueue.h>
+#include <linux/in6.h>
 
 #include "rbd_types.h"
 
@@ -123,6 +125,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 
 #define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
 
+#define RBD_MAX_LOCK_STR_LEN	16
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -443,6 +447,11 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 static void rbd_spec_put(struct rbd_spec *spec);
 
+typedef int (locker_iter_fn) (struct rbd_device *rbd_dev, char *name,
+			      u8 entity_type, u64 entity_num, char *cookie,
+			      struct ceph_entity_addr *addr,
+			      struct timespec *ts, char *desc);
+
 static int rbd_dev_id_to_minor(int dev_id)
 {
 	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
@@ -4085,6 +4094,467 @@ static ssize_t rbd_image_refresh(struct device *dev,
 	return size;
 }
 
+/**
+ * rbd_dev_lock - grab rados lock for device
+ * @rbd_dev: device to take lock for
+ * @name: the name of the lock
+ * @type: lock type (RADOS_LOCK_EXCLUSIVE or RADOS_LOCK_SHARED)
+ * @cookie: user-defined identifier for this instance of the lock
+ * @tag: if RADOS_LOCK_SHARED, tag of the lock; empty string if not shared
+ * @desc: user-defined lock description
+ * @flags: lock flags
+ */
+static int rbd_dev_lock(struct rbd_device *rbd_dev, char *name, u8 type,
+			char *cookie, char *tag, char *desc, u8 flags)
+{
+	int lock_op_buf_size;
+	int name_len = strlen(name);
+	int cookie_len = strlen(cookie);
+	int tag_len = strlen(tag);
+	int desc_len = strlen(desc);
+	void *lock_op_buf, *p, *end;
+	struct timespec mtime;
+	int ret;
+
+	lock_op_buf_size = name_len + sizeof(__le32) +
+				cookie_len + sizeof(__le32) +
+				tag_len + sizeof(__le32) +
+				desc_len + sizeof(__le32) +
+				sizeof(mtime) +
+				/* flag and type */
+				sizeof(u8) + sizeof(u8) +
+				CEPH_ENCODING_START_BLK_LEN;
+	p = lock_op_buf = kzalloc(lock_op_buf_size, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+
+	end = p + lock_op_buf_size;
+
+	ceph_start_encoding(&p, 1, 1,
+			    lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	/* encode cls_lock_lock_op struct */
+	ceph_encode_string(&p, end, name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, desc, desc_len);
+	/* only support infinite duration */
+	memset(&mtime, 0, sizeof(mtime));
+	ceph_encode_timespec(p, &mtime);
+	p += sizeof(struct ceph_timespec);
+	ceph_encode_8(&p, flags);
+
+	dout("%s: %s %d %s %s %s %d\n", __func__,
+	     name, type, cookie, tag, desc, flags);
+
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				  "lock", "lock", lock_op_buf,
+				  lock_op_buf_size, NULL, 0);
+	dout("%s: status %d\n", __func__, ret);
+	kfree(lock_op_buf);
+	return ret;
+}
+
+/**
+ * rbd_dev_unlock - release rados lock for device
+ * @rbd_dev: device to release the lock for
+ * @name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ */
+static int rbd_dev_unlock(struct rbd_device *rbd_dev, char *name, char *cookie)
+{
+	int unlock_op_buf_size;
+	int name_len = strlen(name);
+	int cookie_len = strlen(cookie);
+	void *unlock_op_buf, *p, *end;
+	int ret;
+
+	unlock_op_buf_size = name_len + sizeof(__le32) +
+				cookie_len + sizeof(__le32) +
+				CEPH_ENCODING_START_BLK_LEN;
+	p = unlock_op_buf = kzalloc(unlock_op_buf_size, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+
+	end = p + unlock_op_buf_size;
+
+	ceph_start_encoding(&p, 1, 1,
+			    unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	/* encode cls_lock_unlock_op struct */
+	ceph_encode_string(&p, end, name, name_len);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s: %s %s\n", __func__, name, cookie);
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				  "lock", "unlock", unlock_op_buf,
+				  unlock_op_buf_size, NULL, 0);
+	dout("%s: status %d\n", __func__, ret);
+	kfree(unlock_op_buf);
+	return ret;
+}
+
+/* decode a cls_lock_get_info_reply */
+static int rbd_dev_parse_lockers(struct rbd_device *rbd_dev, char *name,
+				void *p, void *end, locker_iter_fn *iter_fn)
+{
+	int i, ret;
+	struct ceph_entity_addr addr;
+	struct timespec ts;
+	struct ceph_timespec ceph_ts;
+	char *cookie, *desc;
+	size_t str_len;
+	u32 num_lockers, len;
+	u64 num;
+	u8 type;
+
+	ret = ceph_start_decoding_compat(&p, end, 1, 1, 1, &len);
+	if (ret)
+		return ret;
+	ceph_decode_32_safe(&p, end, num_lockers, einval);
+
+	dout("got %u lockers in struct len %u\n", num_lockers, len);
+	for (i = 0; i < num_lockers; i++) {
+		/* decode locker_id_t */
+		ret = ceph_start_decoding_compat(&p, end, 1, 1, 1, &len);
+		if (ret)
+			break;
+
+		ceph_decode_8_safe(&p, end, type, einval);
+		ceph_decode_64_safe(&p, end, num, einval);
+
+		cookie = ceph_extract_encoded_string(&p, end, &str_len,
+						     GFP_KERNEL);
+		if (IS_ERR(cookie)) {
+			ret = PTR_ERR(cookie);
+			goto fail;
+		}
+		/* decode locker_info_t */
+		ret = ceph_start_decoding_compat(&p, end, 1, 1, 1, &len);
+		if (ret)
+			goto free_cookie;
+
+		ceph_decode_copy_safe(&p, end, &ceph_ts, sizeof(ceph_ts),
+				      free_cookie);
+		ceph_decode_timespec(&ts, &ceph_ts);
+
+		ceph_decode_copy_safe(&p, end, &addr, sizeof(addr), free_cookie);
+		ceph_decode_addr(&addr);
+
+		desc = ceph_extract_encoded_string(&p, end, &str_len,
+						   GFP_KERNEL);
+		if (IS_ERR(desc)) {
+			ret = PTR_ERR(desc);
+			goto free_cookie;
+		}
+
+		iter_fn(rbd_dev, name, type, num, cookie, &addr, &ts, desc);
+		kfree(cookie);
+		kfree(desc);
+	}
+
+	return 0;
+
+free_cookie:
+	kfree(cookie);
+einval:
+	if (!ret)
+		ret = -EINVAL;
+fail:
+	rbd_warn(rbd_dev, "Could not decode lockers for %s", name);
+	return ret;
+}
+
+static int rbd_dev_lock_for_each_locker(struct rbd_device *rbd_dev, char *name,
+					locker_iter_fn *iter_fn)
+{
+	int get_info_op_buf_size;
+	int name_len = strlen(name);
+	void *get_info_op_buf, *p, *end;
+	void *get_info_reply_buf;
+	struct page *reply_pg;
+	int ret;
+
+	get_info_op_buf_size = name_len + sizeof(__le32) +
+				CEPH_ENCODING_START_BLK_LEN;
+	p = get_info_op_buf = kzalloc(get_info_op_buf_size, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+
+	reply_pg = alloc_page(GFP_KERNEL);
+	if (!reply_pg) {
+		ret = -ENOMEM;
+		goto free_info_buf;
+	}
+	get_info_reply_buf = page_address(reply_pg);
+
+	ceph_start_encoding(&p, 1, 1,
+			    get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	/* encode cls_lock_get_info struct */
+	end = p + get_info_op_buf_size;
+	ceph_encode_string(&p, end, name, name_len);
+
+	dout("%s: lock %s\n", __func__, name);
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				  "lock", "get_info", get_info_op_buf,
+				  get_info_op_buf_size, get_info_reply_buf,
+				  PAGE_SIZE);
+	dout("%s: status %d\n", __func__, ret);
+	if (ret < 0)
+		goto free_pg;
+
+	p = get_info_reply_buf;
+	end = p + ret;
+
+	ret = rbd_dev_parse_lockers(rbd_dev, name, p, end, iter_fn);
+
+free_pg:
+	__free_page(reply_pg);
+free_info_buf:
+	kfree(get_info_op_buf);
+	return ret;
+}
+
+/**
+ * rbd_dev_print_lock_info - print info for one locker
+ * @rbd_dev: device the lock is held on
+ * @name: the name of the lock
+ * @type: ceph entity type (CEPH_ENTITY_TYPE_*)
+ * @num: ceph entity id
+ * @cookie: user-defined identifier for this instance of the lock
+ * @addr: entity address
+ * @ts: lock timespec
+ * @desc: lock description
+ */
+static int rbd_dev_print_lock_info(struct rbd_device *rbd_dev, char *name,
+				   u8 type, u64 num, char *cookie,
+				   struct ceph_entity_addr *addr,
+				   struct timespec *ts, char *desc)
+{
+	struct sockaddr_in6 *sin6;
+	struct sockaddr_in *sin;
+
+	switch (addr->in_addr.ss_family) {
+	case AF_INET:
+		sin = (struct sockaddr_in *)&addr->in_addr;
+		rbd_warn(rbd_dev, "%s %s %s.%llu %s %pI4\n",
+			 name, cookie, ceph_entity_type_name(type), num, desc,
+			 &sin->sin_addr.s_addr);
+		break;
+	case AF_INET6:
+		sin6 = (struct sockaddr_in6 *)&addr->in_addr;
+		rbd_warn(rbd_dev, "%s %s %s.%llu %s %pI6\n",
+			 name, cookie, ceph_entity_type_name(type), num, desc,
+			 &sin6->sin6_addr);
+		break;
+	default:
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+/**
+ * rbd_dev_print_locks - print all locks for dev
+ * @rbd_dev: device to print the locks of
+ */
+static int rbd_dev_print_locks(struct rbd_device *rbd_dev)
+{
+	int ret, i;
+	void *p, *end;
+	char *lock;
+	size_t lock_len;
+	u32 num_locks, len;
+	struct page *pg;
+
+	pg = alloc_page(GFP_KERNEL);
+	if (!pg)
+		return -ENOMEM;
+	p = page_address(pg);
+
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				"lock", "list_locks", NULL, 0,
+				p, PAGE_SIZE);
+	if (ret < 0)
+		goto free_list_locks_pg;
+
+	end = p + ret;
+	ret = ceph_start_decoding_compat(&p, end, 1, 1, 1, &len);
+	if (ret)
+		goto free_list_locks_pg;
+
+	ceph_decode_32_safe(&p, end, num_locks, einval);
+	dout("got %u locks in struct len %u\n", num_locks, len);
+
+	for (i = 0; i < num_locks; i++) {
+		lock = ceph_extract_encoded_string(&p, end, &lock_len,
+						   GFP_KERNEL);
+		if (IS_ERR(lock)) {
+			rbd_warn(rbd_dev,
+				 "Could not print info for all locks");
+			ret = PTR_ERR(lock);
+			goto free_list_locks_pg;
+		}
+
+		rbd_dev_lock_for_each_locker(rbd_dev, lock,
+					     rbd_dev_print_lock_info);
+		kfree(lock);
+	}
+	ret = 0;
+	goto free_list_locks_pg;
+
+einval:
+	ret = -EINVAL;
+free_list_locks_pg:
+	__free_page(pg);
+	return ret;
+}
+
+/**
+ * rbd_dev_break_lock - break the rados lock held by the specified client
+ * @rbd_dev: device the lock is held on
+ * @name: the name of the lock
+ * @type: ceph entity type (CEPH_ENTITY_TYPE_*)
+ * @num: ceph entity id
+ * @cookie: user-defined identifier for this instance of the lock
+ * @addr: entity address
+ * @ts: lock timespec
+ * @desc: lock description
+ */
+static int rbd_dev_break_lock(struct rbd_device *rbd_dev, char *name,
+			      u8 type, u64 num, char *cookie,
+			      struct ceph_entity_addr *addr,
+			      struct timespec *ts, char *desc)
+{
+	int break_lock_op_buf_size;
+	int name_len = strlen(name);
+	int cookie_len = strlen(cookie);
+	void *break_lock_op_buf, *p, *end;
+	int ret;
+
+	break_lock_op_buf_size = name_len + sizeof(__le32) +
+				cookie_len + sizeof(__le32) +
+				sizeof(u8) + sizeof(__le64) +
+				CEPH_ENCODING_START_BLK_LEN;
+	p = break_lock_op_buf = kzalloc(break_lock_op_buf_size, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+
+	end = p + break_lock_op_buf_size;
+
+	ceph_start_encoding(&p, 1, 1,
+			break_lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	/* encode cls_lock_break_op struct */
+	ceph_encode_string(&p, end, name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_64(&p, num);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s: lock %s type %hu id %llu cookie %s desc %s\n",
+	     __func__, name, type, num, cookie, desc);
+
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				  "lock", "break_lock", break_lock_op_buf,
+				  break_lock_op_buf_size, NULL, 0);
+	dout("%s: status %d\n", __func__, ret);
+	kfree(break_lock_op_buf);
+	return ret;
+}
+
+static int rbd_dev_break_locks(struct rbd_device *rbd_dev, char *name)
+{
+	return rbd_dev_lock_for_each_locker(rbd_dev, name, rbd_dev_break_lock);
+}
+
+/*
+ * TODO: remove these or move them to debugfs for the final merge. I don't
+ * think we need them upstream since a userspace API for this already
+ * exists. They are just for testing the kernel.
+ */
+static ssize_t rbd_lock_set(struct device *dev, struct device_attribute *attr,
+			    const char *buf, size_t size)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	char name[RBD_MAX_LOCK_STR_LEN];
+	char cookie[RBD_MAX_LOCK_STR_LEN];
+	char desc[RBD_MAX_LOCK_STR_LEN];
+	int ret;
+
+	ret = sscanf(buf, "%15s %15s %15s\n", name, cookie, desc);
+	if (ret != 3) {
+		rbd_warn(rbd_dev, "Invalid number of params. Got %d", ret);
+		return -EINVAL;
+	} else if (!strlen(name) || !strlen(cookie) || !strlen(desc)) {
+		rbd_warn(rbd_dev, "missing param");
+		return -EINVAL;
+	}
+
+	ret = rbd_dev_lock(rbd_dev, name, 1, cookie, "", desc, 0);
+	if (ret)
+		return ret;
+	else
+		return size;
+}
+
+static ssize_t rbd_unlock_set(struct device *dev, struct device_attribute *attr,
+			      const char *buf, size_t size)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	char name[RBD_MAX_LOCK_STR_LEN];
+	char cookie[RBD_MAX_LOCK_STR_LEN];
+	int ret;
+
+	ret = sscanf(buf, "%15s %15s\n", name, cookie);
+	if (ret != 2) {
+		rbd_warn(rbd_dev, "Invalid number of params. Got %d", ret);
+		return -EINVAL;
+	} else if (!strlen(name) || !strlen(cookie)) {
+		rbd_warn(rbd_dev, "missing param");
+		return -EINVAL;
+	}
+
+	ret = rbd_dev_unlock(rbd_dev, name, cookie);
+	if (ret)
+		return ret;
+	else
+		return size;
+}
+
+static ssize_t rbd_break_locks_set(struct device *dev,
+				   struct device_attribute *attr,
+				   const char *buf, size_t size)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	char name[RBD_MAX_LOCK_STR_LEN];
+	int ret;
+
+	ret = sscanf(buf, "%15s\n", name);
+	if (ret != 1) {
+		rbd_warn(rbd_dev, "Invalid number of params. Got %d", ret);
+		return -EINVAL;
+	} else if (!strlen(name)) {
+		rbd_warn(rbd_dev, "missing param");
+		return -EINVAL;
+	}
+
+	ret = rbd_dev_break_locks(rbd_dev, name);
+	if (ret)
+		return ret;
+	else
+		return size;
+}
+
+static ssize_t rbd_lock_dump_info_set(struct device *dev,
+				      struct device_attribute *attr,
+				      const char *buf, size_t size)
+{
+	int ret = rbd_dev_print_locks(dev_to_rbd_dev(dev));
+
+	if (ret)
+		return ret;
+	else
+		return size;
+}
+
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
@@ -4097,6 +4567,10 @@ static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
+static DEVICE_ATTR(lock, S_IWUSR, NULL, rbd_lock_set);
+static DEVICE_ATTR(unlock, S_IWUSR, NULL, rbd_unlock_set);
+static DEVICE_ATTR(break_locks, S_IWUSR, NULL, rbd_break_locks_set);
+static DEVICE_ATTR(dump_lock_info, S_IWUSR, NULL, rbd_lock_dump_info_set);
 
 static struct attribute *rbd_attrs[] = {
 	&dev_attr_size.attr,
@@ -4111,6 +4585,10 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_current_snap.attr,
 	&dev_attr_parent.attr,
 	&dev_attr_refresh.attr,
+	&dev_attr_lock.attr,
+	&dev_attr_unlock.attr,
+	&dev_attr_break_locks.attr,
+	&dev_attr_dump_lock_info.attr,
 	NULL
 };
 
-- 
1.8.3.1


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls
  2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
                   ` (8 preceding siblings ...)
  2015-04-28 22:05 ` [PATCH 09/10] rbd: add rados locking mchristi
@ 2015-04-28 22:05 ` mchristi
  2015-04-30  7:56   ` Christoph Hellwig
  9 siblings, 1 reply; 19+ messages in thread
From: mchristi @ 2015-04-28 22:05 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This stores the LIO PR info in the rbd header object. Other
clients (LIO nodes) are notified when the data changes,
so they can update their info.

I added a sysfs file here temporarily for testing. I will
remove it in the final version. The final patches will
have a LIO callout invoke the rados locking and these
functions instead.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index f4e7b0f..1fa4fd0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4465,6 +4465,72 @@ static int rbd_dev_break_locks(struct rbd_device *rbd_dev, char *name)
 	return rbd_dev_lock_for_each_locker(rbd_dev, name, rbd_dev_break_lock);
 }
 
+static int rbd_dev_set_scsi_pr_info(struct rbd_device *rbd_dev,
+				    const char *info)
+{
+	int info_len = strlen(info);
+	int info_buf_len = info_len + sizeof(__le32);
+	int ret;
+	void *info_buf, *p, *end;
+
+	dout("%s: set scsi pr info %s\n", __func__, info);
+
+	p = info_buf = kzalloc(info_buf_len, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+
+	end = p + info_buf_len;
+	ceph_encode_string(&p, end, info, info_len);
+
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+				  "rbd", "set_scsi_pr_info", info_buf,
+				  info_buf_len, NULL, 0);
+	if (!ret)
+		rbd_obj_notify_scsi_event_sync(rbd_dev,
+					       RBD_NOTIFY_OP_SCSI_PR_UPDATE, 5);
+	dout("%s: status %d\n", __func__, ret);
+	kfree(info_buf);
+	return ret;
+}
+
+static int rbd_dev_get_scsi_pr_info(struct rbd_device *rbd_dev,
+				    char *buf, int buf_len)
+{
+	__le64 snapid;
+	int ret;
+	void *info_buf, *p;
+
+	info_buf = p = kzalloc(buf_len, GFP_KERNEL);
+	if (!info_buf)
+		return -ENOMEM;
+
+	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd",
+				  "get_scsi_pr_info", &snapid, sizeof(snapid),
+				  info_buf, buf_len);
+	if (ret > sizeof(__le32)) {
+		char *ret_buf;
+		size_t ret_buf_len;
+
+		ret_buf = ceph_extract_encoded_string(&p, p + buf_len,
+						      &ret_buf_len, GFP_KERNEL);
+		if (IS_ERR(ret_buf)) {
+			ret = PTR_ERR(ret_buf);
+			goto free_info_buf;
+		}
+		ret = strlcpy(buf, ret_buf, buf_len);
+		kfree(ret_buf);
+	} else if (ret > 0) {
+		ret = -EINVAL;
+		buf[0] = '\0';
+	}
+
+free_info_buf:
+	kfree(info_buf);
+	dout("%s: returned %d\n", __func__, ret);
+	return ret;
+}
+
 /*
  * TODO: remove me or move to debugfs for final merge. I don't think we
  * need this for upstream since there is already the userspace API
@@ -4555,6 +4621,24 @@ static ssize_t rbd_lock_dump_info_set(struct device *dev,
 		return size;
 }
 
+static ssize_t rbd_scsi_pr_info_set(struct device *dev,
+				    struct device_attribute *attr,
+				    const char *buf, size_t size)
+{
+	int ret = rbd_dev_set_scsi_pr_info(dev_to_rbd_dev(dev), buf);
+
+	if (ret)
+		return ret;
+	else
+		return size;
+}
+
+static ssize_t rbd_scsi_pr_info_show(struct device *dev,
+				     struct device_attribute *attr, char *buf)
+{
+	return rbd_dev_get_scsi_pr_info(dev_to_rbd_dev(dev), buf, PAGE_SIZE);
+}
+
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
@@ -4571,6 +4655,8 @@ static DEVICE_ATTR(lock, S_IWUSR, NULL, rbd_lock_set);
 static DEVICE_ATTR(unlock, S_IWUSR, NULL, rbd_unlock_set);
 static DEVICE_ATTR(break_locks, S_IWUSR, NULL, rbd_break_locks_set);
 static DEVICE_ATTR(dump_lock_info, S_IWUSR, NULL, rbd_lock_dump_info_set);
+static DEVICE_ATTR(scsi_pr_info, S_IWUSR | S_IRUGO, rbd_scsi_pr_info_show,
+		   rbd_scsi_pr_info_set);
 
 static struct attribute *rbd_attrs[] = {
 	&dev_attr_size.attr,
@@ -4589,6 +4675,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_unlock.attr,
 	&dev_attr_break_locks.attr,
 	&dev_attr_dump_lock_info.attr,
+	&dev_attr_scsi_pr_info.attr,
 	NULL
 };
 
-- 
1.8.3.1



* Re: [PATCH 01/10] rbd: add obj request execution helper
  2015-04-28 22:05 ` [PATCH 01/10] rbd: add obj request execution helper mchristi
@ 2015-04-29 21:33   ` Alex Elder
  0 siblings, 0 replies; 19+ messages in thread
From: Alex Elder @ 2015-04-29 21:33 UTC (permalink / raw)
  To: mchristi, ceph-devel

On 04/28/2015 05:05 PM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This patch breaks out the code that allocates buffers and executes
> the request from rbd_obj_method_sync, so future functions in this
> patchset can use it.
>
> It also adds support for OBJ_OP_WRITE requests which is needed for
> the locking functions which will be added in the next patches.

I would rather see the restructuring you do here (creation of
rbd_obj_request_sync()) be done in a way that preserves exactly
the same functionality.  Then, in a second patch, you should
add the new ability to allocate a page vector for the inbound
data.  This is only a comment on the composition of your series,
not the content of this patch (which otherwise looks good).

A few more remarks below.

					-Alex

> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   drivers/block/rbd.c | 156 ++++++++++++++++++++++++++++++++--------------------
>   1 file changed, 95 insertions(+), 61 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index b40af32..fafe558 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3224,89 +3224,123 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
>   }
>
>   /*
> - * Synchronous osd object method call.  Returns the number of bytes
> - * returned in the outbound buffer, or a negative error code.
> + * Synchronous osd object op call.  Returns the number of bytes
> + * returned in the inbound buffer, or a negative error code.
>    */
> -static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
> -			     const char *object_name,
> -			     const char *class_name,
> -			     const char *method_name,
> -			     const void *outbound,
> -			     size_t outbound_size,
> -			     void *inbound,
> -			     size_t inbound_size)
> +static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
> +				struct rbd_obj_request *obj_request,
> +				const void *outbound, size_t outbound_size,
> +				void *inbound, size_t inbound_size)
>   {
>   	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> -	struct rbd_obj_request *obj_request;
> -	struct page **pages;
> -	u32 page_count;
> -	int ret;
> -
> -	/*
> -	 * Method calls are ultimately read operations.  The result
> -	 * should placed into the inbound buffer provided.  They
> -	 * also supply outbound data--parameters for the object
> -	 * method.  Currently if this is present it will be a
> -	 * snapshot id.
> -	 */
> -	page_count = (u32)calc_pages_for(0, inbound_size);
> -	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
> -	if (IS_ERR(pages))
> -		return PTR_ERR(pages);
> -
> -	ret = -ENOMEM;
> -	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
> -							OBJ_REQUEST_PAGES);
> -	if (!obj_request)
> -		goto out;
> -
> -	obj_request->pages = pages;
> -	obj_request->page_count = page_count;
> -
> -	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
> -						  obj_request);
> -	if (!obj_request->osd_req)
> -		goto out;
> +	struct page **pages = NULL;
> +	u32 page_count = 0;
> +	int ret = -ENOMEM;
> +	u16 op = obj_request->osd_req->r_ops[0].op;
> +	struct ceph_pagelist *pagelist;
> +
> +	if (inbound_size) {
> +		page_count = (u32)calc_pages_for(0, inbound_size);
> +		pages = ceph_alloc_page_vector(page_count, GFP_NOIO);

I don't know now whether GFP_NOIO is right (or wrong).  I just call
attention to it because it's different from the GFP_KERNEL that was
used before.  (I'll let you figure it out...)  In any case, that
change (and any others like it below) probably warrants its own patch.

> +		if (IS_ERR(pages))
> +			return PTR_ERR(pages);
> +
> +		obj_request->pages = pages;
> +		obj_request->page_count = page_count;
> +
> +		switch (op) {
> +		case CEPH_OSD_OP_CALL:
> +			osd_req_op_cls_response_data_pages(obj_request->osd_req,
> +							   0, pages,
> +							   inbound_size,
> +							   0, false, false);
> +			break;
> +		default:
> +			BUG();

You should use rbd_assert() rather than directly calling BUG().
(We really should not be calling BUG() there either, but that's
another matter.)

> +		}
> +	}
>
> -	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
> -					class_name, method_name);
>   	if (outbound_size) {
> -		struct ceph_pagelist *pagelist;
> -
> -		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
> +		pagelist = kmalloc(sizeof (*pagelist), GFP_NOIO);
>   		if (!pagelist)
> -			goto out;
> +			goto free_pages;
>
>   		ceph_pagelist_init(pagelist);
>   		ceph_pagelist_append(pagelist, outbound, outbound_size);
> -		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
> -						pagelist);
> +
> +		switch (op) {
> +		case CEPH_OSD_OP_CALL:
> +			osd_req_op_cls_request_data_pagelist(
> +							obj_request->osd_req, 0,
> +							pagelist);
> +			break;
> +		default:
> +			BUG();

You already verified op was valid.  Really, since this is just
setting up a method call, the only op should be CEPH_OSD_OP_CALL
(I *think*, though you may have other plans).  If that's the case,
just do an rbd_assert(op == CEPH_OSD_OP_CALL) at the top and
move on without these case statements.

> +		}
>   	}
> -	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
> -					obj_request->pages, inbound_size,
> -					0, false, false);
> -	rbd_osd_req_format_read(obj_request);
> +
> +	if (inbound_size)
> +		rbd_osd_req_format_read(obj_request);
> +	else
> +		rbd_osd_req_format_write(obj_request);
>
>   	ret = rbd_obj_request_submit(osdc, obj_request);
>   	if (ret)
> -		goto out;
> +		goto done;
>   	ret = rbd_obj_request_wait(obj_request);
>   	if (ret)
> -		goto out;
> +		goto done;
>
>   	ret = obj_request->result;
>   	if (ret < 0)
> -		goto out;
> +		goto done;
>
>   	rbd_assert(obj_request->xferred < (u64)INT_MAX);
>   	ret = (int)obj_request->xferred;
> -	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
> -out:
> -	if (obj_request)
> -		rbd_obj_request_put(obj_request);
> -	else
> -		ceph_release_page_vector(pages, page_count);
> +	if (inbound_size)
> +		ceph_copy_from_page_vector(pages, inbound, 0,
> +					   obj_request->xferred);
> +done:
> +	return ret;
> +
> +free_pages:
> +	ceph_release_page_vector(pages, page_count);
> +	return ret;
> +}
>
> +/*
> + * Synchronous osd object method call.  Returns the number of bytes
> + * returned in the inbound buffer, or a negative error code.
> + */
> +static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
> +			       const char *object_name,
> +			       const char *class_name,
> +			       const char *method_name,
> +			       const void *outbound,
> +			       size_t outbound_size,
> +			       void *inbound,
> +			       size_t inbound_size)
> +{
> +	struct rbd_obj_request *obj_request;
> +	int ret = -ENOMEM;
> +
> +	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
> +					     OBJ_REQUEST_PAGES);
> +	if (!obj_request)
> +		return -ENOMEM;
> +
> +	obj_request->osd_req = rbd_osd_req_create(rbd_dev,
> +					inbound ? OBJ_OP_READ : OBJ_OP_WRITE,
> +					1, obj_request);
> +	if (!obj_request->osd_req)
> +		goto out;
> +
> +	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
> +			    class_name, method_name);
> +	ret = rbd_obj_request_sync(rbd_dev, obj_request, outbound, outbound_size,
> +				   inbound, inbound_size);
> +out:
> +	rbd_obj_request_put(obj_request);
>   	return ret;
>   }
>
>



* Re: [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-04-28 22:05 ` [PATCH 02/10] ceph: add start/finish encoding helpers mchristi
@ 2015-04-29 21:54   ` Alex Elder
  2015-04-30 12:22   ` Alex Elder
  1 sibling, 0 replies; 19+ messages in thread
From: Alex Elder @ 2015-04-29 21:54 UTC (permalink / raw)
  To: mchristi, ceph-devel

On 04/28/2015 05:05 PM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This patch adds helpers to encode/decode the starting blocks
> locking code. They are the equivalent of ENCODE_START and
> DECODE_START_LEGACY_COMPAT_LEN in the userspace ceph code.

Your subject line says "start/finish encoding" but really it's just
encode/decode.

I have a few suggestions and questions, below.  Also, I'm about to
get on a plane, so I won't be providing any more reviews for a while.

					-Alex

> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   include/linux/ceph/decode.h | 55 +++++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 55 insertions(+)
>
> diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
> index a6ef9cc..96ec43d 100644
> --- a/include/linux/ceph/decode.h
> +++ b/include/linux/ceph/decode.h
> @@ -217,6 +217,61 @@ static inline void ceph_encode_string(void **p, void *end,
>   	*p += len;
>   }
>
> +/*
> + * version and length starting block encoders/decoders
> + */
> +
> +/* current code version (u8) + compat code version (u8) + len of struct (u32) */
> +#define CEPH_ENCODING_START_BLK_LEN 6

I don't see this used. (I'm sure it will be soon though.)

Why not just explain what it is in code?

#define CEPH_ENCODING_START_BLK_LEN \
		(sizeof(u8) + sizeof(u8) + sizeof(u32))

> +/**
> + * ceph_start_encoding - start encoding block
> + * @p: buffer to encode data in
> + * @curr_ver: current (code) version of the encoding
> + * @compat_ver: oldest code version that can decode it
> + * @len: length of data that will be encoded in buffer
> + */
> +static inline void ceph_start_encoding(void **p, u8 curr_ver, u8 compat_ver,
> +				       u32 len)
> +{
> +	ceph_encode_8(p, curr_ver);
> +	ceph_encode_8(p, compat_ver);
> +	ceph_encode_32(p, len);
> +}
> +
> +/**
> + * ceph_start_decoding_compat - decode block with legacy support for older schemes
> + * @p: buffer to decode
> + * @end: end of decode buffer
> + * @curr_ver: current version of the encoding that the code supports/encodes
> + * @compat_ver: oldest version that includes a __u8 compat version field
> + * @len_ver: oldest version that includes a __u32 length wrapper
> + * @len: buffer to return len of data in buffer
> + */
> +static inline int ceph_start_decoding_compat(void **p, void *end, u8 curr_ver,
> +					     u8 compat_ver, u8 len_ver, u32 *len)
> +{
> +	u8 struct_ver, struct_compat;
> +
> +	ceph_decode_8_safe(p, end, struct_ver, fail);
> +	if (struct_ver >= curr_ver) {

It seems to me it doesn't matter whether the current code
supports the struct compat version or not.  What matters
is whether the encoded header contains the compat field
or not.  I'm not sure what determines that.

> +		ceph_decode_8_safe(p, end, struct_compat, fail);
> +		if (curr_ver < struct_compat)
> +			return -EINVAL;
> +	}
> +
> +	if (struct_ver >= len_ver) {
> +		ceph_decode_32_safe(p, end, *len, fail);
> +	} else {
> +		*len = 0;
> +	}
> +
> +	return 0;
> +fail:
> +	return -ERANGE;
> +}
> +
> +
>   #define ceph_encode_need(p, end, n, bad)			\
>   	do {							\
>   		if (!likely(ceph_has_room(p, end, n)))		\
>



* Re: [PATCH 08/10] ceph/rbd: add notify support
  2015-04-28 22:05 ` [PATCH 08/10] ceph/rbd: add notify support mchristi
@ 2015-04-30  0:50   ` Jason Dillaman
  0 siblings, 0 replies; 19+ messages in thread
From: Jason Dillaman @ 2015-04-30  0:50 UTC (permalink / raw)
  To: mchristi; +Cc: ceph-devel

Minor: in librbd, RBD notify op (rbd_notify_op) codes 0 - 10 have now been allocated.

-- 

Jason Dillaman 
Red Hat 
dillaman@redhat.com 
http://www.redhat.com 


----- Original Message -----
From: mchristi@redhat.com
To: ceph-devel@vger.kernel.org
Sent: Tuesday, April 28, 2015 6:05:45 PM
Subject: [PATCH 08/10] ceph/rbd: add notify support

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for rados's notify call. It is being used to notify
scsi PR and TMF watchers that the scsi pr info has changed, or that
we want to sync up on TMF execution (currently only LUN_RESET).

I did not add support for the notify2 recv buffer as I am not using
it yet. Currently, this results in log messages like:

kernel: libceph: read_partial_message skipping long message (48 > 0)

This commit message used to say that I was going to add it later, since
I need it to be able to send scsi sense codes, but I guess Doug is going
to do that now. Thanks Doug!

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             | 118 ++++++++++++++++++++++++++++++++++++----
 include/linux/ceph/osd_client.h |  16 +++++-
 include/linux/ceph/rados.h      |   9 +++
 net/ceph/osd_client.c           |  51 +++++++++++++++++
 4 files changed, 182 insertions(+), 12 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a70447c..aed38c0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -132,6 +132,21 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define DEV_NAME_LEN		32
 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
+enum rbd_notify_op {
+	RBD_NOTIFY_OP_ACQUIRED_LOCK	= 0,
+	RBD_NOTIFY_OP_RELEASED_LOCK	= 1,
+	RBD_NOTIFY_OP_REQUEST_LOCK	= 2,
+	RBD_NOTIFY_OP_HEADER_UPDATE	= 3,
+	RBD_NOTIFY_OP_ASYNC_PROGRESS	= 4,
+	RBD_NOTIFY_OP_ASYNC_COMPLETE	= 5,
+	RBD_NOTIFY_OP_FLATTEN		= 6,
+	RBD_NOTIFY_OP_RESIZE		= 7,
+	RBD_NOTIFY_OP_SNAP_CREATE	= 8,
+	RBD_NOTIFY_OP_SCSI_PR_UPDATE	= 9,
+	RBD_NOTIFY_OP_SCSI_LUN_RESET_START	= 10,
+	RBD_NOTIFY_OP_SCSI_LUN_RESET_COMPLETE	= 11,
+};
+
 /*
  * block device image metadata (in-memory version)
  */
@@ -1847,6 +1862,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_CALL:
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
+	case CEPH_OSD_OP_NOTIFY:
 		rbd_osd_trivial_callback(obj_request);
 		break;
 	default:
@@ -3087,27 +3103,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
+	u32 len, notify_op = -1;
+	void *p = payload, *end = p + payload_len;
 
 	if (!rbd_dev)
 		return;
 
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
+	dout("%s: \"%s\" notify_id %llu opcode %u rc %d bl len %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+		(unsigned int)opcode, return_code, payload_len);
 
-	/*
-	 * Until adequate refresh error handling is in place, there is
-	 * not much we can do here, except warn.
-	 *
-	 * See http://tracker.ceph.com/issues/5040
-	 */
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+	if (payload_len) {
+		if (ceph_start_decoding(&p, end, 1, &len))
+			goto decode_fail;
+		ceph_decode_32_safe(&p, end, notify_op, decode_fail);
+	}
+
+	if (opcode == CEPH_WATCH_EVENT_DISCONNECT)
+		return;
+
+	dout("%s: \"%s\" RBD notify op %u\n", __func__, rbd_dev->header_name,
+	     notify_op);
+
+	switch (notify_op) {
+	case RBD_NOTIFY_OP_SCSI_PR_UPDATE:
+		break;
+	default:
+		/*
+		 * Until adequate refresh error handling is in place, there is
+		 * not much we can do here, except warn.
+		 *
+		 * See http://tracker.ceph.com/issues/5040
+		 */
+		ret = rbd_dev_refresh(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "refresh failed: %d", ret);
+	}
 
 	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
 	if (ret)
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+	return;
+
+decode_fail:
+	rbd_warn(rbd_dev, "Invalid op/notify_op %u/%u", (unsigned int)opcode,
+		 notify_op);
 }
 
 /*
@@ -3260,6 +3300,12 @@ static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
 							   inbound_size,
 							   0, false, false);
 			break;
+		case CEPH_OSD_OP_NOTIFY:
+			osd_req_op_notify_response_data_pages(
+							obj_request->osd_req,
+							0, pages, inbound_size,
+							0, false, false);
+			break;
 		default:
 			BUG();
 		}
@@ -3279,6 +3325,11 @@ static int rbd_obj_request_sync(struct rbd_device *rbd_dev,
 							obj_request->osd_req, 0,
 							pagelist);
 			break;
+		case CEPH_OSD_OP_NOTIFY:
+			osd_req_op_notify_request_data_pagelist(
+							obj_request->osd_req, 0,
+							pagelist);
+			break;
 		default:
 			BUG();
 		}
@@ -3349,6 +3400,51 @@ out:
 	return ret;
 }
 
+static int rbd_obj_notify_scsi_event_sync(struct rbd_device *rbd_dev,
+					  u32 notify_op,
+					  u32 notify_timeout)
+{
+	struct rbd_obj_request *obj_request;
+	int ret = -ENOMEM;
+	struct {
+		__le32 version;
+		__le32 timeout;
+		__le32 buf_len;
+		/* payload only supports basic ops where we just send the op */
+		u8 curr_ver;
+		u8 compat_ver;
+		__le32 len;
+		__le32 notify_op;
+	} __attribute__ ((packed)) notify_buf = { 0 };
+
+	notify_buf.version = cpu_to_le32(0);
+	notify_buf.timeout = cpu_to_le32(notify_timeout);
+	notify_buf.buf_len = cpu_to_le32(10);
+	notify_buf.curr_ver = 2;
+	notify_buf.compat_ver = 1;
+	notify_buf.len = cpu_to_le32(sizeof(__le32));
+	notify_buf.notify_op = cpu_to_le32(notify_op);
+
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+					     OBJ_REQUEST_PAGES);
+	if (!obj_request)
+		return -ENOMEM;
+
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE,
+						  1, obj_request);
+	if (!obj_request->osd_req)
+		goto out;
+
+	osd_req_op_notify_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY,
+			       rbd_dev->watch_event->cookie);
+
+	ret = rbd_obj_request_sync(rbd_dev, obj_request, &notify_buf,
+				   sizeof(notify_buf), NULL, 0);
+out:
+	rbd_obj_request_put(obj_request);
+	return ret;
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 8c4ba9a..d512dfa 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -110,6 +110,11 @@ struct ceph_osd_req_op {
 			u32 gen;
 		} watch;
 		struct {
+			u64 cookie;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
+		} notify;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -301,7 +306,16 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
+extern void osd_req_op_notify_request_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+extern void osd_req_op_notify_response_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+				   unsigned int which, u16 opcode, u64 cookie);
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 7d3721f..cae82b36 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -427,6 +427,12 @@ enum {
 	CEPH_OSD_WATCH_OP_PING = 7,
 };
 
+enum {
+	CEPH_WATCH_EVENT_NOTIFY			= 1, /* notifying watcher */
+	CEPH_WATCH_EVENT_NOTIFY_COMPLETE	= 2, /* notifier notified when done */
+	CEPH_WATCH_EVENT_DISCONNECT		= 3, /* we were disconnected */
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -465,6 +471,9 @@ struct ceph_osd_op {
 			__u32 gen;	/* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
+			__le64 cookie;
+		} __attribute__ ((packed)) notify;
+		struct {
 			__le64 offset, length;
 			__le64 src_offset;
 		} __attribute__ ((packed)) clonerange;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index cfdb6aa..8e90ee3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -243,6 +243,29 @@ void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 
+void osd_req_op_notify_response_data_pages(struct ceph_osd_request *osd_req,
+			unsigned int which, struct page **pages, u64 length,
+			u32 alignment, bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, response_data);
+	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+				pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_notify_response_data_pages);
+
+void osd_req_op_notify_request_data_pagelist(
+			struct ceph_osd_request *osd_req,
+			unsigned int which, struct ceph_pagelist *pagelist)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
+
 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 {
 	switch (osd_data->type) {
@@ -292,6 +315,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->cls.request_data);
 		ceph_osd_data_release(&op->cls.response_data);
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		ceph_osd_data_release(&op->notify.request_data);
+		ceph_osd_data_release(&op->notify.response_data);
+		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
 		ceph_osd_data_release(&op->xattr.osd_data);
@@ -581,6 +608,16 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+			    u16 opcode, u64 cookie)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+	op->notify.cookie = cookie;
+}
+EXPORT_SYMBOL(osd_req_op_notify_init);
+
 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
 			   u16 opcode, u8 watch_opcode, u64 cookie)
 {
@@ -698,6 +735,20 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+
+		osd_data = &src->notify.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		osd_data = &src->notify.response_data;
+		ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
-- 
1.8.3.1

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


* Re: [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls
  2015-04-28 22:05 ` [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls mchristi
@ 2015-04-30  7:56   ` Christoph Hellwig
  0 siblings, 0 replies; 19+ messages in thread
From: Christoph Hellwig @ 2015-04-30  7:56 UTC (permalink / raw)
  To: mchristi; +Cc: ceph-devel

On Tue, Apr 28, 2015 at 05:05:47PM -0500, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
> 
> This stores the LIO PR info in the rbd header. Other
> clients (LIO nodes) are notified when the data changes,
> so they can update their info.
> 
> I added a sysfs file to test it here temporarily. I will
> remove it in the final version. The final patches will
> have a LIO callout call the rados locking and these
> functions instead.

So this is just an RFC and you'll have a real interface later?

I'd be curious what you're planning there as I'll need something similar
for the pNFS scsi-layouts client which will call into the block device
to ask for persistent reservations.


* Re: [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-04-28 22:05 ` [PATCH 02/10] ceph: add start/finish encoding helpers mchristi
  2015-04-29 21:54   ` Alex Elder
@ 2015-04-30 12:22   ` Alex Elder
  2015-05-01 19:39     ` Mike Christie
  1 sibling, 1 reply; 19+ messages in thread
From: Alex Elder @ 2015-04-30 12:22 UTC (permalink / raw)
  To: mchristi, ceph-devel

On 04/28/2015 05:05 PM, mchristi@redhat.com wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This patch adds helpers to encode/decode the starting blocks
> locking code. They are the equivalent of ENCODE_START and
> DECODE_START_LEGACY_COMPAT_LEN in the userspace ceph code.

Your subject line says "start/finish encoding" but really it's just
encode/decode.

I have a few suggestions and questions, below.

					-Alex

(I'm sorry if this got sent twice, I found it in my drafts folder
this morning so it seems it did not get sent.)

> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   include/linux/ceph/decode.h | 55 +++++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 55 insertions(+)
>
> diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
> index a6ef9cc..96ec43d 100644
> --- a/include/linux/ceph/decode.h
> +++ b/include/linux/ceph/decode.h
> @@ -217,6 +217,61 @@ static inline void ceph_encode_string(void **p, void *end,
>   	*p += len;
>   }
>
> +/*
> + * version and length starting block encoders/decoders
> + */
> +
> +/* current code version (u8) + compat code version (u8) + len of struct (u32) */
> +#define CEPH_ENCODING_START_BLK_LEN 6

I don't see this used. (I'm sure it will be soon though.)

Why not just explain what it is in code?

#define CEPH_ENCODING_START_BLK_LEN \
		(sizeof(u8) + sizeof(u8) + sizeof(u32))

> +/**
> + * ceph_start_encoding - start encoding block
> + * @p: buffer to encode data in
> + * @curr_ver: current (code) version of the encoding
> + * @compat_ver: oldest code version that can decode it
> + * @len: length of data that will be encoded in buffer
> + */
> +static inline void ceph_start_encoding(void **p, u8 curr_ver, u8 compat_ver,
> +				       u32 len)
> +{
> +	ceph_encode_8(p, curr_ver);
> +	ceph_encode_8(p, compat_ver);
> +	ceph_encode_32(p, len);
> +}
> +
> +/**
> + * ceph_start_decoding_compat - decode block with legacy support for older schemes
> + * @p: buffer to decode
> + * @end: end of decode buffer
> + * @curr_ver: current version of the encoding that the code supports/encodes
> + * @compat_ver: oldest version that includes a __u8 compat version field
> + * @len_ver: oldest version that includes a __u32 length wrapper
> + * @len: buffer to return len of data in buffer
> + */
> +static inline int ceph_start_decoding_compat(void **p, void *end, u8 curr_ver,
> +					     u8 compat_ver, u8 len_ver, u32 *len)
> +{
> +	u8 struct_ver, struct_compat;
> +
> +	ceph_decode_8_safe(p, end, struct_ver, fail);
> +	if (struct_ver >= curr_ver) {

It seems to me it doesn't matter whether the current code
supports the struct compat version or not.  What matters
is whether the encoded header contains the compat field
or not.  I'm not sure what determines that.

> +		ceph_decode_8_safe(p, end, struct_compat, fail);
> +		if (curr_ver < struct_compat)
> +			return -EINVAL;
> +	}
> +
> +	if (struct_ver >= len_ver) {
> +		ceph_decode_32_safe(p, end, *len, fail);
> +	} else {
> +		*len = 0;
> +	}
> +
> +	return 0;
> +fail:
> +	return -ERANGE;
> +}
> +
> +
>   #define ceph_encode_need(p, end, n, bad)			\
>   	do {							\
>   		if (!likely(ceph_has_room(p, end, n)))		\
>



* Re: [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-04-30 12:22   ` Alex Elder
@ 2015-05-01 19:39     ` Mike Christie
  2015-05-01 19:47       ` Mike Christie
  0 siblings, 1 reply; 19+ messages in thread
From: Mike Christie @ 2015-05-01 19:39 UTC (permalink / raw)
  To: Alex Elder, ceph-devel

On 04/30/2015 07:22 AM, Alex Elder wrote:
>> +/**
>> + * ceph_start_decoding_compat - decode block with legacy support for
>> older schemes
>> + * @p: buffer to decode
>> + * @end: end of decode buffer
>> + * @curr_ver: current version of the encoding that the code
>> supports/encodes
>> + * @compat_ver: oldest version that includes a __u8 compat version field
>> + * @len_ver: oldest version that includes a __u32 length wrapper
>> + * @len: buffer to return len of data in buffer
>> + */
>> +static inline int ceph_start_decoding_compat(void **p, void *end, u8
>> curr_ver,
>> +                         u8 compat_ver, u8 len_ver, u32 *len)
>> +{
>> +    u8 struct_ver, struct_compat;
>> +
>> +    ceph_decode_8_safe(p, end, struct_ver, fail);
>> +    if (struct_ver >= curr_ver) {
> 
> It seems to me it doesn't matter whether the current code
> supports the struct compat version or not.  What matters
> is whether the encoded header contains the compat field
> or not.  I'm not sure what determines that.

I am not sure if I understood this comment.

I thought different structs got the compat field in different versions.
So, I was concerned about a case where we might get a old struct sent to
us. If the compat field was added to some struct_abc in version 2 and
that is what we support in the kernel, but some old osd sent us a struct
that was version 1, then we do not want to do the compat check below.


> 
>> +        ceph_decode_8_safe(p, end, struct_compat, fail);
>> +        if (curr_ver < struct_compat)
>> +            return -EINVAL;
>> +    }
>> +
>> +    if (struct_ver >= len_ver) {
>> +        ceph_decode_32_safe(p, end, *len, fail);
>> +    } else {
>> +        *len = 0;
>> +    }
>> +
>> +    return 0;
>> +fail:
>> +    return -ERANGE;
>> +}
>> +
>> +
>>   #define ceph_encode_need(p, end, n, bad)            \
>>       do {                            \
>>           if (!likely(ceph_has_room(p, end, n)))        \
>>
> 



* Re: [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-05-01 19:39     ` Mike Christie
@ 2015-05-01 19:47       ` Mike Christie
  2015-05-01 19:53         ` Alex Elder
  0 siblings, 1 reply; 19+ messages in thread
From: Mike Christie @ 2015-05-01 19:47 UTC (permalink / raw)
  To: Alex Elder, ceph-devel

On 05/01/2015 02:39 PM, Mike Christie wrote:
> On 04/30/2015 07:22 AM, Alex Elder wrote:
>>> +/**
>>> + * ceph_start_decoding_compat - decode block with legacy support for
>>> older schemes
>>> + * @p: buffer to decode
>>> + * @end: end of decode buffer
>>> + * @curr_ver: current version of the encoding that the code
>>> supports/encode
>>> + * @compat_ver: oldest version that includes a __u8 compat version field
>>> + * @len_ver: oldest version that includes a __u32 length wrapper
>>> + * @len: buffer to return len of data in buffer
>>> + */
>>> +static inline int ceph_start_decoding_compat(void **p, void *end, u8
>>> curr_ver,
>>> +                         u8 compat_ver, u8 len_ver, u32 *len)
>>> +{
>>> +    u8 struct_ver, struct_compat;
>>> +
>>> +    ceph_decode_8_safe(p, end, struct_ver, fail);
>>> +    if (struct_ver >= curr_ver) {
>>
>> It seems to me it doesn't matter whether the current code
>> supports the struct compat version or not.  What matters
>> is whether the encoded header contains the compat field
>> or not.  I'm not sure what determines that.
> 
> I am not sure if I understood this comment.
> 
> I thought different structs got the compat field in different versions.
> So, I was concerned about a case where we might get a old struct sent to
> us. If the compat field was added to some struct_abc in version 2 and
> that is what we support in the kernel, but some old osd sent us a struct
> that was version 1, then we do not want to do the compat check below.
> 

Doh! I wrote the above mail, then realized what you meant.

I think I should have checked against the compat_ver passed in, rather
than curr_ver, in the version above:

if (struct_ver >= compat_ver) {
	ceph_decode_8_safe(p, end, struct_compat, fail);
	if (curr_ver < struct_compat)

> 
>>
>>> +        ceph_decode_8_safe(p, end, struct_compat, fail);
>>> +        if (curr_ver < struct_compat)
>>> +            return -EINVAL;
>>> +    }
>>> +
>>> +    if (struct_ver >= len_ver) {
>>> +        ceph_decode_32_safe(p, end, *len, fail);
>>> +    } else {
>>> +        *len = 0;
>>> +    }
>>> +
>>> +    return 0;
>>> +fail:
>>> +    return -ERANGE;
>>> +}
>>> +
>>> +
>>>   #define ceph_encode_need(p, end, n, bad)            \
>>>       do {                            \
>>>           if (!likely(ceph_has_room(p, end, n)))        \
>>>
>>
> 
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 

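[Not part of the original thread: to see the corrected version-gating in
one piece, here is a stand-alone userspace sketch. The decode_u8()/
decode_u32() helpers are simplified stand-ins for the kernel's
ceph_decode_*_safe() macros, host little-endian byte order is assumed,
and the function/field names are illustrative only.]

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Bounds-checked reads; stand-ins for ceph_decode_*_safe(). */
static int decode_u8(uint8_t **p, uint8_t *end, uint8_t *v)
{
	if (end - *p < 1)
		return -1;
	*v = *(*p)++;
	return 0;
}

static int decode_u32(uint8_t **p, uint8_t *end, uint32_t *v)
{
	if (end - *p < 4)
		return -1;
	memcpy(v, *p, 4);	/* little-endian host assumed */
	*p += 4;
	return 0;
}

/*
 * curr_ver:   newest encoding version this code understands
 * compat_ver: oldest version that carries the __u8 compat field
 * len_ver:    oldest version that carries the __u32 length wrapper
 *
 * The fix discussed above: gate the compat read on
 * struct_ver >= compat_ver (not >= curr_ver), so an old struct
 * that predates the compat field still decodes.
 */
static int start_decoding_compat(uint8_t **p, uint8_t *end,
				 uint8_t curr_ver, uint8_t compat_ver,
				 uint8_t len_ver, uint32_t *len)
{
	uint8_t struct_ver, struct_compat;

	if (decode_u8(p, end, &struct_ver))
		return -34;			/* -ERANGE */
	if (struct_ver >= compat_ver) {
		if (decode_u8(p, end, &struct_compat))
			return -34;		/* -ERANGE */
		if (curr_ver < struct_compat)
			return -22;		/* -EINVAL */
	}
	if (struct_ver >= len_ver)
		return decode_u32(p, end, len) ? -34 : 0;
	*len = 0;
	return 0;
}
```

With this gating, a version-1 struct sent by an old OSD (no compat
field, no length wrapper) decodes cleanly, while a struct whose
compat requirement exceeds curr_ver is rejected with -EINVAL.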


* Re: [PATCH 02/10] ceph: add start/finish encoding helpers
  2015-05-01 19:47       ` Mike Christie
@ 2015-05-01 19:53         ` Alex Elder
  0 siblings, 0 replies; 19+ messages in thread
From: Alex Elder @ 2015-05-01 19:53 UTC (permalink / raw)
  To: Mike Christie, ceph-devel

On 05/01/2015 02:47 PM, Mike Christie wrote:
> On 05/01/2015 02:39 PM, Mike Christie wrote:
>> On 04/30/2015 07:22 AM, Alex Elder wrote:
>>>> +/**
>>>> + * ceph_start_decoding_compat - decode block with legacy support for
>>>> older schemes
>>>> + * @p: buffer to decode
>>>> + * @end: end of decode buffer
>>>> + * @curr_ver: current version of the encoding that the code
>>>> supports/encode
>>>> + * @compat_ver: oldest version that includes a __u8 compat version field
>>>> + * @len_ver: oldest version that includes a __u32 length wrapper
>>>> + * @len: buffer to return len of data in buffer
>>>> + */
>>>> +static inline int ceph_start_decoding_compat(void **p, void *end, u8
>>>> curr_ver,
>>>> +                         u8 compat_ver, u8 len_ver, u32 *len)
>>>> +{
>>>> +    u8 struct_ver, struct_compat;
>>>> +
>>>> +    ceph_decode_8_safe(p, end, struct_ver, fail);
>>>> +    if (struct_ver >= curr_ver) {
>>>
>>> It seems to me it doesn't matter whether the current code
>>> supports the struct compat version or not.  What matters
>>> is whether the encoded header contains the compat field
>>> or not.  I'm not sure what determines that.
>>
>> I am not sure if I understood this comment.
>>
>> I thought different structs got the compat field in different versions.
>> So, I was concerned about a case where we might get an old struct sent to
>> us. If the compat field was added to some struct_abc in version 2 and
>> that is what we support in the kernel, but some old osd sent us a struct
>> that was version 1, then we do not want to do the compat check below.
>>
> 
> Doh! I wrote the above mail, then realized what you meant.

OK good...  And I should have known what determines
whether the header contains the compat field, but
I was already a little confused about what I was
looking at...

					-Alex

> I think I should have checked against the compat_ver passed in, rather
> than curr_ver, in the version above:
> 
> if (struct_ver >= compat_ver) {
> 	ceph_decode_8_safe(p, end, struct_compat, fail);
> 	if (curr_ver < struct_compat)
> 
>>
>>>
>>>> +        ceph_decode_8_safe(p, end, struct_compat, fail);
>>>> +        if (curr_ver < struct_compat)
>>>> +            return -EINVAL;
>>>> +    }
>>>> +
>>>> +    if (struct_ver >= len_ver) {
>>>> +        ceph_decode_32_safe(p, end, *len, fail);
>>>> +    } else {
>>>> +        *len = 0;
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +fail:
>>>> +    return -ERANGE;
>>>> +}
>>>> +
>>>> +
>>>>   #define ceph_encode_need(p, end, n, bad)            \
>>>>       do {                            \
>>>>           if (!likely(ceph_has_room(p, end, n)))        \
>>>>
>>>
>>
>>
> 



end of thread, other threads:[~2015-05-01 19:54 UTC | newest]

Thread overview: 19+ messages
-- links below jump to the message on this page --
2015-04-28 22:05 [PATCH 00/10] ceph/rbd: initial support for lio ha mchristi
2015-04-28 22:05 ` [PATCH 01/10] rbd: add obj request execution helper mchristi
2015-04-29 21:33   ` Alex Elder
2015-04-28 22:05 ` [PATCH 02/10] ceph: add start/finish encoding helpers mchristi
2015-04-29 21:54   ` Alex Elder
2015-04-30 12:22   ` Alex Elder
2015-05-01 19:39     ` Mike Christie
2015-05-01 19:47       ` Mike Christie
2015-05-01 19:53         ` Alex Elder
2015-04-28 22:05 ` [PATCH 03/10] ceph: export ceph_entity_type_name mchristi
2015-04-28 22:05 ` [PATCH 04/10] ceph/rbd: add support for watch notify payloads mchristi
2015-04-28 22:05 ` [PATCH 05/10] ceph: decode start helper mchristi
2015-04-28 22:05 ` [PATCH 06/10] ceph/rbd: add support for header version 2 and 3 mchristi
2015-04-28 22:05 ` [PATCH 07/10] ceph/rbd: update watch-notify ceph_osd_op mchristi
2015-04-28 22:05 ` [PATCH 08/10] ceph/rbd: add notify support mchristi
2015-04-30  0:50   ` Jason Dillaman
2015-04-28 22:05 ` [PATCH 09/10] rbd: add rados locking mchristi
2015-04-28 22:05 ` [PATCH 10/10] rbd: distribute scsi pr info through rbd class calls mchristi
2015-04-30  7:56   ` Christoph Hellwig
