* [PATCH 0/6] support watch/notify version 2
@ 2015-06-12 15:56 Douglas Fuller
  2015-06-12 15:56 ` [PATCH 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
                   ` (5 more replies)
  0 siblings, 6 replies; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

osd_client, rbd: add support for version 2 of watch/notify

Support watch/notify 2 in osd_client and update rbd for compatibility.
Map ceph_osd_event to expected watch/notify messages and store watch
and watch error callbacks. Implement CEPH_OSD_WATCH_OP_PING and
CEPH_OSD_WATCH_OP_RECONNECT. Handle CEPH_WATCH_EVENT_DISCONNECT.
Handle CEPH_WATCH_EVENT_NOTIFY_COMPLETE and provide a data item for
clients to consume.
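
A minimal sketch of the resulting client-side flow (function and
callback signatures are taken from patches 4-6; struct my_device and
the callback bodies are hypothetical):

static void my_watch_cb(void *arg, u64 notify_id, u64 cookie,
			u64 notifier_id, void *data, size_t data_len)
{
	/* CEPH_WATCH_EVENT_NOTIFY: data/data_len carry the notify
	 * payload described above */
}

static void my_watch_err_cb(void *arg, u64 cookie, int err)
{
	/* CEPH_WATCH_EVENT_DISCONNECT (or a failed ping): tear down
	 * and re-establish the watch as needed */
}

static int my_register_watch(struct ceph_osd_client *osdc,
			     struct my_device *dev)
{
	/* one event per watch; the cookie is assigned by osd_client */
	return ceph_osdc_create_watch_event(osdc, my_watch_cb,
					    my_watch_err_cb, dev,
					    &dev->watch_event);
}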

Signed-off-by: Douglas Fuller <dfuller@redhat.com>

Douglas Fuller (3):
  osd_client, rbd: update event interface for watch/notify2
  osd_client: add support for notify payloads via notify event
  osd_client: send watch ping messages

Mike Christie (3):
  ceph/rbd: add support for watch notify payloads
  ceph/rbd: add support for header version 2 and 3
  ceph/rbd: update watch-notify ceph_osd_op

 drivers/block/rbd.c             |  56 ++++--
 include/linux/ceph/osd_client.h |  54 ++++--
 include/linux/ceph/rados.h      |  24 ++-
 net/ceph/osd_client.c           | 379 +++++++++++++++++++++++++++++++++++-----
 4 files changed, 439 insertions(+), 74 deletions(-)

-- 
1.9.3



* [PATCH 1/6] ceph/rbd: add support for watch notify payloads
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 21:22   ` Josh Durgin
  2015-06-12 15:56 ` [PATCH 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support for proto version 1 of watch-notify,
so drivers like rbd can be sent a buffer with information like
the notify operation being performed.
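
For illustration, a consumer of the widened callback might look like
this (only the signature comes from this patch; example_watch_cb and
its body are hypothetical):

static void example_watch_cb(u64 ver, u64 notify_id, u8 opcode,
			     void *data, void *payload, int payload_len)
{
	/* payload points into the received CEPH_MSG_WATCH_NOTIFY
	 * message and is only valid during the callback, so copy out
	 * anything that must live longer */
	if (payload_len)
		pr_debug("notify opcode %u payload %d bytes\n",
			 (unsigned int)opcode, payload_len);
}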

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             |  3 ++-
 include/linux/ceph/osd_client.h |  7 +++++--
 net/ceph/osd_client.c           | 21 ++++++++++++++++-----
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89fe8a4..4b9ba9f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3103,7 +3103,8 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
+			 void *payload, int payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b48..eab96b5 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *);
+	void (*cb)(u64, u64, u8, void *, void *, int);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,6 +197,8 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	void *payload;
+	int payload_len;
 };
 
 struct ceph_osd_client {
@@ -369,7 +371,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *),
+				  void (*event_cb)(u64, u64, u8, void *, void *,
+						   int),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5003367..aa1c5c46 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2277,7 +2277,7 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *),
+			   void (*event_cb)(u64, u64, u8, void *, void *, int),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2329,7 +2329,8 @@ static void do_event_work(struct work_struct *work)
 	u8 opcode = event_work->opcode;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data);
+	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
+		  event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2342,10 +2343,11 @@ static void do_event_work(struct work_struct *work)
 static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
-	void *p, *end;
+	void *p, *end, *payload = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id;
 	u8 opcode;
+	u32 payload_len = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2358,6 +2360,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	ceph_decode_64_safe(&p, end, ver, bad);
 	ceph_decode_64_safe(&p, end, notify_id, bad);
 
+	if (proto_ver >= 1) {
+		ceph_decode_32_safe(&p, end, payload_len, bad);
+		if (end - p < payload_len)
+			goto bad;
+		payload = p;
+	}
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2365,8 +2374,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
-	     cookie, ver, event);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
+	     cookie, ver, event, notify_id, payload_len);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2379,6 +2388,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->payload = payload;
+		event_work->payload_len = payload_len;
 
 		queue_work(osdc->notify_wq, &event_work->work);
 	}
-- 
1.9.3



* [PATCH 2/6] ceph/rbd: add support for header version 2 and 3
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
  2015-06-12 15:56 ` [PATCH 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 21:23   ` Josh Durgin
  2015-06-12 15:56 ` [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds support for watch-notify message header versions 2 and 3,
so we can get a return_code from those operations.
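
For illustration, a callback could now act on the result (the signature
matches the one this patch installs in struct ceph_osd_event; the body
is hypothetical):

static void example_watch_cb(u64 ver, u64 notify_id, u8 opcode,
			     s32 return_code, u64 notifier_gid,
			     void *data, void *payload, u32 payload_len)
{
	if (return_code)
		pr_warn("watch-notify error %d from notifier %llu\n",
			return_code, notifier_gid);
}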

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             |  5 +++--
 include/linux/ceph/osd_client.h | 10 ++++++----
 net/ceph/osd_client.c           | 25 +++++++++++++++++++------
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4b9ba9f..65421eb 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3103,8 +3103,9 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
-			 void *payload, int payload_len)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
+			 u64 notifier_gid, void *data, void *payload,
+			 u32 payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index eab96b5..1c4e472 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *, void *, int);
+	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,8 +197,10 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	s32 return_code;
+	u64 notifier_gid;
 	void *payload;
-	int payload_len;
+	u32 payload_len;
 };
 
 struct ceph_osd_client {
@@ -371,8 +373,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *, void *,
-						   int),
+				  void (*event_cb)(u64, u64, u8, s32, u64,
+						   void *, void *, u32),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index aa1c5c46..590cf9c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2277,7 +2277,8 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *, void *, int),
+			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
+					    void *, u32),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2327,10 +2328,12 @@ static void do_event_work(struct work_struct *work)
 	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
+	s32 return_code = event_work->return_code;
+	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
-		  event_work->payload_len);
+	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
+		  event->data, event_work->payload, event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2345,9 +2348,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 {
 	void *p, *end, *payload = NULL;
 	u8 proto_ver;
-	u64 cookie, ver, notify_id;
+	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
+	s32 return_code = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2365,8 +2369,15 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		if (end - p < payload_len)
 			goto bad;
 		payload = p;
+		p += payload_len;
 	}
 
+	if (msg->hdr.version >= 2)
+		ceph_decode_32_safe(&p, end, return_code, bad);
+
+	if (msg->hdr.version >= 3)
+		ceph_decode_32_safe(&p, end, notifier_gid, bad);
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2374,8 +2385,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
-	     cookie, ver, event, notify_id, payload_len);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
+	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2388,6 +2399,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->return_code = return_code;
+		event_work->notifier_gid = notifier_gid;
 		event_work->payload = payload;
 		event_work->payload_len = payload_len;
 
-- 
1.9.3



* [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
  2015-06-12 15:56 ` [PATCH 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
  2015-06-12 15:56 ` [PATCH 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 22:00   ` Josh Durgin
  2015-06-12 15:56 ` [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This syncs the ceph_osd_op struct with the current version of ceph
where the watch struct has been updated to support more ops and
the notify-ack support has been broken out of the watch struct.

Ceph commits
1a82cc3926fc7bc4cfbdd2fd4dfee8660d5107a1
2288f318e1b1f6a1c42b185fc1b4c41f23995247
73720130c34424bf1fe36058ebe8da66976f40fb

It still has us use the legacy watch op for now; I will add support
for the new ops later. It is mostly a preparation patch for more
advanced notify support.
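
For illustration, callers now select the behavior with an explicit
watch sub-op instead of a flag (these calls mirror the rbd hunks below;
cookie and notify_id are placeholders):

	/* register a watch, keeping pre-giant wire semantics */
	osd_req_op_watch_init(osd_req, 0, CEPH_OSD_OP_WATCH,
			      CEPH_OSD_WATCH_OP_LEGACY_WATCH, cookie);

	/* tear the watch down */
	osd_req_op_watch_init(osd_req, 0, CEPH_OSD_OP_WATCH,
			      CEPH_OSD_WATCH_OP_UNWATCH, cookie);

	/* ack a notify (still routed through the watch union) */
	osd_req_op_watch_init(osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
			      0, notify_id);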

Questions:

1. Should linger also be set for CEPH_OSD_WATCH_OP_RECONNECT?
2. Not sure what watch.gen is used for. Is that for our internal
use, or does the osd do something with it?

djf: removed changes to rbd.c for SCSI

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             | 19 +++++++-----
 include/linux/ceph/osd_client.h | 23 +++++++++++----
 include/linux/ceph/rados.h      | 24 +++++++++++++--
 net/ceph/osd_client.c           | 65 ++++++++++++++++++++++++++++++++++++-----
 4 files changed, 109 insertions(+), 22 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 65421eb..ed170b1 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3089,8 +3089,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
 	if (!obj_request->osd_req)
 		goto out;
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-					notify_id, 0, 0);
+	osd_req_op_watch_init(obj_request->osd_req, 0,
+			      CEPH_OSD_OP_NOTIFY_ACK, 0, notify_id);
 	rbd_osd_req_format_read(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3138,7 +3138,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
  */
 static struct rbd_obj_request *rbd_obj_watch_request_helper(
 						struct rbd_device *rbd_dev,
-						bool watch)
+						u8 watch_opcode)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_options *opts = osdc->client->options;
@@ -3158,10 +3158,11 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 	}
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, watch);
+			      watch_opcode, rbd_dev->watch_event->cookie);
 	rbd_osd_req_format_write(obj_request);
 
-	if (watch)
+	if (watch_opcode == CEPH_OSD_WATCH_OP_LEGACY_WATCH ||
+	    watch_opcode == CEPH_OSD_WATCH_OP_WATCH)
 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3174,7 +3175,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 
 	ret = obj_request->result;
 	if (ret) {
-		if (watch)
+		if (watch_opcode != CEPH_OSD_WATCH_OP_UNWATCH)
 			rbd_obj_request_end(obj_request);
 		goto out;
 	}
@@ -3203,7 +3204,8 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	if (ret < 0)
 		return ret;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
@@ -3237,7 +3239,8 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						   CEPH_OSD_WATCH_OP_UNWATCH);
 	if (!IS_ERR(obj_request))
 		rbd_obj_request_put(obj_request);
 	else
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 1c4e472..12732d3 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -106,11 +106,15 @@ struct ceph_osd_req_op {
 		struct {
 			u64 cookie;
 			u64 ver;
-			u32 prot_ver;
-			u32 timeout;
-			__u8 flag;
+			__u8 op;
+			u32 gen;
 		} watch;
 		struct {
+			u64 cookie;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
+		} notify;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -302,7 +306,16 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
+extern void osd_req_op_notify_request_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+extern void osd_req_op_notify_response_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+				   unsigned int which, u16 opcode, u64 cookie);
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
@@ -311,7 +324,7 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
 				 size_t size, u8 cmp_op, u8 cmp_mode);
 extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
-					u64 cookie, u64 version, int flag);
+					u8 watch_opcode, u64 cookie);
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 2f822dc..cae82b36 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -417,6 +417,22 @@ enum {
 
 #define RADOS_NOTIFY_VER	1
 
+enum {
+	CEPH_OSD_WATCH_OP_UNWATCH = 0,
+	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
+	/* note: use only ODD ids to prevent pre-giant code from
+	 * interpreting the op as UNWATCH */
+	CEPH_OSD_WATCH_OP_WATCH = 3,
+	CEPH_OSD_WATCH_OP_RECONNECT = 5,
+	CEPH_OSD_WATCH_OP_PING = 7,
+};
+
+enum {
+	CEPH_WATCH_EVENT_NOTIFY			= 1, /* notifying watcher */
+	CEPH_WATCH_EVENT_NOTIFY_COMPLETE	= 2, /* notifier notified when done */
+	CEPH_WATCH_EVENT_DISCONNECT		= 3, /* we were disconnected */
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -450,10 +466,14 @@ struct ceph_osd_op {
 	        } __attribute__ ((packed)) snap;
 		struct {
 			__le64 cookie;
-			__le64 ver;
-			__u8 flag;	/* 0 = unwatch, 1 = watch */
+			__le64 ver;	/* no longer used */
+			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
+			__u32 gen;	/* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
+			__le64 cookie;
+		} __attribute__ ((packed)) notify;
+		struct {
 			__le64 offset, length;
 			__le64 src_offset;
 		} __attribute__ ((packed)) clonerange;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 590cf9c..74650e1 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -243,6 +243,29 @@ void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 
+void osd_req_op_notify_response_data_pages(struct ceph_osd_request *osd_req,
+			unsigned int which, struct page **pages, u64 length,
+			u32 alignment, bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, response_data);
+	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+				pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_notify_response_data_pages);
+
+void osd_req_op_notify_request_data_pagelist(
+			struct ceph_osd_request *osd_req,
+			unsigned int which, struct ceph_pagelist *pagelist)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
+
 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 {
 	switch (osd_data->type) {
@@ -292,6 +315,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->cls.request_data);
 		ceph_osd_data_release(&op->cls.response_data);
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		ceph_osd_data_release(&op->notify.request_data);
+		ceph_osd_data_release(&op->notify.response_data);
+		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
 		ceph_osd_data_release(&op->xattr.osd_data);
@@ -588,9 +615,18 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
-				unsigned int which, u16 opcode,
-				u64 cookie, u64 version, int flag)
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+			    u16 opcode, u64 cookie)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+	op->watch.cookie = cookie;
+}
+EXPORT_SYMBOL(osd_req_op_notify_init);
+
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
+			   u16 opcode, u8 watch_opcode, u64 cookie)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 						      opcode, 0);
@@ -598,9 +634,9 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 
 	op->watch.cookie = cookie;
-	op->watch.ver = version;
-	if (opcode == CEPH_OSD_OP_WATCH && flag)
-		op->watch.flag = (u8)1;
+	op->watch.ver = 0;
+	op->watch.op = watch_opcode;
+	op->watch.gen = 0;
 }
 EXPORT_SYMBOL(osd_req_op_watch_init);
 
@@ -708,11 +744,26 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+
+		osd_data = &src->notify.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		osd_data = &src->notify.response_data;
+		ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
-		dst->watch.flag = src->watch.flag;
+		dst->watch.op = src->watch.op;
+		dst->watch.gen = cpu_to_le32(src->watch.gen);
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
-- 
1.9.3



* [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
                   ` (2 preceding siblings ...)
  2015-06-12 15:56 ` [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 14:58   ` Mike Christie
  2015-06-16 23:18   ` Josh Durgin
  2015-06-12 15:56 ` [PATCH 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
  2015-06-12 15:56 ` [PATCH 6/6] osd_client: send watch ping messages Douglas Fuller
  5 siblings, 2 replies; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

Change the unused ceph_osd_event structure to refer to pending
watch/notify2 messages. Watch events include the separate watch and
watch-error callbacks used for watch/notify2. Update rbd to use the
separate watch and watch-error callbacks via the new watch event.
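
Besides the callback-driven watch events, this adds a one-shot,
completion-based notify event; a sketch of its intended use (names from
this patch, error handling elided):

	struct ceph_osd_event *event;
	int ret;

	ret = ceph_osdc_create_notify_event(osdc, &event);
	if (ret)
		return ret;
	/* ... issue a CEPH_OSD_OP_NOTIFY op carrying event->cookie ... */
	ceph_osdc_wait_event(osdc, event); /* until NOTIFY_COMPLETE */
	ceph_osdc_cancel_event(event);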

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 drivers/block/rbd.c             |  41 +++++++---
 include/linux/ceph/osd_client.h |  27 +++++--
 net/ceph/osd_client.c           | 175 +++++++++++++++++++++++++++++-----------
 3 files changed, 179 insertions(+), 64 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index ed170b1..20b3b23 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -427,6 +427,8 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 				       size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 static void rbd_spec_put(struct rbd_spec *spec);
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
 
 static int rbd_dev_id_to_minor(int dev_id)
 {
@@ -3103,19 +3105,17 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
-			 u64 notifier_gid, void *data, void *payload,
-			 u32 payload_len)
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, u64 notifier_id,
+                        void *data, size_t data_len)
 {
-	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
 	int ret;
 
 	if (!rbd_dev)
 		return;
 
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-		rbd_dev->header_name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+	dout("%s: \"%s\" notify_id %llu bl len %lu\n", __func__,
+	    rbd_dev->header_name, (unsigned long long)notify_id, data_len);
 
 	/*
 	 * Until adequate refresh error handling is in place, there is
@@ -3132,6 +3132,26 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
+{
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
+	int ret;
+
+	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
+		err, cookie);
+	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
+	         rbd_dev->header_name, err, cookie);
+
+	/* reset watch */
+	rbd_dev_refresh(rbd_dev);
+	rbd_dev_header_unwatch_sync(rbd_dev);
+	ret = rbd_dev_header_watch_sync(rbd_dev);
+	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
+	rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+}
+
 /*
  * Send a (un)watch request and wait for the ack.  Return a request
  * with a ref held on success or error.
@@ -3199,13 +3219,14 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	rbd_assert(!rbd_dev->watch_event);
 	rbd_assert(!rbd_dev->watch_request);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
+	ret = ceph_osdc_create_watch_event(osdc, rbd_watch_cb,
+	                                  rbd_watch_error_cb,
+	                                  rbd_dev, &rbd_dev->watch_event);
 	if (ret < 0)
 		return ret;
 
 	obj_request = rbd_obj_watch_request_helper(rbd_dev,
-						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
+						CEPH_OSD_WATCH_OP_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 12732d3..b7d4234 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -108,6 +108,7 @@ struct ceph_osd_req_op {
 			u64 ver;
 			__u8 op;
 			u32 gen;
+			struct ceph_osd_data request_data;
 		} watch;
 		struct {
 			u64 cookie;
@@ -186,13 +187,21 @@ struct ceph_request_redirect {
 
 struct ceph_osd_event {
 	u64 cookie;
-	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
+	struct ceph_osd_request *osd_req;
 	void *data;
 	struct rb_node node;
-	struct list_head osd_node;
 	struct kref kref;
+	union {
+		struct {
+			void (*watchcb)(void *, u64, u64, u64, void *, size_t);
+			void (*errcb)(void *, u64, int);
+		} watch;
+		struct {
+			struct ceph_msg_data *notify_data;
+			struct completion complete;
+		} notify;
+	};
 };
 
 struct ceph_osd_event_work {
@@ -385,10 +394,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct page **pages, int nr_pages);
 
 /* watch/notify events */
-extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, s32, u64,
-						   void *, void *, u32),
-				  void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_watch_event(struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                         struct ceph_osd_event **pevent);
+extern int ceph_osdc_wait_event(struct ceph_osd_client *osdc,
+				struct ceph_osd_event *event);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 74650e1..d435bf2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -36,7 +36,13 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
 static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
-			   struct ceph_osd_request *req);
+                           struct ceph_osd_request *req);
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                           u64 cookie);
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data);
 
 /*
  * Implement client access to distributed object storage cluster.
@@ -615,10 +621,12 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+                            unsigned int which,
 			    u16 opcode, u64 cookie)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode, 0);
+	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
 	op->watch.cookie = cookie;
@@ -2273,7 +2281,7 @@ void ceph_osdc_put_event(struct ceph_osd_event *event)
 EXPORT_SYMBOL(ceph_osdc_put_event);
 
 static void __insert_event(struct ceph_osd_client *osdc,
-			     struct ceph_osd_event *new)
+                           struct ceph_osd_event *new)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2295,7 +2303,7 @@ static void __insert_event(struct ceph_osd_client *osdc,
 }
 
 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
-					        u64 cookie)
+                                           u64 cookie)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2327,27 +2335,60 @@ static void __remove_event(struct ceph_osd_event *event)
 	}
 }
 
-int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
-					    void *, u32),
-			   void *data, struct ceph_osd_event **pevent)
+static struct ceph_osd_event *__alloc_event(struct ceph_osd_client *osdc,
+                                            void *data)
 {
 	struct ceph_osd_event *event;
 
 	event = kmalloc(sizeof(*event), GFP_NOIO);
 	if (!event)
-		return -ENOMEM;
+		return NULL;
 
 	dout("create_event %p\n", event);
-	event->cb = event_cb;
-	event->one_shot = 0;
 	event->data = data;
 	event->osdc = osdc;
-	INIT_LIST_HEAD(&event->osd_node);
+	event->osd_req = NULL;
 	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
 
+	return event;
+}
+
+int ceph_osdc_create_watch_event (struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, data);
+	if (!event)
+		return -ENOMEM;
+
+	event->watch.watchcb = watchcb;
+	event->watch.errcb = errcb;
+
+	spin_lock(&osdc->event_lock);
+	event->cookie = ++osdc->event_count;
+	__insert_event(osdc, event);
+	spin_unlock(&osdc->event_lock);
+	*pevent = event;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_watch_event);
+
+int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, NULL);
+	if (!event)
+		return -ENOMEM;
+
+	init_completion(&event->notify.complete);
+
 	spin_lock(&osdc->event_lock);
 	event->cookie = ++osdc->event_count;
 	__insert_event(osdc, event);
@@ -2356,7 +2397,15 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 	*pevent = event;
 	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_create_event);
+EXPORT_SYMBOL(ceph_osdc_create_notify_event);
+
+int ceph_osdc_wait_event (struct ceph_osd_client *osdc,
+                          struct ceph_osd_event *event)
+{
+	wait_for_completion(&event->notify.complete);
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
 
 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
 {
@@ -2376,20 +2425,79 @@ static void do_event_work(struct work_struct *work)
 	struct ceph_osd_event_work *event_work =
 		container_of(work, struct ceph_osd_event_work, work);
 	struct ceph_osd_event *event = event_work->event;
-	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
 	s32 return_code = event_work->return_code;
 	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
-		  event->data, event_work->payload, event_work->payload_len);
+	if (opcode == CEPH_WATCH_EVENT_NOTIFY)
+		event->watch.watchcb(event->data, notify_id,
+		                     event->cookie, notifier_gid,
+		                     event_work->payload,
+		                     event_work->payload_len);
+	else if (opcode == CEPH_WATCH_EVENT_DISCONNECT && event->watch.errcb)
+		event->watch.errcb(event->data, event->cookie, return_code);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
 }
 
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data)
+{
+	struct ceph_osd_event *event;
+	struct ceph_osd_event_work *event_work;
+
+	spin_lock(&osdc->event_lock);
+	event = __find_event(osdc, cookie);
+	if (event)
+		get_event(event);
+	spin_unlock(&osdc->event_lock);
+
+	dout("handle_watch_notify cookie %lld event %p notify id %llu payload "
+	     "len %u return code %d notifier gid %llu\n",
+             cookie, event, notify_id, payload_len, return_code, notifier_gid);
+	switch(opcode) {
+		case CEPH_WATCH_EVENT_NOTIFY:
+		case CEPH_WATCH_EVENT_DISCONNECT:
+			if (event) {
+				event_work = kmalloc(sizeof(*event_work),
+				                     GFP_NOIO);
+				if (!event_work) {
+					pr_err("couldn't allocate event_work\n");
+					ceph_osdc_put_event(event);
+					return;
+				}
+				INIT_WORK(&event_work->work, do_event_work);
+				event_work->event = event;
+				event_work->notify_id = notify_id;
+				event_work->opcode = opcode;
+				event_work->return_code = return_code;
+				event_work->notifier_gid = notifier_gid;
+				event_work->payload = payload;
+				event_work->payload_len = payload_len;
+
+				queue_work(osdc->notify_wq, &event_work->work);
+			}
+			break;
+		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
+			if (event) {
+				event->notify.notify_data = data;
+				if (event->osd_req) {
+					ceph_osdc_cancel_request(event->osd_req);
+					event->osd_req = NULL;
+				}
+				complete_all(&event->notify.complete);
+			}
+			break;
+		default:
+			BUG();
+			break;
+	}
+}
 
 /*
  * Process osd watch notifications
@@ -2398,13 +2506,12 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
 	void *p, *end, *payload = NULL;
+	struct ceph_msg_data *data = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
 	s32 return_code = 0;
-	struct ceph_osd_event *event;
-	struct ceph_osd_event_work *event_work;
 
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -2429,34 +2536,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 3)
 		ceph_decode_32_safe(&p, end, notifier_gid, bad);
 
-	spin_lock(&osdc->event_lock);
-	event = __find_event(osdc, cookie);
-	if (event) {
-		BUG_ON(event->one_shot);
-		get_event(event);
-	}
-	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
-	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
-	if (event) {
-		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
-		if (!event_work) {
-			pr_err("couldn't allocate event_work\n");
-			ceph_osdc_put_event(event);
-			return;
-		}
-		INIT_WORK(&event_work->work, do_event_work);
-		event_work->event = event;
-		event_work->ver = ver;
-		event_work->notify_id = notify_id;
-		event_work->opcode = opcode;
-		event_work->return_code = return_code;
-		event_work->notifier_gid = notifier_gid;
-		event_work->payload = payload;
-		event_work->payload_len = payload_len;
-
-		queue_work(osdc->notify_wq, &event_work->work);
-	}
+	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
+		return_code, notifier_gid, data);
 
 	return;
 
-- 
1.9.3



* [PATCH 5/6] osd_client: add support for notify payloads via notify event
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
                   ` (3 preceding siblings ...)
  2015-06-12 15:56 ` [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 15:26   ` Mike Christie
  2015-06-12 15:56 ` [PATCH 6/6] osd_client: send watch ping messages Douglas Fuller
  5 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

Add support in notify events for receiving data from notify_ack. Notify
events are optional; data is discarded if no event is found.
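
A sketch of how a notifier would pick up that data after the wait
(field names are from patch 4; what exactly callers should do with the
ceph_msg_data is still open, see the review discussion below):

	ceph_osdc_wait_event(osdc, event);
	if (event->notify.notify_data) {
		/* notify_ack payloads from the watchers arrive here
		 * as message data (CEPH_MSG_DATA_PAGES) */
	}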

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 net/ceph/osd_client.c | 39 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d435bf2..d56f7a6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -267,7 +267,7 @@ void osd_req_op_notify_request_data_pagelist(
 {
 	struct ceph_osd_data *osd_data;
 
-	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	osd_data = osd_req_op_data(osd_req, which, watch, request_data);
 	ceph_osd_data_pagelist_init(osd_data, pagelist);
 }
 EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
@@ -629,6 +629,13 @@ void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
 	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+
+	notify_event = __find_event(osd_req->r_osdc, cookie);
+	/* Only linger if the caller is interested in the notify acks. */
+	if (notify_event) {
+		ceph_osdc_set_request_linger(osd_req->r_osdc, osd_req);
+		notify_event->osd_req = osd_req;
+	}
 	op->watch.cookie = cookie;
 }
 EXPORT_SYMBOL(osd_req_op_notify_init);
@@ -767,6 +774,14 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		ceph_osdc_msg_data_add(req->r_reply, osd_data);
 		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
+		osd_data = &src->watch.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		/* fallthrough */
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
@@ -2533,8 +2548,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 2)
 		ceph_decode_32_safe(&p, end, return_code, bad);
 
-	if (msg->hdr.version >= 3)
+	if (msg->hdr.version >= 3) {
 		ceph_decode_32_safe(&p, end, notifier_gid, bad);
+		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+	}
 
 	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
 		return_code, notifier_gid, data);
@@ -3061,7 +3078,23 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
-		return ceph_msg_new(type, front, GFP_NOFS, false);
+		{
+			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
+			size_t len = con->in_hdr.data_len;
+			if (len > 0) {
+				struct page **pages;
+				struct ceph_osd_data osd_data;
+				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);
+				osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
+				osd_data.pages = pages;
+				osd_data.length = len;
+				osd_data.alignment = 0;
+				osd_data.pages_from_pool = false;
+				osd_data.own_pages = false;
+				ceph_osdc_msg_data_add(m, &osd_data);
+			}
+			return m;
+		}
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);
 	default:
-- 
1.9.3



* [PATCH 6/6] osd_client: send watch ping messages
  2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
                   ` (4 preceding siblings ...)
  2015-06-12 15:56 ` [PATCH 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
@ 2015-06-12 15:56 ` Douglas Fuller
  2015-06-16 15:07   ` Mike Christie
  5 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-12 15:56 UTC (permalink / raw)
  To: ceph-devel

Send CEPH_OSD_WATCH_OP_PING every osd_keepalive_timeout for each
registered watch event. When errors are detected, look up the watch
event and send it CEPH_WATCH_EVENT_DISCONNECT.
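
The core of the scheduling pattern, condensed from the hunks below (all
names are from this patch):

	/* armed when the first linger request is registered */
	schedule_delayed_work(&osdc->linger_ping_work,
		round_jiffies_relative(
			osdc->client->options->osd_keepalive_timeout));

	/* re-armed at the end of handle_linger_ping(); cancelled when
	 * the last linger request is unregistered */
	cancel_delayed_work(&osdc->linger_ping_work);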

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 include/linux/ceph/osd_client.h |   1 +
 net/ceph/osd_client.c           | 102 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index b7d4234..5aef3db 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -237,6 +237,7 @@ struct ceph_osd_client {
 	int                    num_requests;
 	struct delayed_work    timeout_work;
 	struct delayed_work    osds_timeout_work;
+	struct delayed_work    linger_ping_work;
 #ifdef CONFIG_DEBUG_FS
 	struct dentry 	       *debugfs_file;
 #endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d56f7a6..e57db93 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -109,6 +109,7 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 	osd_data->own_pages = own_pages;
 }
 
+
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 			struct ceph_pagelist *pagelist)
 {
@@ -1362,6 +1363,13 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
 	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	WARN_ON(!req->r_linger);
 
+	++req->r_ops[0].watch.gen;
+
+	if (list_empty(&osdc->req_linger))
+		schedule_delayed_work(&osdc->linger_ping_work,
+			       round_jiffies_relative(
+			         osdc->client->options->osd_keepalive_timeout));
+
 	ceph_osdc_get_request(req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 	if (req->r_osd)
@@ -1382,6 +1390,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 
 	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	list_del_init(&req->r_linger_item);
+	if (++req->r_ops[0].watch.gen > 1 &&
+		req->r_ops[0].watch.op == CEPH_OSD_WATCH_OP_WATCH) {
+		struct timespec mtime = CURRENT_TIME;
+		req->r_ops[0].watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
+		ceph_osdc_build_request(req, 0, req->r_snapc, req->r_snapid, &mtime);
+	}
 
 	if (req->r_osd) {
 		list_del_init(&req->r_linger_osd_item);
@@ -1390,6 +1404,9 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 			req->r_osd = NULL;
 	}
 	ceph_osdc_put_request(req);
+
+	if (list_empty(&osdc->req_linger))
+		cancel_delayed_work(&osdc->linger_ping_work);
 }
 
 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
@@ -1707,6 +1724,83 @@ static void handle_osds_timeout(struct work_struct *work)
 			      round_jiffies_relative(delay));
 }
 
+static void __ping_callback(struct ceph_osd_request *osd_req,
+               struct ceph_msg *msg)
+{
+	struct ceph_osd_req_op * info = &osd_req->r_ops[0];
+	struct ceph_osd_request *target = osd_req->r_priv;
+	u64 result = osd_req->r_reply_op_result[0];
+
+	dout("got pong result %llu\n", result);
+
+	if (target->r_ops[0].watch.gen != info->watch.gen) {
+		dout("ignoring pong result out of phase (%u != %u)\n",
+		     target->r_ops[0].watch.gen, info->watch.gen);
+		return;
+	}
+	if (result != 0)
+		__do_event(osd_req->r_osdc, CEPH_WATCH_EVENT_DISCONNECT,
+		           info->watch.cookie, 0, 0, NULL, result, 0, NULL);
+
+	ceph_osdc_put_request(target);
+	ceph_osdc_put_request(osd_req);
+}
+
+static void __send_linger_ping(struct ceph_osd_request *req)
+{
+	struct ceph_osd_request *ping_req;
+	int ret;
+
+	dout("ping for watch %llu\n", req->r_tid);
+
+	ping_req = ceph_osdc_alloc_request(req->r_osdc, NULL, 1, false,
+	                                   GFP_NOIO);
+	if (!ping_req) {
+		WARN(true, "failed to allocate memory to ping, skipping");
+		return;
+	}
+
+	ping_req->r_base_oloc.pool = req->r_base_oloc.pool;
+	ping_req->r_flags = CEPH_OSD_OP_READ;
+	ceph_oid_copy(&ping_req->r_base_oid, &req->r_base_oid);
+	ping_req->r_callback = __ping_callback;
+	osd_req_op_watch_init(ping_req, 0, CEPH_OSD_OP_WATCH,
+	                      CEPH_OSD_WATCH_OP_PING,
+	                      req->r_ops[0].watch.cookie);
+	ping_req->r_ops[0].watch.gen = req->r_ops[0].watch.gen;
+	ping_req->r_priv = req;
+	ceph_osdc_build_request(ping_req, 0, NULL, cpu_to_le64(CEPH_NOSNAP),
+	                        NULL);
+	ceph_osdc_get_request(req);
+	ret = ceph_osdc_start_request(req->r_osdc, ping_req, false);
+	if (ret) {
+		ceph_osdc_put_request(ping_req);
+		ceph_osdc_cancel_request(ping_req);
+	}
+}
+
+static void handle_linger_ping(struct work_struct *work)
+{
+	struct ceph_osd_client *osdc;
+
+	struct ceph_osd_request *req, *nreq;
+
+	osdc = container_of(work, struct ceph_osd_client,
+	                    linger_ping_work.work);
+
+	dout("scanning for watches to ping about\n");
+
+	list_for_each_entry_safe(req, nreq, &osdc->req_linger, r_linger_item) {
+		int i;
+		for (i = 0; i < req->r_num_ops; i++) {
+			if (req->r_ops[i].op == CEPH_OSD_OP_WATCH)
+				__send_linger_ping(req);
+		}
+	}
+	schedule_delayed_work(&osdc->linger_ping_work, 
+	                      osdc->client->options->osd_keepalive_timeout);
+}
+
 static int ceph_oloc_decode(void **p, void *end,
 			    struct ceph_object_locator *oloc)
 {
@@ -2795,6 +2889,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	osdc->num_requests = 0;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+	INIT_DELAYED_WORK(&osdc->linger_ping_work, handle_linger_ping);
 	spin_lock_init(&osdc->event_lock);
 	osdc->event_tree = RB_ROOT;
 	osdc->event_count = 0;
@@ -3079,12 +3174,15 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
 		{
-			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
+			struct ceph_msg *m = ceph_msg_new(type, front,
+			                                  GFP_NOFS, false);
 			size_t len = con->in_hdr.data_len;
 			if (len > 0) {
 				struct page **pages;
 				struct ceph_osd_data osd_data;
-				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);
+				pages = ceph_alloc_page_vector(
+				              calc_pages_for(0, len), GFP_NOFS);
+				WARN_ON(!pages);
 				osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 				osd_data.pages = pages;
 				osd_data.length = len;
-- 
1.9.3



* Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-12 15:56 ` [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
@ 2015-06-16 14:58   ` Mike Christie
  2015-06-16 17:05     ` Douglas Fuller
  2015-06-16 23:18   ` Josh Durgin
  1 sibling, 1 reply; 18+ messages in thread
From: Mike Christie @ 2015-06-16 14:58 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
> +{
> +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
> +	int ret;
> +
> +	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
> +		err, cookie);
> +	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
> +	         rbd_dev->header_name, err, cookie);
> +
> +	/* reset watch */
> +	rbd_dev_refresh(rbd_dev);
> +	rbd_dev_header_unwatch_sync(rbd_dev);
> +	ret = rbd_dev_header_watch_sync(rbd_dev);
> +	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */

Is this for debugging only? BUG()/BUG_ON() can kill the system. We
normally use it for cases where proceeding might cause something like
data corruption or where we want to catch programming bugs early on like
passing incorrect args to a function.

The other caller of this function does not escalate like this one.
Are you sure you need to here? The code below will not run if we BUG
above, so if you did want to BUG, you would want to move the rbd_warn
before it.
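
Something like this untested sketch would keep the machine up and just
leave the device unwatched on failure:

	/* reset watch */
	rbd_dev_header_unwatch_sync(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev);
	if (ret) {
		rbd_warn(rbd_dev, "failed to re-watch header: %d", ret);
		return;
	}
	rbd_dev_refresh(rbd_dev);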

> +	rbd_dev_refresh(rbd_dev);
> +	if (ret)
> +		rbd_warn(rbd_dev, "refresh failed: %d", ret);
> +}
> +
>  /*



> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 74650e1..d435bf2 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -36,7 +36,13 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
>  					struct ceph_osd_request *req);
>  static void __enqueue_request(struct ceph_osd_request *req);
>  static void __send_request(struct ceph_osd_client *osdc,
> -			   struct ceph_osd_request *req);
> +                           struct ceph_osd_request *req);
> +static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
> +                                           u64 cookie);
> +static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
> +                       u64 cookie, u64 notify_id, u32 payload_len,
> +                       void *payload, s32 return_code, u64 notifier_gid,
> +                       struct ceph_msg_data *data);

We should not be adding these declarations if they are not needed.


> +}
> +
> +int ceph_osdc_create_watch_event (struct ceph_osd_client *osdc,

Not sure if it is my mailer, but there seem to be several places where
there is an extra space between the function name and the initial "(",
like above.

> +                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
> +                         void (*errcb)(void *, u64, int),
> +                         void *data, struct ceph_osd_event **pevent)
> +{
> +	struct ceph_osd_event *event;
> +	
> +	event = __alloc_event(osdc, data);
> +	if (!event)
> +		return -ENOMEM;
> +
> +	event->watch.watchcb = watchcb;
> +	event->watch.errcb = errcb;
> +
> +	spin_lock(&osdc->event_lock);
> +	event->cookie = ++osdc->event_count;
> +	__insert_event(osdc, event);
> +	spin_unlock(&osdc->event_lock);
> +	*pevent = event;
> +	return 0;
> +}
> +EXPORT_SYMBOL(ceph_osdc_create_watch_event);
> +
> +int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
> +                                  struct ceph_osd_event **pevent)
> +{
> +	struct ceph_osd_event *event;
> +	
> +	event = __alloc_event(osdc, NULL);
> +	if (!event)
> +		return -ENOMEM;
> +
> +	init_completion(&event->notify.complete);
> +
>  	spin_lock(&osdc->event_lock);
>  	event->cookie = ++osdc->event_count;
>  	__insert_event(osdc, event);
> @@ -2356,7 +2397,15 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
>  	*pevent = event;
>  	return 0;
>  }
> -EXPORT_SYMBOL(ceph_osdc_create_event);
> +EXPORT_SYMBOL(ceph_osdc_create_notify_event);
> +
> +int ceph_osdc_wait_event (struct ceph_osd_client *osdc,
> +                          struct ceph_osd_event *event)
> +{
> +	wait_for_completion(&event->notify.complete);
> +	return 0;

If it's not an interruptible or timed wait, then I think you can just
kill the return value.
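
I.e. either of these (sketches):

void ceph_osdc_wait_event(struct ceph_osd_client *osdc,
			  struct ceph_osd_event *event)
{
	wait_for_completion(&event->notify.complete);
}

/* or keep the return value and make the wait killable */
int ceph_osdc_wait_event(struct ceph_osd_client *osdc,
			 struct ceph_osd_event *event)
{
	return wait_for_completion_killable(&event->notify.complete);
}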

> +}
> +EXPORT_SYMBOL(ceph_osdc_wait_event);
>  
>  void ceph_osdc_cancel_event(struct ceph_osd_event *event)
>  {
> @@ -2376,20 +2425,79 @@ static void do_event_work(struct work_struct *work)
>  	struct ceph_osd_event_work *event_work =
>  		container_of(work, struct ceph_osd_event_work, work);
>  	struct ceph_osd_event *event = event_work->event;
> -	u64 ver = event_work->ver;
>  	u64 notify_id = event_work->notify_id;
>  	u8 opcode = event_work->opcode;
>  	s32 return_code = event_work->return_code;
>  	u64 notifier_gid = event_work->notifier_gid;
>  
>  	dout("do_event_work completing %p\n", event);
> -	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
> -		  event->data, event_work->payload, event_work->payload_len);
> +	if (opcode == CEPH_WATCH_EVENT_NOTIFY)
> +		event->watch.watchcb(event->data, notify_id,
> +		                     event->cookie, notifier_gid,
> +		                     event_work->payload,
> +		                     event_work->payload_len);
> +	else if (opcode == CEPH_WATCH_EVENT_DISCONNECT && event->watch.errcb)
> +		event->watch.errcb(event->data, event->cookie, return_code);
>  	dout("do_event_work completed %p\n", event);
>  	ceph_osdc_put_event(event);
>  	kfree(event_work);
>  }
>  
> +static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
> +                       u64 cookie, u64 notify_id, u32 payload_len,
> +                       void *payload, s32 return_code, u64 notifier_gid,
> +                       struct ceph_msg_data *data)
> +{
> +	struct ceph_osd_event *event;
> +	struct ceph_osd_event_work *event_work;
> +
> +	spin_lock(&osdc->event_lock);
> +	event = __find_event(osdc, cookie);
> +	if (event)
> +		get_event(event);
> +	spin_unlock(&osdc->event_lock);
> +
> +	dout("handle_watch_notify cookie %lld event %p notify id %llu payload "
> +	     "len %u return code %d notifier gid %llu\n",
> +             cookie, event, notify_id, payload_len, return_code, notifier_gid);
> +	switch(opcode) {
> +		case CEPH_WATCH_EVENT_NOTIFY:
> +		case CEPH_WATCH_EVENT_DISCONNECT:
> +			if (event) {
> +				event_work = kmalloc(sizeof(*event_work),
> +				                     GFP_NOIO);
> +				if (!event_work) {
> +					pr_err("couldn't allocate event_work\n");
> +					ceph_osdc_put_event(event);
> +					return;
> +				}
> +				INIT_WORK(&event_work->work, do_event_work);
> +				event_work->event = event;
> +				event_work->notify_id = notify_id;
> +				event_work->opcode = opcode;
> +				event_work->return_code = return_code;
> +				event_work->notifier_gid = notifier_gid;
> +				event_work->payload = payload;
> +				event_work->payload_len = payload_len;
> +
> +				queue_work(osdc->notify_wq, &event_work->work);
> +			}
> +			break;
> +		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
> +			if (event) {
> +				event->notify.notify_data = data;
> +				if (event->osd_req) {
> +					ceph_osdc_cancel_request(event->osd_req);
> +					event->osd_req = NULL;
> +				}
> +				complete_all(&event->notify.complete);
> +			}
> +			break;
> +		default:
> +			BUG();
> +			break;

No need to break after BUG()ing.
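
i.e. the default arm can just be:

	default:
		BUG();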


* Re: [PATCH 6/6] osd_client: send watch ping messages
  2015-06-12 15:56 ` [PATCH 6/6] osd_client: send watch ping messages Douglas Fuller
@ 2015-06-16 15:07   ` Mike Christie
  0 siblings, 0 replies; 18+ messages in thread
From: Mike Christie @ 2015-06-16 15:07 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 10:56 AM, Douglas Fuller wrote:
>  static int ceph_oloc_decode(void **p, void *end,
>  			    struct ceph_object_locator *oloc)
>  {
> @@ -2795,6 +2889,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
>  	osdc->num_requests = 0;
>  	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
>  	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
> +	INIT_DELAYED_WORK(&osdc->linger_ping_work, handle_linger_ping);
>  	spin_lock_init(&osdc->event_lock);
>  	osdc->event_tree = RB_ROOT;
>  	osdc->event_count = 0;
> @@ -3079,12 +3174,15 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  	case CEPH_MSG_OSD_MAP:
>  	case CEPH_MSG_WATCH_NOTIFY:
>  		{
> -			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
> +			struct ceph_msg *m = ceph_msg_new(type, front,
> +			                                  GFP_NOFS, false);
>  			size_t len = con->in_hdr.data_len;
>  			if (len > 0) {
>  				struct page **pages;
>  				struct ceph_osd_data osd_data;
> -				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);
> +				pages = ceph_alloc_page_vector(
> +				              calc_pages_for(0, len), GFP_NOFS);
> +				WARN_ON(!pages);

Are you wanting this WARN_ON to get more info in case someone sends us
a really large buffer?

Handle the allocation failure here as is done elsewhere. If you don't,
you will get NULL-pointer oopses or other crashes due to there being a
non-zero len but an unusable pages pointer.
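
Note also that ceph_alloc_page_vector() returns an ERR_PTR() on failure
rather than NULL, so the WARN_ON(!pages) above can never trip. Untested
sketch, assuming returning NULL is an acceptable way for alloc_msg() to
drop the message:

	if (IS_ERR(pages)) {
		ceph_msg_put(m);
		return NULL;
	}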



* Re: [PATCH 5/6] osd_client: add support for notify payloads via notify event
  2015-06-12 15:56 ` [PATCH 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
@ 2015-06-16 15:26   ` Mike Christie
  2015-06-16 17:22     ` Douglas Fuller
  0 siblings, 1 reply; 18+ messages in thread
From: Mike Christie @ 2015-06-16 15:26 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> @@ -2533,8 +2548,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>  	if (msg->hdr.version >= 2)
>  		ceph_decode_32_safe(&p, end, return_code, bad);
>  
> -	if (msg->hdr.version >= 3)
> +	if (msg->hdr.version >= 3) {
>  		ceph_decode_32_safe(&p, end, notifier_gid, bad);
> +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);

It's not completely clear how/who can use this data. Would rbd be
calling ceph_osdc_create_notify_event/ceph_osdc_wait_event, or is it
some libceph code (net/ceph)?

If it's rbd, is it supposed to be digging into ceph_msg_data structs?
Did we want to pass it a pagelist or a CEPH_OSD_DATA_TYPE_PAGES-style
pages array?



> +	}
>  
>  	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
>  		return_code, notifier_gid, data);
> @@ -3061,7 +3078,23 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  	switch (type) {
>  	case CEPH_MSG_OSD_MAP:
>  	case CEPH_MSG_WATCH_NOTIFY:
> -		return ceph_msg_new(type, front, GFP_NOFS, false);
> +		{
> +			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
> +			size_t len = con->in_hdr.data_len;
> +			if (len > 0) {
> +				struct page **pages;
> +				struct ceph_osd_data osd_data;
> +				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);


You should use GFP_NOIO, or at least GFP_NOFS as in the ceph_msg_new()
call above; anything but GFP_KERNEL.

Also handle the allocation failure as is done in other places.
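
Untested sketch of both changes (ceph_alloc_page_vector() reports
failure with an ERR_PTR(), not NULL, hence the IS_ERR() check; returning
NULL here assumes that's an acceptable way to drop the message):

	pages = ceph_alloc_page_vector(calc_pages_for(0, len),
				       GFP_NOIO);
	if (IS_ERR(pages)) {
		ceph_msg_put(m);
		return NULL;
	}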


> +				osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
> +				osd_data.pages = pages;
> +				osd_data.length = len;
> +				osd_data.alignment = 0;
> +				osd_data.pages_from_pool = false;
> +				osd_data.own_pages = false;
> +				ceph_osdc_msg_data_add(m, &osd_data);
> +			}
> +			return m;
> +		}
>  	case CEPH_MSG_OSD_OPREPLY:
>  		return get_reply(con, hdr, skip);
>  	default:
> 



* Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-16 14:58   ` Mike Christie
@ 2015-06-16 17:05     ` Douglas Fuller
  0 siblings, 0 replies; 18+ messages in thread
From: Douglas Fuller @ 2015-06-16 17:05 UTC (permalink / raw)
  To: Mike Christie; +Cc: ceph-devel



----- Original Message -----
> From: "Mike Christie" <mchristi@redhat.com>
> To: "Douglas Fuller" <dfuller@redhat.com>, ceph-devel@vger.kernel.org
> Sent: Tuesday, June 16, 2015 10:58:07 AM
> Subject: Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
> 
> On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> > +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
> > +{
> > +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
> > +	int ret;
> > +
> > +	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
> > +		err, cookie);
> > +	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
> > +	         rbd_dev->header_name, err, cookie);
> > +
> > +	/* reset watch */
> > +	rbd_dev_refresh(rbd_dev);
> > +	rbd_dev_header_unwatch_sync(rbd_dev);
> > +	ret = rbd_dev_header_watch_sync(rbd_dev);
> > +	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
> 
> Is this for debugging only? BUG()/BUG_ON() can kill the system. We
> normally use it for cases where proceeding might cause something like
> data corruption, or where we want to catch programming bugs early on,
> like passing incorrect args to a function.
> 
> The other caller of this function does not escalate like this one does.
> Are you sure you need to here? The code below will not run if we BUG
> above, so if you did want to BUG, you would want to move the rbd_warn
> before it.

Thanks for the catch; this case is probably worse than rbd_warn() and not as bad as
BUG(). If the watch timed out or was disconnected and cannot be re-established, it's
likely the rbd image has been deleted out from under this client. We should probably
set the block device to a state where it just returns -EIO all the time at that point.

I have logic for that in my earlier patchset; I'll duplicate it here.
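
Roughly along these lines (the flag name is invented here for
illustration; rbd_dev->flags already exists):

	/* in rbd_watch_error_cb(), instead of the BUG_ON(): */
	if (ret) {
		set_bit(RBD_DEV_FLAG_DEAD, &rbd_dev->flags); /* hypothetical */
		return;
	}

	/* and reject new I/O in the submission path: */
	if (test_bit(RBD_DEV_FLAG_DEAD, &rbd_dev->flags))
		return -EIO;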



* Re: [PATCH 5/6] osd_client: add support for notify payloads via notify event
  2015-06-16 15:26   ` Mike Christie
@ 2015-06-16 17:22     ` Douglas Fuller
  0 siblings, 0 replies; 18+ messages in thread
From: Douglas Fuller @ 2015-06-16 17:22 UTC (permalink / raw)
  To: Mike Christie; +Cc: ceph-devel

----- Original Message -----
> From: "Mike Christie" <mchristi@redhat.com>
> To: "Douglas Fuller" <dfuller@redhat.com>, ceph-devel@vger.kernel.org
> Sent: Tuesday, June 16, 2015 11:26:43 AM
> Subject: Re: [PATCH 5/6] osd_client: add support for notify payloads via notify event
> 
> On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> > @@ -2533,8 +2548,10 @@ static void handle_watch_notify(struct
> > ceph_osd_client *osdc,
> >  	if (msg->hdr.version >= 2)
> >  		ceph_decode_32_safe(&p, end, return_code, bad);
> >  
> > -	if (msg->hdr.version >= 3)
> > +	if (msg->hdr.version >= 3) {
> >  		ceph_decode_32_safe(&p, end, notifier_gid, bad);
> > +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> 
> It's not completely clear how/who can use this data. Would rbd be
> calling ceph_osdc_create_notify_event/ceph_osdc_wait_event, or is it
> some libceph code (net/ceph)?

rbd would be calling ceph_osdc_create_notify_event and then ceph_osdc_wait_event (which
waits for CEPH_WATCH_EVENT_NOTIFY_COMPLETE).
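
Roughly the intended call sequence (hand-wavy, eliding request setup and
error handling; req is an OSD request the caller has already built):

	struct ceph_osd_event *event;

	ceph_osdc_create_notify_event(osdc, &event);
	osd_req_op_notify_init(req, 0, CEPH_OSD_OP_NOTIFY, event->cookie);
	/* attach the notify payload, submit req, ... */
	ceph_osdc_wait_event(osdc, event); /* returns on NOTIFY_COMPLETE */
	/* event->notify.notify_data now refers to the reply data */
	ceph_osdc_cancel_event(event);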

> If it's rbd, is it supposed to be digging into ceph_msg_data structs?
> Did we want to pass it a pagelist or a CEPH_OSD_DATA_TYPE_PAGES-style
> pages array?

Yeah, it would be cleaner to just copy the pages pointer and size. I'll change that.

Either way, the decoding gets hairy and cumbersome. I think we should
extend osd_client to have one or two convenience routines like
ceph_osdc_for_each_notifier or something like that.
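
Something like this, say (interface purely illustrative, nothing like it
exists yet):

	typedef void (*ceph_notifier_cb_t)(void *arg, u64 notifier_gid,
					   u64 cookie);
	int ceph_osdc_for_each_notifier(struct ceph_osd_event *event,
					ceph_notifier_cb_t cb, void *arg);

so callers wouldn't need to open-code the reply decoding.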


* Re: [PATCH 1/6] ceph/rbd: add support for watch notify payloads
  2015-06-12 15:56 ` [PATCH 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
@ 2015-06-16 21:22   ` Josh Durgin
  0 siblings, 0 replies; 18+ messages in thread
From: Josh Durgin @ 2015-06-16 21:22 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 08:56 AM, Douglas Fuller wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This patch adds support for proto version 1 of watch-notify,
> so drivers like rbd can be sent a buffer with information like
> the notify operation being performed.
>
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>

Reviewed-by: Josh Durgin <jdurgin@redhat.com>

> ---
>   drivers/block/rbd.c             |  3 ++-
>   include/linux/ceph/osd_client.h |  7 +++++--
>   net/ceph/osd_client.c           | 21 ++++++++++++++++-----
>   3 files changed, 23 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 89fe8a4..4b9ba9f 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3103,7 +3103,8 @@ out:
>   	return ret;
>   }
>
> -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
> +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
> +			 void *payload, int payload_len)
>   {
>   	struct rbd_device *rbd_dev = (struct rbd_device *)data;
>   	int ret;
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 7506b48..eab96b5 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -184,7 +184,7 @@ struct ceph_osd_event {
>   	u64 cookie;
>   	int one_shot;
>   	struct ceph_osd_client *osdc;
> -	void (*cb)(u64, u64, u8, void *);
> +	void (*cb)(u64, u64, u8, void *, void *, int);
>   	void *data;
>   	struct rb_node node;
>   	struct list_head osd_node;
> @@ -197,6 +197,8 @@ struct ceph_osd_event_work {
>           u64 ver;
>           u64 notify_id;
>           u8 opcode;
> +	void *payload;
> +	int payload_len;
>   };
>
>   struct ceph_osd_client {
> @@ -369,7 +371,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
>
>   /* watch/notify events */
>   extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -				  void (*event_cb)(u64, u64, u8, void *),
> +				  void (*event_cb)(u64, u64, u8, void *, void *,
> +						   int),
>   				  void *data, struct ceph_osd_event **pevent);
>   extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
>   extern void ceph_osdc_put_event(struct ceph_osd_event *event);
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 5003367..aa1c5c46 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -2277,7 +2277,7 @@ static void __remove_event(struct ceph_osd_event *event)
>   }
>
>   int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -			   void (*event_cb)(u64, u64, u8, void *),
> +			   void (*event_cb)(u64, u64, u8, void *, void *, int),
>   			   void *data, struct ceph_osd_event **pevent)
>   {
>   	struct ceph_osd_event *event;
> @@ -2329,7 +2329,8 @@ static void do_event_work(struct work_struct *work)
>   	u8 opcode = event_work->opcode;
>
>   	dout("do_event_work completing %p\n", event);
> -	event->cb(ver, notify_id, opcode, event->data);
> +	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
> +		  event_work->payload_len);
>   	dout("do_event_work completed %p\n", event);
>   	ceph_osdc_put_event(event);
>   	kfree(event_work);
> @@ -2342,10 +2343,11 @@ static void do_event_work(struct work_struct *work)
>   static void handle_watch_notify(struct ceph_osd_client *osdc,
>   				struct ceph_msg *msg)
>   {
> -	void *p, *end;
> +	void *p, *end, *payload = NULL;
>   	u8 proto_ver;
>   	u64 cookie, ver, notify_id;
>   	u8 opcode;
> +	u32 payload_len = 0;
>   	struct ceph_osd_event *event;
>   	struct ceph_osd_event_work *event_work;
>
> @@ -2358,6 +2360,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   	ceph_decode_64_safe(&p, end, ver, bad);
>   	ceph_decode_64_safe(&p, end, notify_id, bad);
>
> +	if (proto_ver >= 1) {
> +		ceph_decode_32_safe(&p, end, payload_len, bad);
> +		if (end - p < payload_len)
> +			goto bad;
> +		payload = p;
> +	}
> +
>   	spin_lock(&osdc->event_lock);
>   	event = __find_event(osdc, cookie);
>   	if (event) {
> @@ -2365,8 +2374,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		get_event(event);
>   	}
>   	spin_unlock(&osdc->event_lock);
> -	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
> -	     cookie, ver, event);
> +	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
> +	     cookie, ver, event, notify_id, payload_len);
>   	if (event) {
>   		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
>   		if (!event_work) {
> @@ -2379,6 +2388,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		event_work->ver = ver;
>   		event_work->notify_id = notify_id;
>   		event_work->opcode = opcode;
> +		event_work->payload = payload;
> +		event_work->payload_len = payload_len;
>
>   		queue_work(osdc->notify_wq, &event_work->work);
>   	}
>



* Re: [PATCH 2/6] ceph/rbd: add support for header version 2 and 3
  2015-06-12 15:56 ` [PATCH 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
@ 2015-06-16 21:23   ` Josh Durgin
  0 siblings, 0 replies; 18+ messages in thread
From: Josh Durgin @ 2015-06-16 21:23 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 08:56 AM, Douglas Fuller wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This adds support watch-notify header 2 and 3 support, so we can
> get a return_code from those operations.
>
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   drivers/block/rbd.c             |  5 +++--
>   include/linux/ceph/osd_client.h | 10 ++++++----
>   net/ceph/osd_client.c           | 25 +++++++++++++++++++------
>   3 files changed, 28 insertions(+), 12 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 4b9ba9f..65421eb 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3103,8 +3103,9 @@ out:
>   	return ret;
>   }
>
> -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
> -			 void *payload, int payload_len)
> +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
> +			 u64 notifier_gid, void *data, void *payload,
> +			 u32 payload_len)
>   {
>   	struct rbd_device *rbd_dev = (struct rbd_device *)data;
>   	int ret;
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index eab96b5..1c4e472 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -184,7 +184,7 @@ struct ceph_osd_event {
>   	u64 cookie;
>   	int one_shot;
>   	struct ceph_osd_client *osdc;
> -	void (*cb)(u64, u64, u8, void *, void *, int);
> +	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
>   	void *data;
>   	struct rb_node node;
>   	struct list_head osd_node;
> @@ -197,8 +197,10 @@ struct ceph_osd_event_work {
>           u64 ver;
>           u64 notify_id;
>           u8 opcode;
> +	s32 return_code;
> +	u64 notifier_gid;
>   	void *payload;
> -	int payload_len;
> +	u32 payload_len;
>   };
>
>   struct ceph_osd_client {
> @@ -371,8 +373,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
>
>   /* watch/notify events */
>   extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -				  void (*event_cb)(u64, u64, u8, void *, void *,
> -						   int),
> +				  void (*event_cb)(u64, u64, u8, s32, u64,
> +						   void *, void *, u32),
>   				  void *data, struct ceph_osd_event **pevent);
>   extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
>   extern void ceph_osdc_put_event(struct ceph_osd_event *event);
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index aa1c5c46..590cf9c 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -2277,7 +2277,8 @@ static void __remove_event(struct ceph_osd_event *event)
>   }
>
>   int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -			   void (*event_cb)(u64, u64, u8, void *, void *, int),
> +			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
> +					    void *, u32),
>   			   void *data, struct ceph_osd_event **pevent)
>   {
>   	struct ceph_osd_event *event;
> @@ -2327,10 +2328,12 @@ static void do_event_work(struct work_struct *work)
>   	u64 ver = event_work->ver;
>   	u64 notify_id = event_work->notify_id;
>   	u8 opcode = event_work->opcode;
> +	s32 return_code = event_work->return_code;
> +	u64 notifier_gid = event_work->notifier_gid;
>
>   	dout("do_event_work completing %p\n", event);
> -	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
> -		  event_work->payload_len);
> +	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
> +		  event->data, event_work->payload, event_work->payload_len);
>   	dout("do_event_work completed %p\n", event);
>   	ceph_osdc_put_event(event);
>   	kfree(event_work);
> @@ -2345,9 +2348,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   {
>   	void *p, *end, *payload = NULL;
>   	u8 proto_ver;
> -	u64 cookie, ver, notify_id;
> +	u64 cookie, ver, notify_id, notifier_gid = 0;
>   	u8 opcode;
>   	u32 payload_len = 0;
> +	s32 return_code = 0;
>   	struct ceph_osd_event *event;
>   	struct ceph_osd_event_work *event_work;
>
> @@ -2365,8 +2369,15 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		if (end - p < payload_len)
>   			goto bad;
>   		payload = p;
> +		p += payload_len;
>   	}
>
> +	if (msg->hdr.version >= 2)
> +		ceph_decode_32_safe(&p, end, return_code, bad);
> +
> +	if (msg->hdr.version >= 3)
> +		ceph_decode_32_safe(&p, end, notifier_gid, bad);
> +

This should be ceph_decode_64_safe, since notifier_gid is a u64:
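
	ceph_decode_64_safe(&p, end, notifier_gid, bad);

With that fixed,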

Reviewed-by: Josh Durgin <jdurgin@redhat.com>

>   	spin_lock(&osdc->event_lock);
>   	event = __find_event(osdc, cookie);
>   	if (event) {
> @@ -2374,8 +2385,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		get_event(event);
>   	}
>   	spin_unlock(&osdc->event_lock);
> -	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
> -	     cookie, ver, event, notify_id, payload_len);
> +	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
> +	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
>   	if (event) {
>   		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
>   		if (!event_work) {
> @@ -2388,6 +2399,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		event_work->ver = ver;
>   		event_work->notify_id = notify_id;
>   		event_work->opcode = opcode;
> +		event_work->return_code = return_code;
> +		event_work->notifier_gid = notifier_gid;
>   		event_work->payload = payload;
>   		event_work->payload_len = payload_len;
>
>



* Re: [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op
  2015-06-12 15:56 ` [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
@ 2015-06-16 22:00   ` Josh Durgin
  0 siblings, 0 replies; 18+ messages in thread
From: Josh Durgin @ 2015-06-16 22:00 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 08:56 AM, Douglas Fuller wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This syncs the ceph_osd_op struct with the current version of ceph
> where the watch struct has been updated to support more ops and
> the notify-ack support has been broken out of the watch struct.
>
> Ceph commits
> 1a82cc3926fc7bc4cfbdd2fd4dfee8660d5107a1
> 2288f318e1b1f6a1c42b185fc1b4c41f23995247
> 73720130c34424bf1fe36058ebe8da66976f40fb
>
> It still has us use the legacy watch op for now. I will add support
> later. It is mostly a preparation patch for more advanced notify support.
>
> Questions:
>
> 1. Should linger also be set for CEPH_OSD_WATCH_OP_RECONNECT?
> 2. Not sure what watch.gen is used for. Is that for our internal
> use, or does the osd do something with it?
>
> djf: removed changes to rbd.c for SCSI
>
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
> ---
>   drivers/block/rbd.c             | 19 +++++++-----
>   include/linux/ceph/osd_client.h | 23 +++++++++++----
>   include/linux/ceph/rados.h      | 24 +++++++++++++--
>   net/ceph/osd_client.c           | 65 ++++++++++++++++++++++++++++++++++++-----
>   4 files changed, 109 insertions(+), 22 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 65421eb..ed170b1 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3089,8 +3089,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
>   	if (!obj_request->osd_req)
>   		goto out;
>
> -	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
> -					notify_id, 0, 0);
> +	osd_req_op_watch_init(obj_request->osd_req, 0,
> +			      CEPH_OSD_OP_NOTIFY_ACK, 0, notify_id);
>   	rbd_osd_req_format_read(obj_request);
>
>   	ret = rbd_obj_request_submit(osdc, obj_request);
> @@ -3138,7 +3138,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
>    */
>   static struct rbd_obj_request *rbd_obj_watch_request_helper(
>   						struct rbd_device *rbd_dev,
> -						bool watch)
> +						u8 watch_opcode)
>   {
>   	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
>   	struct ceph_options *opts = osdc->client->options;
> @@ -3158,10 +3158,11 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
>   	}
>
>   	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
> -			      rbd_dev->watch_event->cookie, 0, watch);
> +			      watch_opcode, rbd_dev->watch_event->cookie);
>   	rbd_osd_req_format_write(obj_request);
>
> -	if (watch)
> +	if (watch_opcode == CEPH_OSD_WATCH_OP_LEGACY_WATCH ||
> +	    watch_opcode == CEPH_OSD_WATCH_OP_WATCH)
>   		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
>
>   	ret = rbd_obj_request_submit(osdc, obj_request);
> @@ -3174,7 +3175,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
>
>   	ret = obj_request->result;
>   	if (ret) {
> -		if (watch)
> +		if (watch_opcode != CEPH_OSD_WATCH_OP_UNWATCH)
>   			rbd_obj_request_end(obj_request);
>   		goto out;
>   	}
> @@ -3203,7 +3204,8 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
>   	if (ret < 0)
>   		return ret;
>
> -	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
> +	obj_request = rbd_obj_watch_request_helper(rbd_dev,
> +						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
>   	if (IS_ERR(obj_request)) {
>   		ceph_osdc_cancel_event(rbd_dev->watch_event);
>   		rbd_dev->watch_event = NULL;
> @@ -3237,7 +3239,8 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
>   	rbd_obj_request_put(rbd_dev->watch_request);
>   	rbd_dev->watch_request = NULL;
>
> -	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
> +	obj_request = rbd_obj_watch_request_helper(rbd_dev,
> +						   CEPH_OSD_WATCH_OP_UNWATCH);
>   	if (!IS_ERR(obj_request))
>   		rbd_obj_request_put(obj_request);
>   	else
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 1c4e472..12732d3 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -106,11 +106,15 @@ struct ceph_osd_req_op {
>   		struct {
>   			u64 cookie;
>   			u64 ver;
> -			u32 prot_ver;
> -			u32 timeout;
> -			__u8 flag;
> +			__u8 op;
> +			u32 gen;
>   		} watch;
>   		struct {
> +			u64 cookie;
> +			struct ceph_osd_data request_data;
> +			struct ceph_osd_data response_data;
> +		} notify;
> +		struct {
>   			u64 expected_object_size;
>   			u64 expected_write_size;
>   		} alloc_hint;
> @@ -302,7 +306,16 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
>   					struct page **pages, u64 length,
>   					u32 alignment, bool pages_from_pool,
>   					bool own_pages);
> -
> +extern void osd_req_op_notify_request_data_pagelist(struct ceph_osd_request *,
> +					unsigned int which,
> +					struct ceph_pagelist *pagelist);
> +extern void osd_req_op_notify_response_data_pages(struct ceph_osd_request *,
> +					unsigned int which,
> +					struct page **pages, u64 length,
> +					u32 alignment, bool pages_from_pool,
> +					bool own_pages);
> +extern void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
> +				   unsigned int which, u16 opcode, u64 cookie);
>   extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
>   					unsigned int which, u16 opcode,
>   					const char *class, const char *method);
> @@ -311,7 +324,7 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
>   				 size_t size, u8 cmp_op, u8 cmp_mode);
>   extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
>   					unsigned int which, u16 opcode,
> -					u64 cookie, u64 version, int flag);
> +					u8 watch_opcode, u64 cookie);
>   extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
>   				       unsigned int which,
>   				       u64 expected_object_size,
> diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
> index 2f822dc..cae82b36 100644
> --- a/include/linux/ceph/rados.h
> +++ b/include/linux/ceph/rados.h
> @@ -417,6 +417,22 @@ enum {
>
>   #define RADOS_NOTIFY_VER	1
>
> +enum {
> +	CEPH_OSD_WATCH_OP_UNWATCH = 0,
> +	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
> +	/* note: use only ODD ids to prevent pre-giant code from
> +	 * interpreting the op as UNWATCH */
> +	CEPH_OSD_WATCH_OP_WATCH = 3,
> +	CEPH_OSD_WATCH_OP_RECONNECT = 5,
> +	CEPH_OSD_WATCH_OP_PING = 7,
> +};
> +
> +enum {
> +	CEPH_WATCH_EVENT_NOTIFY			= 1, /* notifying watcher */
> +	CEPH_WATCH_EVENT_NOTIFY_COMPLETE	= 2, /* notifier notified when done */
> +	CEPH_WATCH_EVENT_DISCONNECT		= 3, /* we were disconnected */
> +};

CEPH_WATCH_EVENT_* live in ceph_fs.h already. No need to add them here.
These are meant to stay in sync with the same headers in userspace, so
I'd rather reorganize things there first if you want to clean up things
that should be in rados.h rather than ceph_fs.h. With that dropped,

Reviewed-by: Josh Durgin <jdurgin@redhat.com>

>   /*
>    * an individual object operation.  each may be accompanied by some data
>    * payload
> @@ -450,10 +466,14 @@ struct ceph_osd_op {
>   	        } __attribute__ ((packed)) snap;
>   		struct {
>   			__le64 cookie;
> -			__le64 ver;
> -			__u8 flag;	/* 0 = unwatch, 1 = watch */
> +			__le64 ver;	/* no longer used */
> +			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
> +			__u32 gen;	/* registration generation */
>   		} __attribute__ ((packed)) watch;
>   		struct {
> +			__le64 cookie;
> +		} __attribute__ ((packed)) notify;
> +		struct {
>   			__le64 offset, length;
>   			__le64 src_offset;
>   		} __attribute__ ((packed)) clonerange;
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 590cf9c..74650e1 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -243,6 +243,29 @@ void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
>   }
>   EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
>
> +void osd_req_op_notify_response_data_pages(struct ceph_osd_request *osd_req,
> +			unsigned int which, struct page **pages, u64 length,
> +			u32 alignment, bool pages_from_pool, bool own_pages)
> +{
> +	struct ceph_osd_data *osd_data;
> +
> +	osd_data = osd_req_op_data(osd_req, which, notify, response_data);
> +	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
> +				pages_from_pool, own_pages);
> +}
> +EXPORT_SYMBOL(osd_req_op_notify_response_data_pages);
> +
> +void osd_req_op_notify_request_data_pagelist(
> +			struct ceph_osd_request *osd_req,
> +			unsigned int which, struct ceph_pagelist *pagelist)
> +{
> +	struct ceph_osd_data *osd_data;
> +
> +	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
> +	ceph_osd_data_pagelist_init(osd_data, pagelist);
> +}
> +EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
> +
>   static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
>   {
>   	switch (osd_data->type) {
> @@ -292,6 +315,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>   		ceph_osd_data_release(&op->cls.request_data);
>   		ceph_osd_data_release(&op->cls.response_data);
>   		break;
> +	case CEPH_OSD_OP_NOTIFY:
> +		ceph_osd_data_release(&op->notify.request_data);
> +		ceph_osd_data_release(&op->notify.response_data);
> +		break;
>   	case CEPH_OSD_OP_SETXATTR:
>   	case CEPH_OSD_OP_CMPXATTR:
>   		ceph_osd_data_release(&op->xattr.osd_data);
> @@ -588,9 +615,18 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
>   }
>   EXPORT_SYMBOL(osd_req_op_xattr_init);
>
> -void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
> -				unsigned int which, u16 opcode,
> -				u64 cookie, u64 version, int flag)
> +void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
> +			    u16 opcode, u64 cookie)
> +{
> +	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
> +
> +	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
> +	op->watch.cookie = cookie;
> +}
> +EXPORT_SYMBOL(osd_req_op_notify_init);
> +
> +void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
> +			   u16 opcode, u8 watch_opcode, u64 cookie)
>   {
>   	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
>   						      opcode, 0);
> @@ -598,9 +634,9 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
>   	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
>
>   	op->watch.cookie = cookie;
> -	op->watch.ver = version;
> -	if (opcode == CEPH_OSD_OP_WATCH && flag)
> -		op->watch.flag = (u8)1;
> +	op->watch.ver = 0;
> +	op->watch.op = watch_opcode;
> +	op->watch.gen = 0;
>   }
>   EXPORT_SYMBOL(osd_req_op_watch_init);
>
> @@ -708,11 +744,26 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
>   		break;
>   	case CEPH_OSD_OP_STARTSYNC:
>   		break;
> +	case CEPH_OSD_OP_NOTIFY:
> +		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
> +
> +		osd_data = &src->notify.request_data;
> +		data_length = ceph_osd_data_length(osd_data);
> +		if (data_length) {
> +			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
> +			ceph_osdc_msg_data_add(req->r_request, osd_data);
> +			src->payload_len += data_length;
> +			request_data_len += data_length;
> +		}
> +		osd_data = &src->notify.response_data;
> +		ceph_osdc_msg_data_add(req->r_reply, osd_data);
> +		break;
>   	case CEPH_OSD_OP_NOTIFY_ACK:
>   	case CEPH_OSD_OP_WATCH:
>   		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
>   		dst->watch.ver = cpu_to_le64(src->watch.ver);
> -		dst->watch.flag = src->watch.flag;
> +		dst->watch.op = src->watch.op;
> +		dst->watch.gen = cpu_to_le32(src->watch.gen);
>   		break;
>   	case CEPH_OSD_OP_SETALLOCHINT:
>   		dst->alloc_hint.expected_object_size =
>



* Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-12 15:56 ` [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
  2015-06-16 14:58   ` Mike Christie
@ 2015-06-16 23:18   ` Josh Durgin
  2015-06-17 13:28     ` Douglas Fuller
  1 sibling, 1 reply; 18+ messages in thread
From: Josh Durgin @ 2015-06-16 23:18 UTC (permalink / raw)
  To: Douglas Fuller, ceph-devel

On 06/12/2015 08:56 AM, Douglas Fuller wrote:
> Change unused ceph_osd_event structure to refer to pending watch/notify2
> messages. Watch events include the separate watch and watch error callbacks
> used for watch/notify2. Update rbd to use separate watch and watch error
> callbacks via the new watch event.
>
> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> ---
>   drivers/block/rbd.c             |  41 +++++++---
>   include/linux/ceph/osd_client.h |  27 +++++--
>   net/ceph/osd_client.c           | 175 +++++++++++++++++++++++++++++-----------
>   3 files changed, 179 insertions(+), 64 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index ed170b1..20b3b23 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -427,6 +427,8 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
>   				       size_t count);
>   static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
>   static void rbd_spec_put(struct rbd_spec *spec);
> +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
> +static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
>
>   static int rbd_dev_id_to_minor(int dev_id)
>   {
> @@ -3103,19 +3105,17 @@ out:
>   	return ret;
>   }
>
> -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
> -			 u64 notifier_gid, void *data, void *payload,
> -			 u32 payload_len)
> +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, u64 notifier_id,
> +                        void *data, size_t data_len)
>   {
> -	struct rbd_device *rbd_dev = (struct rbd_device *)data;
> +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
>   	int ret;
>
>   	if (!rbd_dev)
>   		return;
>
> -	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
> -		rbd_dev->header_name, (unsigned long long)notify_id,
> -		(unsigned int)opcode);
> +	dout("%s: \"%s\" notify_id %llu bl len %lu\n", __func__,
> +	    rbd_dev->header_name, (unsigned long long)notify_id, data_len);
>
>   	/*
>   	 * Until adequate refresh error handling is in place, there is
> @@ -3132,6 +3132,26 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
>   		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
>   }
>
> +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
> +{
> +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
> +	int ret;
> +
> +	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
> +		err, cookie);
> +	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
> +	         rbd_dev->header_name, err, cookie);
> +
> +	/* reset watch */
> +	rbd_dev_refresh(rbd_dev);
> +	rbd_dev_header_unwatch_sync(rbd_dev);
> +	ret = rbd_dev_header_watch_sync(rbd_dev);
> +	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
> +	rbd_dev_refresh(rbd_dev);

Why refresh before and after unwatching? Only the second one seems
necessary.



* Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-16 23:18   ` Josh Durgin
@ 2015-06-17 13:28     ` Douglas Fuller
  2015-06-17 13:41       ` Ilya Dryomov
  0 siblings, 1 reply; 18+ messages in thread
From: Douglas Fuller @ 2015-06-17 13:28 UTC (permalink / raw)
  To: Josh Durgin; +Cc: ceph-devel


> On Jun 16, 2015, at 7:18 PM, Josh Durgin <jdurgin@redhat.com> wrote:
> 
> On 06/12/2015 08:56 AM, Douglas Fuller wrote:
>> 
>> @@ -3132,6 +3132,26 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
>>  		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
>>  }
>> 
>> +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
>> +{
>> +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
>> +	int ret;
>> +
>> +	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
>> +		err, cookie);
>> +	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
>> +	         rbd_dev->header_name, err, cookie);
>> +
>> +	/* reset watch */
>> +	rbd_dev_refresh(rbd_dev);
>> +	rbd_dev_header_unwatch_sync(rbd_dev);
>> +	ret = rbd_dev_header_watch_sync(rbd_dev);
>> +	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
>> +	rbd_dev_refresh(rbd_dev);
> 
> Why refresh before and after unwatching? Only the second one seems
> necessary.

The first one isn’t strictly necessary; I can remove it if you want.

If we get a watch error, we may very well have a situation in which we need to stop I/O to the device because the underlying image has been deleted or its features have changed. We don’t actually do that yet (we just print a warning message), but the extra refresh was to handle that case early, even before we bothered trying to re-establish the watch.


* Re: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-17 13:28     ` Douglas Fuller
@ 2015-06-17 13:41       ` Ilya Dryomov
  0 siblings, 0 replies; 18+ messages in thread
From: Ilya Dryomov @ 2015-06-17 13:41 UTC (permalink / raw)
  To: Douglas Fuller; +Cc: Josh Durgin, Ceph Development

On Wed, Jun 17, 2015 at 4:28 PM, Douglas Fuller <dfuller@redhat.com> wrote:
>
>> On Jun 16, 2015, at 7:18 PM, Josh Durgin <jdurgin@redhat.com> wrote:
>>
>> On 06/12/2015 08:56 AM, Douglas Fuller wrote:
>>>
>>> @@ -3132,6 +3132,26 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
>>>              rbd_warn(rbd_dev, "notify_ack ret %d", ret);
>>>  }
>>>
>>> +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
>>> +{
>>> +    struct rbd_device *rbd_dev = (struct rbd_device *)arg;
>>> +    int ret;
>>> +
>>> +    dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
>>> +            err, cookie);
>>> +    rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
>>> +             rbd_dev->header_name, err, cookie);
>>> +
>>> +    /* reset watch */
>>> +    rbd_dev_refresh(rbd_dev);
>>> +    rbd_dev_header_unwatch_sync(rbd_dev);
>>> +    ret = rbd_dev_header_watch_sync(rbd_dev);
>>> +    BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
>>> +    rbd_dev_refresh(rbd_dev);
>>
>> Why refresh before and after unwatching? Only the second one seems
>> necessary.
>
> The first one isn’t strictly necessary; I can remove it if you want.
>
> If we get a watch error, we may very well have a situation in which we need to stop I/O to the device because the underlying image has been deleted or its features have changed. We don’t actually do that yet (we just print a warning message), but the extra refresh was to handle that case early, even before we bothered trying to re-establish the watch.--

We should remove it, for consistency if nothing else.  Also, when you
are mapping an image, it's rbd_dev_header_watch_sync() that fails if the
header object doesn't exist and such.  So I'd rather it fail in the
same place if an image got deleted from under a client or something
else went wrong, instead of having to keep in mind that it's the
get_size (or whatever) method that is called first on refresh and
expecting failures there.

Thanks,

                Ilya