All of lore.kernel.org
 help / color / mirror / Atom feed
From: Douglas Fuller <dfuller@redhat.com>
To: ceph-devel@vger.kernel.org
Subject: [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2
Date: Fri, 12 Jun 2015 08:56:57 -0700	[thread overview]
Message-ID: <1e922f4d9ba0ea00be5a387247866f44cb0b6488.1434124007.git.dfuller@redhat.com> (raw)
In-Reply-To: <cover.1434124007.git.dfuller@redhat.com>
In-Reply-To: <cover.1434124007.git.dfuller@redhat.com>

Change unused ceph_osd_event structure to refer to pending watch/notify2
messages. Watch events include the separate watch and watch error callbacks
used for watch/notify2. Update rbd to use separate watch and watch error
callbacks via the new watch event.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 drivers/block/rbd.c             |  41 +++++++---
 include/linux/ceph/osd_client.h |  27 +++++--
 net/ceph/osd_client.c           | 175 +++++++++++++++++++++++++++++-----------
 3 files changed, 179 insertions(+), 64 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index ed170b1..20b3b23 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -427,6 +427,8 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 				       size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 static void rbd_spec_put(struct rbd_spec *spec);
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
 
 static int rbd_dev_id_to_minor(int dev_id)
 {
@@ -3103,19 +3105,17 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
-			 u64 notifier_gid, void *data, void *payload,
-			 u32 payload_len)
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, u64 notifier_id,
+                        void *data, size_t data_len)
 {
-	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
 	int ret;
 
 	if (!rbd_dev)
 		return;
 
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-		rbd_dev->header_name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+	dout("%s: \"%s\" notify_id %llu bl len %lu\n", __func__,
+	    rbd_dev->header_name, (unsigned long long)notify_id, data_len);
 
 	/*
 	 * Until adequate refresh error handling is in place, there is
@@ -3132,6 +3132,26 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
+{
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
+	int ret;
+
+	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
+		err, cookie);
+	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
+	         rbd_dev->header_name, err, cookie);
+
+	/* reset watch */
+	rbd_dev_refresh(rbd_dev);
+	rbd_dev_header_unwatch_sync(rbd_dev);
+	ret = rbd_dev_header_watch_sync(rbd_dev);
+	BUG_ON(ret); /* XXX: was the image deleted? can we be more graceful? */
+	rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+}
+
 /*
  * Send a (un)watch request and wait for the ack.  Return a request
  * with a ref held on success or error.
@@ -3199,13 +3219,14 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	rbd_assert(!rbd_dev->watch_event);
 	rbd_assert(!rbd_dev->watch_request);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
+	ret = ceph_osdc_create_watch_event(osdc, rbd_watch_cb,
+	                                  rbd_watch_error_cb,
+	                                  rbd_dev, &rbd_dev->watch_event);
 	if (ret < 0)
 		return ret;
 
 	obj_request = rbd_obj_watch_request_helper(rbd_dev,
-						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
+						CEPH_OSD_WATCH_OP_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 12732d3..b7d4234 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -108,6 +108,7 @@ struct ceph_osd_req_op {
 			u64 ver;
 			__u8 op;
 			u32 gen;
+			struct ceph_osd_data request_data;
 		} watch;
 		struct {
 			u64 cookie;
@@ -186,13 +187,21 @@ struct ceph_request_redirect {
 
 struct ceph_osd_event {
 	u64 cookie;
-	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
+	struct ceph_osd_request *osd_req;
 	void *data;
 	struct rb_node node;
-	struct list_head osd_node;
 	struct kref kref;
+	union {
+		struct {
+			void (*watchcb)(void *, u64, u64, u64, void *, size_t);
+			void (*errcb)(void *, u64, int);
+		} watch;
+		struct {
+			struct ceph_msg_data *notify_data;
+			struct completion complete;
+		} notify;
+	};
 };
 
 struct ceph_osd_event_work {
@@ -385,10 +394,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct page **pages, int nr_pages);
 
 /* watch/notify events */
-extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, s32, u64,
-						   void *, void *, u32),
-				  void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_watch_event(struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                         struct ceph_osd_event **pevent);
+extern int ceph_osdc_wait_event(struct ceph_osd_client *osdc,
+				struct ceph_osd_event *event);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 74650e1..d435bf2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -36,7 +36,13 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
 static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
-			   struct ceph_osd_request *req);
+                           struct ceph_osd_request *req);
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                           u64 cookie);
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data);
 
 /*
  * Implement client access to distributed object storage cluster.
@@ -615,10 +621,12 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+                            unsigned int which,
 			    u16 opcode, u64 cookie)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode, 0);
+	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
 	op->watch.cookie = cookie;
@@ -2273,7 +2281,7 @@ void ceph_osdc_put_event(struct ceph_osd_event *event)
 EXPORT_SYMBOL(ceph_osdc_put_event);
 
 static void __insert_event(struct ceph_osd_client *osdc,
-			     struct ceph_osd_event *new)
+                           struct ceph_osd_event *new)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2295,7 +2303,7 @@ static void __insert_event(struct ceph_osd_client *osdc,
 }
 
 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
-					        u64 cookie)
+                                           u64 cookie)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2327,27 +2335,60 @@ static void __remove_event(struct ceph_osd_event *event)
 	}
 }
 
-int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
-					    void *, u32),
-			   void *data, struct ceph_osd_event **pevent)
+static struct ceph_osd_event *__alloc_event(struct ceph_osd_client *osdc,
+                                            void *data)
 {
 	struct ceph_osd_event *event;
 
 	event = kmalloc(sizeof(*event), GFP_NOIO);
 	if (!event)
-		return -ENOMEM;
+		return NULL;
 
 	dout("create_event %p\n", event);
-	event->cb = event_cb;
-	event->one_shot = 0;
 	event->data = data;
 	event->osdc = osdc;
-	INIT_LIST_HEAD(&event->osd_node);
+	event->osd_req = NULL;
 	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
 
+	return event;
+}
+
+int ceph_osdc_create_watch_event (struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, data);
+	if (!event)
+		return -ENOMEM;
+
+	event->watch.watchcb = watchcb;
+	event->watch.errcb = errcb;
+
+	spin_lock(&osdc->event_lock);
+	event->cookie = ++osdc->event_count;
+	__insert_event(osdc, event);
+	spin_unlock(&osdc->event_lock);
+	*pevent = event;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_watch_event);
+
+int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, NULL);
+	if (!event)
+		return -ENOMEM;
+
+	init_completion(&event->notify.complete);
+
 	spin_lock(&osdc->event_lock);
 	event->cookie = ++osdc->event_count;
 	__insert_event(osdc, event);
@@ -2356,7 +2397,15 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 	*pevent = event;
 	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_create_event);
+EXPORT_SYMBOL(ceph_osdc_create_notify_event);
+
+int ceph_osdc_wait_event (struct ceph_osd_client *osdc,
+                          struct ceph_osd_event *event)
+{
+	wait_for_completion(&event->notify.complete);
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
 
 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
 {
@@ -2376,20 +2425,79 @@ static void do_event_work(struct work_struct *work)
 	struct ceph_osd_event_work *event_work =
 		container_of(work, struct ceph_osd_event_work, work);
 	struct ceph_osd_event *event = event_work->event;
-	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
 	s32 return_code = event_work->return_code;
 	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
-		  event->data, event_work->payload, event_work->payload_len);
+	if (opcode == CEPH_WATCH_EVENT_NOTIFY)
+		event->watch.watchcb(event->data, notify_id,
+		                     event->cookie, notifier_gid,
+		                     event_work->payload,
+		                     event_work->payload_len);
+	else if (opcode == CEPH_WATCH_EVENT_DISCONNECT && event->watch.errcb)
+		event->watch.errcb(event->data, event->cookie, return_code);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
 }
 
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data)
+{
+	struct ceph_osd_event *event;
+	struct ceph_osd_event_work *event_work;
+
+	spin_lock(&osdc->event_lock);
+	event = __find_event(osdc, cookie);
+	if (event)
+		get_event(event);
+	spin_unlock(&osdc->event_lock);
+
+	dout("handle_watch_notify cookie %lld event %p notify id %llu payload "
+	     "len %u return code %d notifier gid %llu\n",
+             cookie, event, notify_id, payload_len, return_code, notifier_gid);
+	switch(opcode) {
+		case CEPH_WATCH_EVENT_NOTIFY:
+		case CEPH_WATCH_EVENT_DISCONNECT:
+			if (event) {
+				event_work = kmalloc(sizeof(*event_work),
+				                     GFP_NOIO);
+				if (!event_work) {
+					pr_err("couldn't allocate event_work\n");
+					ceph_osdc_put_event(event);
+					return;
+				}
+				INIT_WORK(&event_work->work, do_event_work);
+				event_work->event = event;
+				event_work->notify_id = notify_id;
+				event_work->opcode = opcode;
+				event_work->return_code = return_code;
+				event_work->notifier_gid = notifier_gid;
+				event_work->payload = payload;
+				event_work->payload_len = payload_len;
+
+				queue_work(osdc->notify_wq, &event_work->work);
+			}
+			break;
+		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
+			if (event) {
+				event->notify.notify_data = data;
+				if (event->osd_req) {
+					ceph_osdc_cancel_request(event->osd_req);
+					event->osd_req = NULL;
+				}
+				complete_all(&event->notify.complete);
+			}
+			break;
+		default:
+			BUG();
+			break;
+	}
+}
 
 /*
  * Process osd watch notifications
@@ -2398,13 +2506,12 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
 	void *p, *end, *payload = NULL;
+	struct ceph_msg_data *data = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
 	s32 return_code = 0;
-	struct ceph_osd_event *event;
-	struct ceph_osd_event_work *event_work;
 
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -2429,34 +2536,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 3)
 		ceph_decode_32_safe(&p, end, notifier_gid, bad);
 
-	spin_lock(&osdc->event_lock);
-	event = __find_event(osdc, cookie);
-	if (event) {
-		BUG_ON(event->one_shot);
-		get_event(event);
-	}
-	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
-	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
-	if (event) {
-		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
-		if (!event_work) {
-			pr_err("couldn't allocate event_work\n");
-			ceph_osdc_put_event(event);
-			return;
-		}
-		INIT_WORK(&event_work->work, do_event_work);
-		event_work->event = event;
-		event_work->ver = ver;
-		event_work->notify_id = notify_id;
-		event_work->opcode = opcode;
-		event_work->return_code = return_code;
-		event_work->notifier_gid = notifier_gid;
-		event_work->payload = payload;
-		event_work->payload_len = payload_len;
-
-		queue_work(osdc->notify_wq, &event_work->work);
-	}
+	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
+		return_code, notifier_gid, data);
 
 	return;
 
-- 
1.9.3


  parent reply	other threads:[~2015-06-12 15:58 UTC|newest]

Thread overview: 18+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-06-12 15:56 [PATCH 0/6] support watch/notify version 2 Douglas Fuller
2015-06-12 15:56 ` [PATCH 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
2015-06-16 21:22   ` Josh Durgin
2015-06-12 15:56 ` [PATCH 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
2015-06-16 21:23   ` Josh Durgin
2015-06-12 15:56 ` [PATCH 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
2015-06-16 22:00   ` Josh Durgin
2015-06-12 15:56 ` Douglas Fuller [this message]
2015-06-16 14:58   ` [PATCH 4/6] osd_client, rbd: update event interface for watch/notify2 Mike Christie
2015-06-16 17:05     ` Douglas Fuller
2015-06-16 23:18   ` Josh Durgin
2015-06-17 13:28     ` Douglas Fuller
2015-06-17 13:41       ` Ilya Dryomov
2015-06-12 15:56 ` [PATCH 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
2015-06-16 15:26   ` Mike Christie
2015-06-16 17:22     ` Douglas Fuller
2015-06-12 15:56 ` [PATCH 6/6] osd_client: send watch ping messages Douglas Fuller
2015-06-16 15:07   ` Mike Christie

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1e922f4d9ba0ea00be5a387247866f44cb0b6488.1434124007.git.dfuller@redhat.com \
    --to=dfuller@redhat.com \
    --cc=ceph-devel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.