All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits
@ 2016-08-24 13:18 Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode() Ilya Dryomov
                   ` (16 more replies)
  0 siblings, 17 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Hello,

This series already has a general thumbs up from Mike and an ack on
sysfs interface additions from Sage, but, as wip-exclusive-lock branch
has been through quite a few rebases recently, I wanted to send what
hopefully is the final version for a wider review.

Thanks,

                Ilya


Douglas Fuller (5):
  libceph: support for CEPH_OSD_OP_LIST_WATCHERS
  libceph: add ceph_osdc_call() single-page helper
  libceph: support for advisory locking on RADOS objects
  libceph: support for lock.lock_info
  libceph: support for blacklisting clients

Ilya Dryomov (7):
  libceph: rename ceph_entity_name_encode() ->
    ceph_auth_entity_name_encode()
  libceph: rename ceph_client_id() -> ceph_client_gid()
  rbd: introduce a per-device ordered workqueue
  rbd: retry watch re-registration periodically
  rbd: support for exclusive-lock feature
  rbd: print capacity in decimal and features in hex
  rbd: add 'client_addr' sysfs rbd device attribute

Mike Christie (4):
  rbd: add 'cluster_fsid' sysfs rbd device attribute
  rbd: add 'snap_id' sysfs rbd device attribute
  rbd: add 'config_info' sysfs rbd device attribute
  rbd: add force close option

 Documentation/ABI/testing/sysfs-bus-rbd |   29 +-
 drivers/block/rbd.c                     | 1195 +++++++++++++++++++++++++++----
 drivers/block/rbd_types.h               |   11 +
 include/linux/ceph/auth.h               |    2 +-
 include/linux/ceph/ceph_fs.h            |   11 +
 include/linux/ceph/cls_lock_client.h    |   49 ++
 include/linux/ceph/libceph.h            |    3 +-
 include/linux/ceph/mon_client.h         |    3 +
 include/linux/ceph/osd_client.h         |   23 +-
 net/ceph/Makefile                       |    1 +
 net/ceph/auth.c                         |    7 +-
 net/ceph/auth_none.c                    |    2 +-
 net/ceph/ceph_common.c                  |   13 +-
 net/ceph/ceph_strings.c                 |    1 +
 net/ceph/cls_lock_client.c              |  325 +++++++++
 net/ceph/mon_client.c                   |   82 +++
 net/ceph/osd_client.c                   |  169 +++++
 17 files changed, 1777 insertions(+), 149 deletions(-)
 create mode 100644 include/linux/ceph/cls_lock_client.h
 create mode 100644 net/ceph/cls_lock_client.c

-- 
2.4.3


^ permalink raw reply	[flat|nested] 29+ messages in thread

* [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode()
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 18:49   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS Ilya Dryomov
                   ` (15 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Clear up EntityName vs entity_name_t confusion.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/auth.h | 2 +-
 net/ceph/auth.c           | 7 +++++--
 net/ceph/auth_none.c      | 2 +-
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 1563265d2097..374bb1c4ef52 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -104,7 +104,7 @@ extern int ceph_auth_build_hello(struct ceph_auth_client *ac,
 extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 				  void *buf, size_t len,
 				  void *reply_buf, size_t reply_len);
-extern int ceph_entity_name_encode(const char *name, void **p, void *end);
+int ceph_auth_entity_name_encode(const char *name, void **p, void *end);
 
 extern int ceph_build_auth(struct ceph_auth_client *ac,
 		    void *msg_buf, size_t msg_len);
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index 2bc5965fdd1e..78067dda9d3c 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac)
 	mutex_unlock(&ac->mutex);
 }
 
-int ceph_entity_name_encode(const char *name, void **p, void *end)
+/*
+ * EntityName, not to be confused with entity_name_t
+ */
+int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
 {
 	int len = strlen(name);
 
@@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
 	for (i = 0; i < num; i++)
 		ceph_encode_32(&p, supported_protocols[i]);
 
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		goto out;
 	ceph_decode_need(&p, end, sizeof(u64), bad);
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 5f836f02ae36..df45e467c81f 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
 	int ret;
 
 	ceph_encode_8_safe(&p, end, 1, e_range);
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		return ret;
 
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode() Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 19:29   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper Ilya Dryomov
                   ` (14 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Douglas Fuller <dfuller@redhat.com>

Add support for this Ceph OSD op, needed to support the RBD exclusive
lock feature.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
[idryomov@gmail.com: refactor, misc fixes throughout]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/osd_client.h |  15 +++++-
 net/ceph/osd_client.c           | 117 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 858932304260..19821a191732 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -121,6 +121,9 @@ struct ceph_osd_req_op {
 			struct ceph_osd_data response_data;
 		} notify;
 		struct {
+			struct ceph_osd_data response_data;
+		} list_watchers;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
 	size_t *preply_len;
 };
 
+struct ceph_watch_item {
+	struct ceph_entity_name name;
+	u64 cookie;
+	struct ceph_entity_addr addr;
+};
+
 struct ceph_osd_client {
 	struct ceph_client     *client;
 
@@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
@@ -434,5 +442,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		     size_t *preply_len);
 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
 			  struct ceph_osd_linger_request *lreq);
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+			    struct ceph_object_id *oid,
+			    struct ceph_object_locator *oloc,
+			    struct ceph_watch_item **watchers,
+			    u32 *num_watchers);
 #endif
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a97e7b506612..dd51ec8ce97f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->notify.request_data);
 		ceph_osd_data_release(&op->notify.response_data);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		ceph_osd_data_release(&op->list_watchers.response_data);
+		break;
 	default:
 		break;
 	}
@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_NOTIFY:
 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
 			ceph_osdc_msg_data_add(req->r_reply,
 					       &op->extent.osd_data);
 			break;
+		case CEPH_OSD_OP_LIST_WATCHERS:
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->list_watchers.response_data);
+			break;
 
 		/* both */
 		case CEPH_OSD_OP_CALL:
@@ -3891,6 +3900,114 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
 	return ret;
 }
 
+static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &item->name, sizeof(item->name));
+	item->cookie = ceph_decode_64(p);
+	*p += 4; /* skip timeout_seconds */
+	if (struct_v >= 2) {
+		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
+		ceph_decode_addr(&item->addr);
+	}
+
+	dout("%s %s%llu cookie %llu addr %s\n", __func__,
+	     ENTITY_NAME(item->name), item->cookie,
+	     ceph_pr_addr(&item->addr.in_addr));
+	return 0;
+}
+
+static int decode_watchers(void **p, void *end,
+			   struct ceph_watch_item **watchers,
+			   u32 *num_watchers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_watchers = ceph_decode_32(p);
+	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
+	if (!*watchers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_watchers; i++) {
+		ret = decode_watcher(p, end, *watchers + i);
+		if (ret) {
+			kfree(*watchers);
+			return ret;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(watchers);
+ */
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+			    struct ceph_object_id *oid,
+			    struct ceph_object_locator *oloc,
+			    struct ceph_watch_item **watchers,
+			    u32 *num_watchers)
+{
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_put_req;
+	}
+
+	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
+	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
+						 response_data),
+				 pages, PAGE_SIZE, 0, false, true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		void *p = page_address(pages[0]);
+		void *const end = p + req->r_ops[0].outdata_len;
+
+		ret = decode_watchers(&p, end, watchers, num_watchers);
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_list_watchers);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode() Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 19:37   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 04/16] libceph: support for advisory locking on RADOS objects Ilya Dryomov
                   ` (13 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Douglas Fuller <dfuller@redhat.com>

Add a convenience function to osd_client to send Ceph OSD
'class' ops. The interface assumes that the request and
reply data each consist of single pages.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/osd_client.h |  8 +++++++
 net/ceph/osd_client.c           | 51 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 19821a191732..96337b15a60d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -397,6 +397,14 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
 extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc);
 
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+		   struct ceph_object_id *oid,
+		   struct ceph_object_locator *oloc,
+		   const char *class, const char *method,
+		   unsigned int flags,
+		   struct page *req_page, size_t req_len,
+		   struct page *resp_page, size_t *resp_len);
+
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 			       struct ceph_vino vino,
 			       struct ceph_file_layout *layout,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index dd51ec8ce97f..fbc6b7090c65 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -4027,6 +4027,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
 
 /*
+ * Execute an OSD class method on an object.
+ *
+ * @flags: CEPH_OSD_FLAG_*
+ * @resp_len: out param for reply length
+ */
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+		   struct ceph_object_id *oid,
+		   struct ceph_object_locator *oloc,
+		   const char *class, const char *method,
+		   unsigned int flags,
+		   struct page *req_page, size_t req_len,
+		   struct page *resp_page, size_t *resp_len)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = flags;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	if (req_page)
+		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
+						  0, false, false);
+	if (resp_page)
+		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+						   PAGE_SIZE, 0, false, false);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		ret = req->r_ops[0].rval;
+		if (resp_page)
+			*resp_len = req->r_ops[0].outdata_len;
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_call);
+
+/*
  * init, shutdown
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 04/16] libceph: support for advisory locking on RADOS objects
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (2 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 19:42   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 05/16] libceph: support for lock.lock_info Ilya Dryomov
                   ` (12 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Douglas Fuller <dfuller@redhat.com>

This patch adds support for rados lock, unlock and break lock.

Based heavily on code by Mike Christie <michaelc@cs.wisc.edu>.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/cls_lock_client.h |  27 ++++++
 net/ceph/Makefile                    |   1 +
 net/ceph/cls_lock_client.c           | 180 +++++++++++++++++++++++++++++++++++
 3 files changed, 208 insertions(+)
 create mode 100644 include/linux/ceph/cls_lock_client.h
 create mode 100644 net/ceph/cls_lock_client.c

diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
new file mode 100644
index 000000000000..4e4dffef22bb
--- /dev/null
+++ b/include/linux/ceph/cls_lock_client.h
@@ -0,0 +1,27 @@
+#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
+#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
+
+#include <linux/ceph/osd_client.h>
+
+enum ceph_cls_lock_type {
+	CEPH_CLS_LOCK_NONE = 0,
+	CEPH_CLS_LOCK_EXCLUSIVE = 1,
+	CEPH_CLS_LOCK_SHARED = 2,
+};
+
+int ceph_cls_lock(struct ceph_osd_client *osdc,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  char *lock_name, u8 type, char *cookie,
+		  char *tag, char *desc, u8 flags);
+int ceph_cls_unlock(struct ceph_osd_client *osdc,
+		    struct ceph_object_id *oid,
+		    struct ceph_object_locator *oloc,
+		    char *lock_name, char *cookie);
+int ceph_cls_break_lock(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, char *cookie,
+			struct ceph_entity_name *locker);
+
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 84cbed630c4b..6a5180903e7b 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
 
 libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	mon_client.o \
+	cls_lock_client.o \
 	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
 	debugfs.o \
 	auth.o auth_none.o \
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
new file mode 100644
index 000000000000..2a314537f958
--- /dev/null
+++ b/net/ceph/cls_lock_client.c
@@ -0,0 +1,180 @@
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/slab.h>
+
+#include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/decode.h>
+
+/**
+ * ceph_cls_lock - grab rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
+ * @cookie: user-defined identifier for this instance of the lock
+ * @tag: user-defined tag
+ * @desc: user-defined lock description
+ * @flags: lock flags
+ *
+ * All operations on the same lock should use the same tag.
+ */
+int ceph_cls_lock(struct ceph_osd_client *osdc,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  char *lock_name, u8 type, char *cookie,
+		  char *tag, char *desc, u8 flags)
+{
+	int lock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	int tag_len = strlen(tag);
+	int desc_len = strlen(desc);
+	void *p, *end;
+	struct page *lock_op_page;
+	struct timespec mtime;
+	int ret;
+
+	lock_op_buf_size = name_len + sizeof(__le32) +
+			   cookie_len + sizeof(__le32) +
+			   tag_len + sizeof(__le32) +
+			   desc_len + sizeof(__le32) +
+			   sizeof(struct ceph_timespec) +
+			   /* flag and type */
+			   sizeof(u8) + sizeof(u8) +
+			   CEPH_ENCODING_START_BLK_LEN;
+	if (lock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	lock_op_page = alloc_page(GFP_NOIO);
+	if (!lock_op_page)
+		return -ENOMEM;
+
+	p = page_address(lock_op_page);
+	end = p + lock_op_buf_size;
+
+	/* encode cls_lock_lock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, desc, desc_len);
+	/* only support infinite duration */
+	memset(&mtime, 0, sizeof(mtime));
+	ceph_encode_timespec(p, &mtime);
+	p += sizeof(struct ceph_timespec);
+	ceph_encode_8(&p, flags);
+
+	dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
+	     __func__, lock_name, type, cookie, tag, desc, flags);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     lock_op_page, lock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(lock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock);
+
+/**
+ * ceph_cls_unlock - release rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ */
+int ceph_cls_unlock(struct ceph_osd_client *osdc,
+		    struct ceph_object_id *oid,
+		    struct ceph_object_locator *oloc,
+		    char *lock_name, char *cookie)
+{
+	int unlock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	void *p, *end;
+	struct page *unlock_op_page;
+	int ret;
+
+	unlock_op_buf_size = name_len + sizeof(__le32) +
+			     cookie_len + sizeof(__le32) +
+			     CEPH_ENCODING_START_BLK_LEN;
+	if (unlock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	unlock_op_page = alloc_page(GFP_NOIO);
+	if (!unlock_op_page)
+		return -ENOMEM;
+
+	p = page_address(unlock_op_page);
+	end = p + unlock_op_buf_size;
+
+	/* encode cls_lock_unlock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     unlock_op_page, unlock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(unlock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_unlock);
+
+/**
+ * ceph_cls_break_lock - release rados lock for object for specified client
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ * @locker: current lock owner
+ */
+int ceph_cls_break_lock(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, char *cookie,
+			struct ceph_entity_name *locker)
+{
+	int break_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	struct page *break_op_page;
+	void *p, *end;
+	int ret;
+
+	break_op_buf_size = name_len + sizeof(__le32) +
+			    cookie_len + sizeof(__le32) +
+			    sizeof(u8) + sizeof(__le64) +
+			    CEPH_ENCODING_START_BLK_LEN;
+	if (break_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	break_op_page = alloc_page(GFP_NOIO);
+	if (!break_op_page)
+		return -ENOMEM;
+
+	p = page_address(break_op_page);
+	end = p + break_op_buf_size;
+
+	/* encode cls_lock_break_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_copy(&p, locker, sizeof(*locker));
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
+	     cookie, ENTITY_NAME(*locker));
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     break_op_page, break_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(break_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_break_lock);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 05/16] libceph: support for lock.lock_info
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (3 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 04/16] libceph: support for advisory locking on RADOS objects Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 19:56   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 06/16] libceph: support for blacklisting clients Ilya Dryomov
                   ` (11 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Douglas Fuller <dfuller@redhat.com>

Add an interface for the Ceph OSD lock.lock_info method and associated
data structures.

Based heavily on code by Mike Christie <michaelc@cs.wisc.edu>.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
[idryomov@gmail.com: refactor, misc fixes throughout]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/cls_lock_client.h |  22 ++++++
 net/ceph/cls_lock_client.c           | 145 +++++++++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+)

diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
index 4e4dffef22bb..84884d8d4710 100644
--- a/include/linux/ceph/cls_lock_client.h
+++ b/include/linux/ceph/cls_lock_client.h
@@ -9,6 +9,20 @@ enum ceph_cls_lock_type {
 	CEPH_CLS_LOCK_SHARED = 2,
 };
 
+struct ceph_locker_id {
+	struct ceph_entity_name name;	/* locker's client name */
+	char *cookie;			/* locker's cookie */
+};
+
+struct ceph_locker_info {
+	struct ceph_entity_addr addr;	/* locker's address */
+};
+
+struct ceph_locker {
+	struct ceph_locker_id id;
+	struct ceph_locker_info info;
+};
+
 int ceph_cls_lock(struct ceph_osd_client *osdc,
 		  struct ceph_object_id *oid,
 		  struct ceph_object_locator *oloc,
@@ -24,4 +38,12 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 			char *lock_name, char *cookie,
 			struct ceph_entity_name *locker);
 
+void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
+
+int ceph_cls_lock_info(struct ceph_osd_client *osdc,
+		       struct ceph_object_id *oid,
+		       struct ceph_object_locator *oloc,
+		       char *lock_name, u8 *type, char **tag,
+		       struct ceph_locker **lockers, u32 *num_lockers);
+
 #endif
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 2a314537f958..50f040fdb2a9 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -178,3 +178,148 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 	return ret;
 }
 EXPORT_SYMBOL(ceph_cls_break_lock);
+
+void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
+{
+	int i;
+
+	for (i = 0; i < num_lockers; i++)
+		kfree(lockers[i].id.cookie);
+	kfree(lockers);
+}
+EXPORT_SYMBOL(ceph_free_lockers);
+
+static int decode_locker(void **p, void *end, struct ceph_locker *locker)
+{
+	u8 struct_v;
+	u32 len;
+	char *s;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s))
+		return PTR_ERR(s);
+
+	locker->id.cookie = s;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	*p += sizeof(struct ceph_timespec); /* skip expiration */
+	ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
+	ceph_decode_addr(&locker->info.addr);
+	len = ceph_decode_32(p);
+	*p += len; /* skip description */
+
+	dout("%s %s%llu cookie %s addr %s\n", __func__,
+	     ENTITY_NAME(locker->id.name), locker->id.cookie,
+	     ceph_pr_addr(&locker->info.addr.in_addr));
+	return 0;
+}
+
+static int decode_lockers(void **p, void *end, u8 *type, char **tag,
+			  struct ceph_locker **lockers, u32 *num_lockers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	char *s;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_lockers = ceph_decode_32(p);
+	*lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
+	if (!*lockers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_lockers; i++) {
+		ret = decode_locker(p, end, *lockers + i);
+		if (ret)
+			goto err_free_lockers;
+	}
+
+	*type = ceph_decode_8(p);
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s)) {
+		ret = PTR_ERR(s);
+		goto err_free_lockers;
+	}
+
+	*tag = s;
+	return 0;
+
+err_free_lockers:
+	ceph_free_lockers(*lockers, *num_lockers);
+	return ret;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(tag);
+ *     ceph_free_lockers(lockers, num_lockers);
+ */
+int ceph_cls_lock_info(struct ceph_osd_client *osdc,
+		       struct ceph_object_id *oid,
+		       struct ceph_object_locator *oloc,
+		       char *lock_name, u8 *type, char **tag,
+		       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	int get_info_op_buf_size;
+	int name_len = strlen(lock_name);
+	struct page *get_info_op_page, *reply_page;
+	size_t reply_len;
+	void *p, *end;
+	int ret;
+
+	get_info_op_buf_size = name_len + sizeof(__le32) +
+			       CEPH_ENCODING_START_BLK_LEN;
+	if (get_info_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	get_info_op_page = alloc_page(GFP_NOIO);
+	if (!get_info_op_page)
+		return -ENOMEM;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page) {
+		__free_page(get_info_op_page);
+		return -ENOMEM;
+	}
+
+	p = page_address(get_info_op_page);
+	end = p + get_info_op_buf_size;
+
+	/* encode cls_lock_get_info_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+
+	dout("%s lock_name %s\n", __func__, lock_name);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
+			     CEPH_OSD_FLAG_READ, get_info_op_page,
+			     get_info_op_buf_size, reply_page, &reply_len);
+
+	dout("%s: status %d\n", __func__, ret);
+	if (ret >= 0) {
+		p = page_address(reply_page);
+		end = p + reply_len;
+
+		ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
+	}
+
+	__free_page(get_info_op_page);
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock_info);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 06/16] libceph: support for blacklisting clients
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (4 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 05/16] libceph: support for lock.lock_info Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 19:59   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid() Ilya Dryomov
                   ` (10 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Douglas Fuller <dfuller@redhat.com>

Reuse ceph_mon_generic_request infrastructure for sending monitor
commands.  In particular, add support for 'blacklist add' to prevent
other, non-responsive clients from making further updates.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
[idryomov@gmail.com: refactor, misc fixes throughout]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 include/linux/ceph/ceph_fs.h    | 11 ++++++
 include/linux/ceph/mon_client.h |  3 ++
 net/ceph/mon_client.c           | 82 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 96 insertions(+)

diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 7868d602c0a0..c086e63dcee1 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -138,6 +138,9 @@ struct ceph_dir_layout {
 #define CEPH_MSG_POOLOP_REPLY           48
 #define CEPH_MSG_POOLOP                 49
 
+/* mon commands */
+#define CEPH_MSG_MON_COMMAND            50
+#define CEPH_MSG_MON_COMMAND_ACK        51
 
 /* osd */
 #define CEPH_MSG_OSD_MAP                41
@@ -176,6 +179,14 @@ struct ceph_mon_statfs_reply {
 	struct ceph_statfs st;
 } __attribute__ ((packed));
 
+struct ceph_mon_command {
+	struct ceph_mon_request_header monhdr;
+	struct ceph_fsid fsid;
+	__le32 num_strs;         /* always 1 */
+	__le32 str_len;
+	char str[];
+} __attribute__ ((packed));
+
 struct ceph_osd_getmap {
 	struct ceph_mon_request_header monhdr;
 	struct ceph_fsid fsid;
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 24d704d1ea5c..d5a3ecea578d 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -141,6 +141,9 @@ int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
 				ceph_monc_callback_t cb, u64 private_data);
 
+int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr);
+
 extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ef34a02719d7..a8effc8b7280 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
 }
 EXPORT_SYMBOL(ceph_monc_get_version_async);
 
+static void handle_command_ack(struct ceph_mon_client *monc,
+			       struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
+							    sizeof(u32), bad);
+	p += sizeof(struct ceph_mon_request_header);
+
+	mutex_lock(&monc->mutex);
+	req = lookup_generic_request(&monc->generic_request_tree, tid);
+	if (!req) {
+		mutex_unlock(&monc->mutex);
+		return;
+	}
+
+	req->result = ceph_decode_32(&p);
+	__finish_generic_request(req);
+	mutex_unlock(&monc->mutex);
+
+	complete_generic_request(req);
+	return;
+
+bad:
+	pr_err("corrupt mon_command ack, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_command *h;
+	int ret = -ENOMEM;
+	int len;
+
+	req = alloc_generic_request(monc, GFP_NOIO);
+	if (!req)
+		goto out;
+
+	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
+	if (!req->request)
+		goto out;
+
+	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
+				  true);
+	if (!req->reply)
+		goto out;
+
+	mutex_lock(&monc->mutex);
+	register_generic_request(req);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->num_strs = cpu_to_le32(1);
+	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
+		                 \"blacklistop\": \"add\", \
+				 \"addr\": \"%pISpc/%u\" }",
+		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+	h->str_len = cpu_to_le32(len);
+	send_generic_request(monc, req);
+	mutex_unlock(&monc->mutex);
+
+	ret = wait_generic_request(req);
+out:
+	put_generic_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
 /*
  * Resend pending generic requests.
  */
@@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		handle_get_version_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_COMMAND_ACK:
+		handle_command_ack(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
 	case CEPH_MSG_STATFS_REPLY:
+	case CEPH_MSG_MON_COMMAND_ACK:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY:
 		m = ceph_msg_get(monc->m_auth_reply);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid()
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (5 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 06/16] libceph: support for blacklisting clients Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 20:00   ` Alex Elder
  2016-08-24 13:18 ` [PATCH 08/16] rbd: introduce a per-device ordered workqueue Ilya Dryomov
                   ` (9 subsequent siblings)
  16 siblings, 1 reply; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c          | 2 +-
 include/linux/ceph/libceph.h | 2 +-
 net/ceph/ceph_common.c       | 7 ++++---
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 6c6519f6492a..e0585e9040f1 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3758,7 +3758,7 @@ static ssize_t rbd_client_id_show(struct device *dev,
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
 	return sprintf(buf, "client%lld\n",
-			ceph_client_id(rbd_dev->rbd_client->client));
+		       ceph_client_gid(rbd_dev->rbd_client->client));
 }
 
 static ssize_t rbd_pool_show(struct device *dev,
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 83fc1fff7061..b4cffff70e44 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -264,7 +264,7 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
 					      void *private,
 					      u64 supported_features,
 					      u64 required_features);
-extern u64 ceph_client_id(struct ceph_client *client);
+u64 ceph_client_gid(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern int __ceph_open_session(struct ceph_client *client,
 			       unsigned long started);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bddfcf6f09c2..8a7921767308 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -566,11 +566,11 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
 }
 EXPORT_SYMBOL(ceph_print_client_options);
 
-u64 ceph_client_id(struct ceph_client *client)
+u64 ceph_client_gid(struct ceph_client *client)
 {
 	return client->monc.auth->global_id;
 }
-EXPORT_SYMBOL(ceph_client_id);
+EXPORT_SYMBOL(ceph_client_gid);
 
 /*
  * create a fresh client instance
@@ -685,7 +685,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
 			return client->auth_err;
 	}
 
-	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+	pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
+		&client->fsid);
 	ceph_debugfs_client_init(client);
 
 	return 0;
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 08/16] rbd: introduce a per-device ordered workqueue
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (6 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid() Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 09/16] rbd: retry watch re-registration periodically Ilya Dryomov
                   ` (8 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

This is going to be used for reregistering watch requests and
exclusive-lock tasks: acquire/request lock, notify-acquired, release
lock, notify-released.  Some refactoring in the map/unmap paths was
necessary to give this workqueue a meaningful name: "rbdX-tasks".

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c | 151 ++++++++++++++++++++++++----------------------------
 1 file changed, 71 insertions(+), 80 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e0585e9040f1..1c805eea6767 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -128,11 +128,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
- * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
- * enough to hold all possible device names.
  */
 #define DEV_NAME_LEN		32
-#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
 /*
  * block device image metadata (in-memory version)
@@ -353,10 +350,12 @@ struct rbd_device {
 	struct ceph_object_id	header_oid;
 	struct ceph_object_locator header_oloc;
 
-	struct ceph_file_layout	layout;
+	struct ceph_file_layout	layout;		/* used for all rbd requests */
 
 	struct ceph_osd_linger_request *watch_handle;
 
+	struct workqueue_struct	*task_wq;
+
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
 	atomic_t		parent_ref;
@@ -3944,11 +3943,8 @@ static void rbd_spec_free(struct kref *kref)
 	kfree(spec);
 }
 
-static void rbd_dev_release(struct device *dev)
+static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
-	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	bool need_put = !!rbd_dev->opts;
-
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
 
@@ -3956,6 +3952,19 @@ static void rbd_dev_release(struct device *dev)
 	rbd_spec_put(rbd_dev->spec);
 	kfree(rbd_dev->opts);
 	kfree(rbd_dev);
+}
+
+static void rbd_dev_release(struct device *dev)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	bool need_put = !!rbd_dev->opts;
+
+	if (need_put) {
+		destroy_workqueue(rbd_dev->task_wq);
+		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+	}
+
+	rbd_dev_free(rbd_dev);
 
 	/*
 	 * This is racy, but way better than putting module outside of
@@ -3966,19 +3975,16 @@ static void rbd_dev_release(struct device *dev)
 		module_put(THIS_MODULE);
 }
 
-static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
-					 struct rbd_spec *spec,
-					 struct rbd_options *opts)
+static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
+					   struct rbd_spec *spec)
 {
 	struct rbd_device *rbd_dev;
 
-	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
 	if (!rbd_dev)
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	rbd_dev->flags = 0;
-	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
 	init_rwsem(&rbd_dev->header_rwsem);
 
@@ -3992,9 +3998,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 	rbd_dev->rbd_client = rbdc;
 	rbd_dev->spec = spec;
-	rbd_dev->opts = opts;
-
-	/* Initialize the layout used for all rbd requests */
 
 	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
 	rbd_dev->layout.stripe_count = 1;
@@ -4002,15 +4005,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 	rbd_dev->layout.pool_id = spec->pool_id;
 	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 
-	/*
-	 * If this is a mapping rbd_dev (as opposed to a parent one),
-	 * pin our module.  We have a ref from do_rbd_add(), so use
-	 * __module_get().
-	 */
-	if (rbd_dev->opts)
-		__module_get(THIS_MODULE);
+	return rbd_dev;
+}
+
+/*
+ * Create a mapping rbd_dev.
+ */
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+					 struct rbd_spec *spec,
+					 struct rbd_options *opts)
+{
+	struct rbd_device *rbd_dev;
+
+	rbd_dev = __rbd_dev_create(rbdc, spec);
+	if (!rbd_dev)
+		return NULL;
+
+	rbd_dev->opts = opts;
+
+	/* get an id and fill in device name */
+	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
+					 minor_to_rbd_dev_id(1 << MINORBITS),
+					 GFP_KERNEL);
+	if (rbd_dev->dev_id < 0)
+		goto fail_rbd_dev;
+
+	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
+	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
+						   rbd_dev->name);
+	if (!rbd_dev->task_wq)
+		goto fail_dev_id;
 
+	/* we have a ref from do_rbd_add() */
+	__module_get(THIS_MODULE);
+
+	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
 	return rbd_dev;
+
+fail_dev_id:
+	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+fail_rbd_dev:
+	rbd_dev_free(rbd_dev);
+	return NULL;
 }
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@ -4646,46 +4682,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
 }
 
 /*
- * Get a unique rbd identifier for the given new rbd_dev, and add
- * the rbd_dev to the global list.
- */
-static int rbd_dev_id_get(struct rbd_device *rbd_dev)
-{
-	int new_dev_id;
-
-	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
-				    0, minor_to_rbd_dev_id(1 << MINORBITS),
-				    GFP_KERNEL);
-	if (new_dev_id < 0)
-		return new_dev_id;
-
-	rbd_dev->dev_id = new_dev_id;
-
-	spin_lock(&rbd_dev_list_lock);
-	list_add_tail(&rbd_dev->node, &rbd_dev_list);
-	spin_unlock(&rbd_dev_list_lock);
-
-	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
-
-	return 0;
-}
-
-/*
- * Remove an rbd_dev from the global list, and record that its
- * identifier is no longer in use.
- */
-static void rbd_dev_id_put(struct rbd_device *rbd_dev)
-{
-	spin_lock(&rbd_dev_list_lock);
-	list_del_init(&rbd_dev->node);
-	spin_unlock(&rbd_dev_list_lock);
-
-	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
-
-	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
-}
-
-/*
  * Skips over white space at *buf, and updates *buf to point to the
  * first found non-space character (if any). Returns the length of
  * the token (string of non-white space characters) found.  Note
@@ -5077,8 +5073,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
 		goto out_err;
 	}
 
-	parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
-				NULL);
+	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
 	if (!parent) {
 		ret = -ENOMEM;
 		goto out_err;
@@ -5113,22 +5108,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	/* Get an id and fill in device name. */
-
-	ret = rbd_dev_id_get(rbd_dev);
-	if (ret)
-		goto err_out_unlock;
-
-	BUILD_BUG_ON(DEV_NAME_LEN
-			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
 	/* Record our major and minor device numbers. */
 
 	if (!single_major) {
 		ret = register_blkdev(0, rbd_dev->name);
 		if (ret < 0)
-			goto err_out_id;
+			goto err_out_unlock;
 
 		rbd_dev->major = ret;
 		rbd_dev->minor = 0;
@@ -5160,6 +5145,10 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	up_write(&rbd_dev->header_rwsem);
 
+	spin_lock(&rbd_dev_list_lock);
+	list_add_tail(&rbd_dev->node, &rbd_dev_list);
+	spin_unlock(&rbd_dev_list_lock);
+
 	add_disk(rbd_dev->disk);
 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
 		(unsigned long long) rbd_dev->mapping.size);
@@ -5173,8 +5162,6 @@ err_out_disk:
 err_out_blkdev:
 	if (!single_major)
 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_id:
-	rbd_dev_id_put(rbd_dev);
 err_out_unlock:
 	up_write(&rbd_dev->header_rwsem);
 	return ret;
@@ -5406,12 +5393,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
 	rbd_free_disk(rbd_dev);
+
+	spin_lock(&rbd_dev_list_lock);
+	list_del_init(&rbd_dev->node);
+	spin_unlock(&rbd_dev_list_lock);
+
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	device_del(&rbd_dev->dev);
 	rbd_dev_mapping_clear(rbd_dev);
 	if (!single_major)
 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-	rbd_dev_id_put(rbd_dev);
 }
 
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 09/16] rbd: retry watch re-registration periodically
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (7 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 08/16] rbd: introduce a per-device ordered workqueue Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 10/16] rbd: support for exclusive-lock feature Ilya Dryomov
                   ` (7 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Revamp watch code to support retrying watch re-registration:

- add rbd_dev->watch_state for more robust errcb handling
- store watch cookie separately to avoid dereferencing watch_handle
  which is set to NULL on unwatch
- move re-register code into a delayed work and retry re-registration
  every second, unless the client is blacklisted

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c   | 138 +++++++++++++++++++++++++++++++++++++++-----------
 net/ceph/osd_client.c |   1 +
 2 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1c805eea6767..cb96fb19e8a7 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
+
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING	(1<<0)
@@ -319,6 +321,12 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
+enum rbd_watch_state {
+	RBD_WATCH_STATE_UNREGISTERED,
+	RBD_WATCH_STATE_REGISTERED,
+	RBD_WATCH_STATE_ERROR,
+};
+
 struct rbd_mapping {
 	u64                     size;
 	u64                     features;
@@ -352,7 +360,11 @@ struct rbd_device {
 
 	struct ceph_file_layout	layout;		/* used for all rbd requests */
 
+	struct mutex		watch_mutex;
+	enum rbd_watch_state	watch_state;
 	struct ceph_osd_linger_request *watch_handle;
+	u64			watch_cookie;
+	struct delayed_work	watch_dwork;
 
 	struct workqueue_struct	*task_wq;
 
@@ -3083,9 +3095,6 @@ out_err:
 	obj_request_done_set(obj_request);
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
-
 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 			 u64 notifier_id, void *data, size_t data_len)
 {
@@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
+
 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
 	struct rbd_device *rbd_dev = arg;
-	int ret;
 
 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
+		__rbd_unregister_watch(rbd_dev);
+		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
 
-	ret = rbd_dev_header_watch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-		return;
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
 	}
-
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+	mutex_unlock(&rbd_dev->watch_mutex);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * watch_mutex must be locked
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static int __rbd_register_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_linger_request *handle;
 
 	rbd_assert(!rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
 				 &rbd_dev->header_oloc, rbd_watch_cb,
@@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	return 0;
 }
 
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+/*
+ * watch_mutex must be locked
+ */
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	int ret;
 
-	if (!rbd_dev->watch_handle)
-		return;
+	rbd_assert(rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
 	if (ret)
@@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 	rbd_dev->watch_handle = NULL;
 }
 
-/*
- * Tear down a watch request, synchronously.
- */
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_register_watch(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret)
+		goto out;
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+
+out:
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return ret;
+}
+
+static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 {
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+}
+
+static void rbd_unregister_watch(struct rbd_device *rbd_dev)
+{
+	cancel_tasks_sync(rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
+		__rbd_unregister_watch(rbd_dev);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	mutex_unlock(&rbd_dev->watch_mutex);
 
-	dout("%s flushing notifies\n", __func__);
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+static void rbd_reregister_watch(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+					    struct rbd_device, watch_dwork);
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
+		goto fail_unlock;
+
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+		if (ret != -EBLACKLISTED)
+			queue_delayed_work(rbd_dev->task_wq,
+					   &rbd_dev->watch_dwork,
+					   RBD_RETRY_DELAY);
+		goto fail_unlock;
+	}
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+	mutex_unlock(&rbd_dev->watch_mutex);
+
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+
+	return;
+
+fail_unlock:
+	mutex_unlock(&rbd_dev->watch_mutex);
+}
+
 /*
  * Synchronous osd object method call.  Returns the number of bytes
  * returned in the outbound buffer, or a negative error code.
@@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref)
 
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
+	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
 
@@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	ceph_oid_init(&rbd_dev->header_oid);
 	ceph_oloc_init(&rbd_dev->header_oloc);
 
+	mutex_init(&rbd_dev->watch_mutex);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
@@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 		goto err_out_format;
 
 	if (!depth) {
-		ret = rbd_dev_header_watch_sync(rbd_dev);
+		ret = rbd_register_watch(rbd_dev);
 		if (ret) {
 			if (ret == -ENOENT)
 				pr_info("image %s/%s does not exist\n",
@@ -5281,7 +5361,7 @@ err_out_probe:
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
 	if (!depth)
-		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_unregister_watch(rbd_dev);
 err_out_format:
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
@@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (rc) {
 		/*
-		 * rbd_dev_header_unwatch_sync() can't be moved into
+		 * rbd_unregister_watch() can't be moved into
 		 * rbd_dev_image_release() without refactoring, see
 		 * commit 1f3ef78861ac.
 		 */
-		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_unregister_watch(rbd_dev);
 		rbd_dev_image_release(rbd_dev);
 		goto out;
 	}
@@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	if (ret < 0 || already)
 		return ret;
 
-	rbd_dev_header_unwatch_sync(rbd_dev);
+	rbd_unregister_watch(rbd_dev);
 
 	/*
 	 * Don't free anything from rbd_dev->disk until after all
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index fbc6b7090c65..d9bf7a1d0a58 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -4014,6 +4014,7 @@ EXPORT_SYMBOL(ceph_osdc_list_watchers);
  */
 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
 {
+	dout("%s osdc %p\n", __func__, osdc);
 	flush_workqueue(osdc->notify_wq);
 }
 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 10/16] rbd: support for exclusive-lock feature
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (8 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 09/16] rbd: retry watch re-registration periodically Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 11/16] rbd: print capacity in decimal and features in hex Ilya Dryomov
                   ` (6 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Add basic support for RBD_FEATURE_EXCLUSIVE_LOCK feature.  Maintenance
operations (resize, snapshot create, etc) are offloaded to librbd via
returning -EOPNOTSUPP - librbd should request the lock and execute the
operation.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c       | 812 +++++++++++++++++++++++++++++++++++++++++++++-
 drivers/block/rbd_types.h |  11 +
 net/ceph/ceph_strings.c   |   1 +
 3 files changed, 808 insertions(+), 16 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index cb96fb19e8a7..7cda1cc60c2c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
+#include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -114,14 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v)
 
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
 #define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
 
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING	(1<<0)
 #define RBD_FEATURE_STRIPINGV2	(1<<1)
-#define RBD_FEATURES_ALL \
-	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
+				 RBD_FEATURE_STRIPINGV2 |	\
+				 RBD_FEATURE_EXCLUSIVE_LOCK)
 
 /* Features supported by this (client software) implementation. */
 
@@ -327,6 +331,18 @@ enum rbd_watch_state {
 	RBD_WATCH_STATE_ERROR,
 };
 
+enum rbd_lock_state {
+	RBD_LOCK_STATE_UNLOCKED,
+	RBD_LOCK_STATE_LOCKED,
+	RBD_LOCK_STATE_RELEASING,
+};
+
+/* WatchNotify::ClientId */
+struct rbd_client_id {
+	u64 gid;
+	u64 handle;
+};
+
 struct rbd_mapping {
 	u64                     size;
 	u64                     features;
@@ -366,6 +382,15 @@ struct rbd_device {
 	u64			watch_cookie;
 	struct delayed_work	watch_dwork;
 
+	struct rw_semaphore	lock_rwsem;
+	enum rbd_lock_state	lock_state;
+	struct rbd_client_id	owner_cid;
+	struct work_struct	acquired_lock_work;
+	struct work_struct	released_lock_work;
+	struct delayed_work	lock_dwork;
+	struct work_struct	unlock_work;
+	wait_queue_head_t	lock_waitq;
+
 	struct workqueue_struct	*task_wq;
 
 	struct rbd_spec		*parent_spec;
@@ -450,6 +475,29 @@ static int minor_to_rbd_dev_id(int minor)
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
+static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
+{
+	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
+	       !rbd_dev->mapping.read_only;
+}
+
+static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
+	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
+}
+
+static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	bool is_lock_owner;
+
+	down_read(&rbd_dev->lock_rwsem);
+	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
+	up_read(&rbd_dev->lock_rwsem);
+	return is_lock_owner;
+}
+
 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
@@ -3095,31 +3143,690 @@ out_err:
 	obj_request_done_set(obj_request);
 }
 
-static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
-			 u64 notifier_id, void *data, size_t data_len)
+static const struct rbd_client_id rbd_empty_cid;
+
+static bool rbd_cid_equal(const struct rbd_client_id *lhs,
+			  const struct rbd_client_id *rhs)
+{
+	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
+}
+
+static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
+{
+	struct rbd_client_id cid;
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
+	cid.handle = rbd_dev->watch_cookie;
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return cid;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
+			      const struct rbd_client_id *cid)
+{
+	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
+	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
+	     cid->gid, cid->handle);
+	rbd_dev->owner_cid = *cid; /* struct */
+}
+
+static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
+{
+	mutex_lock(&rbd_dev->watch_mutex);
+	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
+	mutex_unlock(&rbd_dev->watch_mutex);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	char cookie[32];
+	int ret;
+
+	WARN_ON(__rbd_is_lock_owner(rbd_dev));
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
+			    RBD_LOCK_TAG, "", 0);
+	if (ret)
+		return ret;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+	rbd_set_owner_cid(rbd_dev, &cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
+	return 0;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_unlock(struct rbd_device *rbd_dev)
 {
-	struct rbd_device *rbd_dev = arg;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	char cookie[32];
 	int ret;
 
-	dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
-	     cookie, notify_id);
+	WARN_ON(!__rbd_is_lock_owner(rbd_dev));
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			      RBD_LOCK_NAME, cookie);
+	if (ret && ret != -ENOENT) {
+		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
+		return ret;
+	}
+
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
+	return 0;
+}
+
+static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
+				enum rbd_notify_op notify_op,
+				struct page ***preply_pages,
+				size_t *preply_len)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	void *p = buf;
+
+	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
+
+	/* encode *LockPayload NotifyMessage (op + ClientId) */
+	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_32(&p, notify_op);
+	ceph_encode_64(&p, cid.gid);
+	ceph_encode_64(&p, cid.handle);
+
+	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
+				&rbd_dev->header_oloc, buf, buf_size,
+				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+}
+
+static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
+			       enum rbd_notify_op notify_op)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+
+	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+}
+
+static void rbd_notify_acquired_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  acquired_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
+}
+
+static void rbd_notify_released_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  released_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
+}
+
+static int rbd_request_lock(struct rbd_device *rbd_dev)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+	bool lock_owner_responded = false;
+	int ret;
 
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+				   &reply_pages, &reply_len);
+	if (ret && ret != -ETIMEDOUT) {
+		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
+		goto out;
+	}
+
+	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+		void *p = page_address(reply_pages[0]);
+		void *const end = p + reply_len;
+		u32 n;
+
+		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+		while (n--) {
+			u8 struct_v;
+			u32 len;
+
+			ceph_decode_need(&p, end, 8 + 8, e_inval);
+			p += 8 + 8; /* skip gid and cookie */
+
+			ceph_decode_32_safe(&p, end, len, e_inval);
+			if (!len)
+				continue;
+
+			if (lock_owner_responded) {
+				rbd_warn(rbd_dev,
+					 "duplicate lock owners detected");
+				ret = -EIO;
+				goto out;
+			}
+
+			lock_owner_responded = true;
+			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+						  &struct_v, &len);
+			if (ret) {
+				rbd_warn(rbd_dev,
+					 "failed to decode ResponseMessage: %d",
+					 ret);
+				goto e_inval;
+			}
+
+			ret = ceph_decode_32(&p);
+		}
+	}
+
+	if (!lock_owner_responded) {
+		rbd_warn(rbd_dev, "no lock owners detected");
+		ret = -ETIMEDOUT;
+	}
+
+out:
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+{
+	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+
+	cancel_delayed_work(&rbd_dev->lock_dwork);
+	if (wake_all)
+		wake_up_all(&rbd_dev->lock_waitq);
+	else
+		wake_up(&rbd_dev->lock_waitq);
+}
+
+static int get_lock_owner_info(struct rbd_device *rbd_dev,
+			       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	u8 lock_type;
+	char *lock_tag;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+				 &lock_type, &lock_tag, lockers, num_lockers);
+	if (ret)
+		return ret;
+
+	if (*num_lockers == 0) {
+		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+		goto out;
+	}
+
+	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+			 lock_tag);
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (lock_type == CEPH_CLS_LOCK_SHARED) {
+		rbd_warn(rbd_dev, "shared lock type detected");
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
+		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+			 (*lockers)[0].id.cookie);
+		ret = -EBUSY;
+		goto out;
+	}
+
+out:
+	kfree(lock_tag);
+	return ret;
+}
+
+static int find_watcher(struct rbd_device *rbd_dev,
+			const struct ceph_locker *locker)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_watch_item *watchers;
+	u32 num_watchers;
+	u64 cookie;
+	int i;
+	int ret;
+
+	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+				      &rbd_dev->header_oloc, &watchers,
+				      &num_watchers);
+	if (ret)
+		return ret;
+
+	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+	for (i = 0; i < num_watchers; i++) {
+		if (!memcmp(&watchers[i].addr, &locker->info.addr,
+			    sizeof(locker->info.addr)) &&
+		    watchers[i].cookie == cookie) {
+			struct rbd_client_id cid = {
+				.gid = le64_to_cpu(watchers[i].name.num),
+				.handle = cookie,
+			};
+
+			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+			     rbd_dev, cid.gid, cid.handle);
+			rbd_set_owner_cid(rbd_dev, &cid);
+			ret = 1;
+			goto out;
+		}
+	}
+
+	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+	ret = 0;
+out:
+	kfree(watchers);
+	return ret;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_try_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_client *client = rbd_dev->rbd_client->client;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
+	int ret;
+
+	for (;;) {
+		ret = rbd_lock(rbd_dev);
+		if (ret != -EBUSY)
+			return ret;
+
+		/* determine if the current lock holder is still alive */
+		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+		if (ret)
+			return ret;
+
+		if (num_lockers == 0)
+			goto again;
+
+		ret = find_watcher(rbd_dev, lockers);
+		if (ret) {
+			if (ret > 0)
+				ret = 0; /* have to request lock */
+			goto out;
+		}
+
+		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+			 ENTITY_NAME(lockers[0].id.name));
+
+		ret = ceph_monc_blacklist_add(&client->monc,
+					      &lockers[0].info.addr);
+		if (ret) {
+			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+				 ENTITY_NAME(lockers[0].id.name), ret);
+			goto out;
+		}
+
+		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+					  lockers[0].id.cookie,
+					  &lockers[0].id.name);
+		if (ret && ret != -ENOENT)
+			goto out;
+
+again:
+		ceph_free_lockers(lockers, num_lockers);
+	}
+
+out:
+	ceph_free_lockers(lockers, num_lockers);
+	return ret;
+}
+
+/*
+ * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ */
+static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+						int *pret)
+{
+	enum rbd_lock_state lock_state;
+
+	down_read(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		lock_state = rbd_dev->lock_state;
+		up_read(&rbd_dev->lock_rwsem);
+		return lock_state;
+	}
+
+	up_read(&rbd_dev->lock_rwsem);
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (!__rbd_is_lock_owner(rbd_dev)) {
+		*pret = rbd_try_lock(rbd_dev);
+		if (*pret)
+			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+	}
+
+	lock_state = rbd_dev->lock_state;
+	up_write(&rbd_dev->lock_rwsem);
+	return lock_state;
+}
+
+static void rbd_acquire_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+					    struct rbd_device, lock_dwork);
+	enum rbd_lock_state lock_state;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+again:
+	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+		if (lock_state == RBD_LOCK_STATE_LOCKED)
+			wake_requests(rbd_dev, true);
+		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+		     rbd_dev, lock_state, ret);
+		return;
+	}
+
+	ret = rbd_request_lock(rbd_dev);
+	if (ret == -ETIMEDOUT) {
+		goto again; /* treat this as a dead client */
+	} else if (ret < 0) {
+		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+				 RBD_RETRY_DELAY);
+	} else {
+		/*
+		 * lock owner acked, but resend if we don't see them
+		 * release the lock
+		 */
+		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+		     rbd_dev);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+	}
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static bool rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+		return false;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+	downgrade_write(&rbd_dev->lock_rwsem);
 	/*
-	 * Until adequate refresh error handling is in place, there is
-	 * not much we can do here, except warn.
+	 * Ensure that all in-flight IO is flushed.
 	 *
-	 * See http://tracker.ceph.com/issues/5040
+	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
+	 * may be shared with other devices.
 	 */
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+	up_read(&rbd_dev->lock_rwsem);
+
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
+		return false;
+
+	if (!rbd_unlock(rbd_dev))
+		/*
+		 * Give others a chance to grab the lock - we would re-acquire
+		 * almost immediately if we got new IO during ceph_osdc_sync()
+		 * otherwise.  We need to ack our own notifications, so this
+		 * lock_dwork will be requeued from rbd_wait_state_locked()
+		 * after wake_requests() in rbd_handle_released_lock().
+		 */
+		cancel_delayed_work(&rbd_dev->lock_dwork);
+
+	return true;
+}
+
+static void rbd_release_lock_work(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  unlock_work);
+
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_release_lock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			/*
+			 * we already know that the remote client is
+			 * the owner
+			 */
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+			     __func__, rbd_dev, cid.gid, cid.handle,
+			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
+
+static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				    void **p)
+{
+	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
+	struct rbd_client_id cid = { 0 };
+	bool need_to_send;
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (rbd_cid_equal(&cid, &my_cid))
+		return false;
+
+	down_read(&rbd_dev->lock_rwsem);
+	need_to_send = __rbd_is_lock_owner(rbd_dev);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
+			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
+			     rbd_dev);
+			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+		}
+	}
+	up_read(&rbd_dev->lock_rwsem);
+	return need_to_send;
+}
+
+static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
+				     u64 notify_id, u64 cookie, s32 *result)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	int ret;
+
+	if (result) {
+		void *p = buf;
+
+		/* encode ResponseMessage */
+		ceph_start_encoding(&p, 1, 1,
+				    buf_size - CEPH_ENCODING_START_BLK_LEN);
+		ceph_encode_32(&p, *result);
+	} else {
+		buf_size = 0;
+	}
 
 	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
 				   &rbd_dev->header_oloc, notify_id, cookie,
-				   NULL, 0);
+				   buf, buf_size);
 	if (ret)
-		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
+}
+
+static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
+				   u64 cookie)
+{
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
+}
+
+static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
+					  u64 notify_id, u64 cookie, s32 result)
+{
+	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
+}
+
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
+{
+	struct rbd_device *rbd_dev = arg;
+	void *p = data;
+	void *const end = p + data_len;
+	u8 struct_v;
+	u32 len;
+	u32 notify_op;
+	int ret;
+
+	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
+	     __func__, rbd_dev, cookie, notify_id, data_len);
+	if (data_len) {
+		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
+					  &struct_v, &len);
+		if (ret) {
+			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
+				 ret);
+			return;
+		}
+
+		notify_op = ceph_decode_32(&p);
+	} else {
+		/* legacy notification for header updates */
+		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
+		len = 0;
+	}
+
+	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
+	switch (notify_op) {
+	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
+		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_RELEASED_LOCK:
+		rbd_handle_released_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_REQUEST_LOCK:
+		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
+			/*
+			 * send ResponseMessage(0) back so the client
+			 * can detect a missing owner
+			 */
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, 0);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_HEADER_UPDATE:
+		ret = rbd_dev_refresh(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "refresh failed: %d", ret);
+
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	default:
+		if (rbd_is_lock_owner(rbd_dev))
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, -EOPNOTSUPP);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	}
 }
 
 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
@@ -3130,6 +3837,10 @@ static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 
 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	up_write(&rbd_dev->lock_rwsem);
+
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
 		__rbd_unregister_watch(rbd_dev);
@@ -3202,10 +3913,15 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+	cancel_work_sync(&rbd_dev->acquired_lock_work);
+	cancel_work_sync(&rbd_dev->released_lock_work);
+	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+	cancel_work_sync(&rbd_dev->unlock_work);
 }
 
 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
+	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
 	cancel_tasks_sync(rbd_dev);
 
 	mutex_lock(&rbd_dev->watch_mutex);
@@ -3221,10 +3937,15 @@ static void rbd_reregister_watch(struct work_struct *work)
 {
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 					    struct rbd_device, watch_dwork);
+	bool was_lock_owner = false;
 	int ret;
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+		was_lock_owner = rbd_release_lock(rbd_dev);
+
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
 		goto fail_unlock;
@@ -3247,10 +3968,20 @@ static void rbd_reregister_watch(struct work_struct *work)
 	if (ret)
 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
 
+	if (was_lock_owner) {
+		ret = rbd_try_lock(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "reregisteration lock failed: %d",
+				 ret);
+	}
+
+	up_write(&rbd_dev->lock_rwsem);
+	wake_requests(rbd_dev, true);
 	return;
 
 fail_unlock:
 	mutex_unlock(&rbd_dev->watch_mutex);
+	up_write(&rbd_dev->lock_rwsem);
 }
 
 /*
@@ -3340,6 +4071,29 @@ out:
 	return ret;
 }
 
+/*
+ * lock_rwsem must be held for read
+ */
+static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
+{
+	DEFINE_WAIT(wait);
+
+	do {
+		/*
+		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
+		 * and cancel_delayed_work() in wake_requests().
+		 */
+		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
+					  TASK_UNINTERRUPTIBLE);
+		up_read(&rbd_dev->lock_rwsem);
+		schedule();
+		down_read(&rbd_dev->lock_rwsem);
+	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+	finish_wait(&rbd_dev->lock_waitq, &wait);
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3350,6 +4104,7 @@ static void rbd_queue_workfn(struct work_struct *work)
 	u64 length = blk_rq_bytes(rq);
 	enum obj_operation_type op_type;
 	u64 mapping_size;
+	bool must_be_locked = false;
 	int result;
 
 	if (rq->cmd_type != REQ_TYPE_FS) {
@@ -3411,6 +4166,7 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (op_type != OBJ_OP_READ) {
 		snapc = rbd_dev->header.snapc;
 		ceph_get_snap_context(snapc);
+		must_be_locked = rbd_is_lock_supported(rbd_dev);
 	}
 	up_read(&rbd_dev->header_rwsem);
 
@@ -3421,11 +4177,17 @@ static void rbd_queue_workfn(struct work_struct *work)
 		goto err_rq;
 	}
 
+	if (must_be_locked) {
+		down_read(&rbd_dev->lock_rwsem);
+		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+			rbd_wait_state_locked(rbd_dev);
+	}
+
 	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
 					     snapc);
 	if (!img_request) {
 		result = -ENOMEM;
-		goto err_rq;
+		goto err_unlock;
 	}
 	img_request->rq = rq;
 	snapc = NULL; /* img_request consumes a ref */
@@ -3443,10 +4205,15 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (result)
 		goto err_img_request;
 
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 	return;
 
 err_img_request:
 	rbd_img_request_put(img_request);
+err_unlock:
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 err_rq:
 	if (result)
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -4020,6 +4787,7 @@ static void rbd_spec_free(struct kref *kref)
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
 	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
 
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
@@ -4071,6 +4839,14 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
 	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
 
+	init_rwsem(&rbd_dev->lock_rwsem);
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
+	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
+	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
+	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+	init_waitqueue_head(&rbd_dev->lock_waitq);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
@@ -5553,6 +6329,10 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	if (ret < 0 || already)
 		return ret;
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (__rbd_is_lock_owner(rbd_dev))
+		rbd_unlock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
 	rbd_unregister_watch(rbd_dev);
 
 	/*
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 49d77cbcf8bd..94f367db27b0 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -28,6 +28,17 @@
 #define RBD_DATA_PREFIX        "rbd_data."
 #define RBD_ID_PREFIX          "rbd_id."
 
+#define RBD_LOCK_NAME          "rbd_lock"
+#define RBD_LOCK_TAG           "internal"
+#define RBD_LOCK_COOKIE_PREFIX "auto"
+
+enum rbd_notify_op {
+	RBD_NOTIFY_OP_ACQUIRED_LOCK      = 0,
+	RBD_NOTIFY_OP_RELEASED_LOCK      = 1,
+	RBD_NOTIFY_OP_REQUEST_LOCK       = 2,
+	RBD_NOTIFY_OP_HEADER_UPDATE      = 3,
+};
+
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd		- image metadata
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3773a4fa11e3..19b7d8aa915c 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type)
 	default: return "unknown";
 	}
 }
+EXPORT_SYMBOL(ceph_entity_type_name);
 
 const char *ceph_osd_op_name(int op)
 {
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 11/16] rbd: print capacity in decimal and features in hex
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (9 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 10/16] rbd: support for exclusive-lock feature Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 12/16] rbd: add 'client_addr' sysfs rbd device attribute Ilya Dryomov
                   ` (5 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

With exclusive-lock added and more to come, print features into dmesg.
Change capacity to decimal while at it.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 7cda1cc60c2c..fd1a9891b348 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -6006,8 +6006,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	spin_unlock(&rbd_dev_list_lock);
 
 	add_disk(rbd_dev->disk);
-	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
-		(unsigned long long) rbd_dev->mapping.size);
+	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+		rbd_dev->header.features);
 
 	return ret;
 
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 12/16] rbd: add 'client_addr' sysfs rbd device attribute
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (10 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 11/16] rbd: print capacity in decimal and features in hex Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 13/16] rbd: add 'cluster_fsid' " Ilya Dryomov
                   ` (4 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

Export client addr/nonce, so userspace can check if a image is being
blacklisted.

Signed-off-by: Mike Christie <mchristi@redhat.com>
[idryomov@gmail.com: ceph_client_addr(), endianess fix]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 Documentation/ABI/testing/sysfs-bus-rbd |  6 ++++++
 drivers/block/rbd.c                     | 13 +++++++++++++
 include/linux/ceph/libceph.h            |  1 +
 net/ceph/ceph_common.c                  |  6 ++++++
 4 files changed, 26 insertions(+)

diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 2ddd680929d8..273e27f2491f 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -43,6 +43,12 @@ Description:	Available only if rbd module is inserted with single_major
 Entries under /sys/bus/rbd/devices/<dev-id>/
 --------------------------------------------
 
+client_addr
+
+	The ceph unique client entity_addr_t (address + nonce).
+	The format is <address>:<port>/<nonce>: '1.2.3.4:1234/5678' or
+	'[1:2:3:4:5:6:7:8]:1234/5678'.  (August 2016, since 4.9.)
+
 client_id
 
 	The ceph unique client id that was assigned for this specific session.
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fd1a9891b348..69d76c3afcdd 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4592,6 +4592,17 @@ static ssize_t rbd_minor_show(struct device *dev,
 	return sprintf(buf, "%d\n", rbd_dev->minor);
 }
 
+static ssize_t rbd_client_addr_show(struct device *dev,
+				    struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	struct ceph_entity_addr *client_addr =
+	    ceph_client_addr(rbd_dev->rbd_client->client);
+
+	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
+		       le32_to_cpu(client_addr->nonce));
+}
+
 static ssize_t rbd_client_id_show(struct device *dev,
 				  struct device_attribute *attr, char *buf)
 {
@@ -4702,6 +4713,7 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
+static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
@@ -4716,6 +4728,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_features.attr,
 	&dev_attr_major.attr,
 	&dev_attr_minor.attr,
+	&dev_attr_client_addr.attr,
 	&dev_attr_client_id.attr,
 	&dev_attr_pool.attr,
 	&dev_attr_pool_id.attr,
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index b4cffff70e44..1816c5e26581 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -264,6 +264,7 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
 					      void *private,
 					      u64 supported_features,
 					      u64 required_features);
+struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
 u64 ceph_client_gid(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern int __ceph_open_session(struct ceph_client *client,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 8a7921767308..464e88599b9d 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -566,6 +566,12 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
 }
 EXPORT_SYMBOL(ceph_print_client_options);
 
+struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client)
+{
+	return &client->msgr.inst.addr;
+}
+EXPORT_SYMBOL(ceph_client_addr);
+
 u64 ceph_client_gid(struct ceph_client *client)
 {
 	return client->monc.auth->global_id;
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 13/16] rbd: add 'cluster_fsid' sysfs rbd device attribute
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (11 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 12/16] rbd: add 'client_addr' sysfs rbd device attribute Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 14/16] rbd: add 'snap_id' " Ilya Dryomov
                   ` (3 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <mchristi@redhat.com>

Export the cluster fsid, so tools like udev and multipath-tools can use
it for part of the uuid.

Signed-off-by: Mike Christie <mchristi@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 Documentation/ABI/testing/sysfs-bus-rbd |  4 ++++
 drivers/block/rbd.c                     | 10 ++++++++++
 2 files changed, 14 insertions(+)

diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 273e27f2491f..9a655f3f4386 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -53,6 +53,10 @@ client_id
 
 	The ceph unique client id that was assigned for this specific session.
 
+cluster_fsid
+
+	The ceph cluster UUID.  (August 2016, since 4.9.)
+
 features
 
 	A hexadecimal encoding of the feature bits for this image.
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 69d76c3afcdd..c95104a80065 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4612,6 +4612,14 @@ static ssize_t rbd_client_id_show(struct device *dev,
 		       ceph_client_gid(rbd_dev->rbd_client->client));
 }
 
+static ssize_t rbd_cluster_fsid_show(struct device *dev,
+				     struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
+}
+
 static ssize_t rbd_pool_show(struct device *dev,
 			     struct device_attribute *attr, char *buf)
 {
@@ -4715,6 +4723,7 @@ static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
+static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
@@ -4730,6 +4739,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_minor.attr,
 	&dev_attr_client_addr.attr,
 	&dev_attr_client_id.attr,
+	&dev_attr_cluster_fsid.attr,
 	&dev_attr_pool.attr,
 	&dev_attr_pool_id.attr,
 	&dev_attr_name.attr,
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 14/16] rbd: add 'snap_id' sysfs rbd device attribute
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (12 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 13/16] rbd: add 'cluster_fsid' " Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 15/16] rbd: add 'config_info' " Ilya Dryomov
                   ` (2 subsequent siblings)
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <mchristi@redhat.com>

Export snap id in sysfs, so tools like multipathd can use it in a uuid.

Signed-off-by: Mike Christie <mchristi@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 Documentation/ABI/testing/sysfs-bus-rbd |  4 ++++
 drivers/block/rbd.c                     | 10 ++++++++++
 2 files changed, 14 insertions(+)

diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 9a655f3f4386..7bf8d4fa6f63 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -102,6 +102,10 @@ current_snap
 
 	The current snapshot for which the device is mapped.
 
+snap_id
+
+	The current snapshot's id.  (August 2016, since 4.9.)
+
 parent
 
 	Information identifying the chain of parent images in a layered rbd
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c95104a80065..36ebec19dc20 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4669,6 +4669,14 @@ static ssize_t rbd_snap_show(struct device *dev,
 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
 }
 
+static ssize_t rbd_snap_id_show(struct device *dev,
+				struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
+}
+
 /*
  * For a v2 image, shows the chain of parent images, separated by empty
  * lines.  For v1 images or if there is no parent, shows "(no parent
@@ -4730,6 +4738,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
 
 static struct attribute *rbd_attrs[] = {
@@ -4745,6 +4754,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_name.attr,
 	&dev_attr_image_id.attr,
 	&dev_attr_current_snap.attr,
+	&dev_attr_snap_id.attr,
 	&dev_attr_parent.attr,
 	&dev_attr_refresh.attr,
 	NULL
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 15/16] rbd: add 'config_info' sysfs rbd device attribute
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (13 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 14/16] rbd: add 'snap_id' " Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 13:18 ` [PATCH 16/16] rbd: add force close option Ilya Dryomov
  2016-08-24 18:34 ` [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Mike Christie
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <mchristi@redhat.com>

Export the info used to setup the rbd image, so it can be used to remap
the image.

Signed-off-by: Mike Christie <mchristi@redhat.com>
[idryomov@gmail.com: do_rbd_add() EH]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 Documentation/ABI/testing/sysfs-bus-rbd |  5 +++++
 drivers/block/rbd.c                     | 23 +++++++++++++++++++++--
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 7bf8d4fa6f63..6dccbf82fcf4 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -57,6 +57,11 @@ cluster_fsid
 
 	The ceph cluster UUID.  (August 2016, since 4.9.)
 
+config_info
+
+	The string written into /sys/bus/rbd/add{,_single_major}.  (August
+	2016, since 4.9.)
+
 features
 
 	A hexadecimal encoding of the feature bits for this image.
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 36ebec19dc20..8ff2dc872008 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -370,6 +370,7 @@ struct rbd_device {
 	unsigned long		flags;		/* possibly lock protected */
 	struct rbd_spec		*spec;
 	struct rbd_options	*opts;
+	char			*config_info;	/* add{,_single_major} string */
 
 	struct ceph_object_id	header_oid;
 	struct ceph_object_locator header_oloc;
@@ -4620,6 +4621,14 @@ static ssize_t rbd_cluster_fsid_show(struct device *dev,
 	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
 }
 
+static ssize_t rbd_config_info_show(struct device *dev,
+				    struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%s\n", rbd_dev->config_info);
+}
+
 static ssize_t rbd_pool_show(struct device *dev,
 			     struct device_attribute *attr, char *buf)
 {
@@ -4732,6 +4741,7 @@ static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
+static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
@@ -4749,6 +4759,7 @@ static struct attribute *rbd_attrs[] = {
 	&dev_attr_client_addr.attr,
 	&dev_attr_client_id.attr,
 	&dev_attr_cluster_fsid.attr,
+	&dev_attr_config_info.attr,
 	&dev_attr_pool.attr,
 	&dev_attr_pool_id.attr,
 	&dev_attr_name.attr,
@@ -4824,6 +4835,7 @@ static void rbd_dev_free(struct rbd_device *rbd_dev)
 
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
+	kfree(rbd_dev->config_info);
 
 	rbd_put_client(rbd_dev->rbd_client);
 	rbd_spec_put(rbd_dev->spec);
@@ -6223,10 +6235,18 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	spec = NULL;		/* rbd_dev now owns this */
 	rbd_opts = NULL;	/* rbd_dev now owns this */
 
+	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
+	if (!rbd_dev->config_info) {
+		rc = -ENOMEM;
+		goto err_out_rbd_dev;
+	}
+
 	down_write(&rbd_dev->header_rwsem);
 	rc = rbd_dev_image_probe(rbd_dev, 0);
-	if (rc < 0)
+	if (rc < 0) {
+		up_write(&rbd_dev->header_rwsem);
 		goto err_out_rbd_dev;
+	}
 
 	/* If we are mapping a snapshot it must be marked read-only */
 
@@ -6253,7 +6273,6 @@ out:
 	return rc;
 
 err_out_rbd_dev:
-	up_write(&rbd_dev->header_rwsem);
 	rbd_dev_destroy(rbd_dev);
 err_out_client:
 	rbd_put_client(rbdc);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 16/16] rbd: add force close option
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (14 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 15/16] rbd: add 'config_info' " Ilya Dryomov
@ 2016-08-24 13:18 ` Ilya Dryomov
  2016-08-24 18:34 ` [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Mike Christie
  16 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 13:18 UTC (permalink / raw)
  To: ceph-devel

From: Mike Christie <mchristi@redhat.com>

This adds a force close option, so we can force the unmapping
of a rbd device that is open. If a path/device is blacklisted, apps
like multipathd can map a new device and then unmap the old one.
The unmapping cleanup would then be handled by the generic hotunplug
code paths in multipahd like is done for iSCSI, FC/FCOE, SAS, etc.

Signed-off-by: Mike Christie <mchristi@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 Documentation/ABI/testing/sysfs-bus-rbd | 10 +++++++---
 drivers/block/rbd.c                     | 35 ++++++++++++++++++++++++---------
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 6dccbf82fcf4..f208ac58d613 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -6,7 +6,7 @@ Description:
 
 Being used for adding and removing rbd block devices.
 
-Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
+Usage: <mon ip addr> <options> <pool name> <rbd image name> [<snap name>]
 
  $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 
@@ -14,9 +14,13 @@ The snapshot name can be "-" or omitted to map the image read/write. A <dev-id>
 will be assigned for any registered block device. If snapshot is used, it will
 be mapped read-only.
 
-Removal of a device:
+Usage: <dev-id> [force]
 
-  $ echo <dev-id> > /sys/bus/rbd/remove
+ $ echo 2 > /sys/bus/rbd/remove
+
+Optional "force" argument which when passed will wait for running requests and
+then unmap the image. Requests sent to the driver after initiating the removal
+will be failed.  (August 2016, since 4.9.)
 
 What:		/sys/bus/rbd/add_single_major
 Date:		December 2013
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8ff2dc872008..35fc1da6c83d 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -6347,18 +6347,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	struct rbd_device *rbd_dev = NULL;
 	struct list_head *tmp;
 	int dev_id;
-	unsigned long ul;
+	char opt_buf[6];
 	bool already = false;
+	bool force = false;
 	int ret;
 
-	ret = kstrtoul(buf, 10, &ul);
-	if (ret)
-		return ret;
-
-	/* convert to int; abort if we lost anything in the conversion */
-	dev_id = (int)ul;
-	if (dev_id != ul)
+	dev_id = -1;
+	opt_buf[0] = '\0';
+	sscanf(buf, "%d %5s", &dev_id, opt_buf);
+	if (dev_id < 0) {
+		pr_err("dev_id out of range\n");
 		return -EINVAL;
+	}
+	if (opt_buf[0] != '\0') {
+		if (!strcmp(opt_buf, "force")) {
+			force = true;
+		} else {
+			pr_err("bad remove option at '%s'\n", opt_buf);
+			return -EINVAL;
+		}
+	}
 
 	ret = -ENOENT;
 	spin_lock(&rbd_dev_list_lock);
@@ -6371,7 +6379,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	}
 	if (!ret) {
 		spin_lock_irq(&rbd_dev->lock);
-		if (rbd_dev->open_count)
+		if (rbd_dev->open_count && !force)
 			ret = -EBUSY;
 		else
 			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
@@ -6382,6 +6390,15 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	if (ret < 0 || already)
 		return ret;
 
+	if (force) {
+		/*
+		 * Prevent new IO from being queued and wait for existing
+		 * IO to complete/fail.
+		 */
+		blk_mq_freeze_queue(rbd_dev->disk->queue);
+		blk_set_queue_dying(rbd_dev->disk->queue);
+	}
+
 	down_write(&rbd_dev->lock_rwsem);
 	if (__rbd_is_lock_owner(rbd_dev))
 		rbd_unlock(rbd_dev);
-- 
2.4.3


^ permalink raw reply related	[flat|nested] 29+ messages in thread

* Re: [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits
  2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
                   ` (15 preceding siblings ...)
  2016-08-24 13:18 ` [PATCH 16/16] rbd: add force close option Ilya Dryomov
@ 2016-08-24 18:34 ` Mike Christie
  16 siblings, 0 replies; 29+ messages in thread
From: Mike Christie @ 2016-08-24 18:34 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> Hello,
> 
> This series already has a general thumbs up from Mike and an ack on
> sysfs interface additions from Sage, but, as wip-exclusive-lock branch
> has been through quite a few rebases recently, I wanted to send what
> hopefully is the final version for a wider review.
> 
> Thanks,
> 
>                 Ilya
> 
> 
> Douglas Fuller (5):
>   libceph: support for CEPH_OSD_OP_LIST_WATCHERS
>   libceph: add ceph_osdc_call() single-page helper
>   libceph: support for advisory locking on RADOS objects
>   libceph: support for lock.lock_info
>   libceph: support for blacklisting clients
> 
> Ilya Dryomov (7):
>   libceph: rename ceph_entity_name_encode() ->
>     ceph_auth_entity_name_encode()
>   libceph: rename ceph_client_id() -> ceph_client_gid()
>   rbd: introduce a per-device ordered workqueue
>   rbd: retry watch re-registration periodically
>   rbd: support for exclusive-lock feature
>   rbd: print capacity in decimal and features in hex
>   rbd: add 'client_addr' sysfs rbd device attribute
> 
> Mike Christie (4):
>   rbd: add 'cluster_fsid' sysfs rbd device attribute
>   rbd: add 'snap_id' sysfs rbd device attribute
>   rbd: add 'config_info' sysfs rbd device attribute
>   rbd: add force close option
> 

Reviewed-by and Tested-by: Mike Christie <mchristi@redhat.com>



^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode()
  2016-08-24 13:18 ` [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode() Ilya Dryomov
@ 2016-08-24 18:49   ` Alex Elder
  2016-08-24 20:15     ` Ilya Dryomov
  0 siblings, 1 reply; 29+ messages in thread
From: Alex Elder @ 2016-08-24 18:49 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> Clear up EntityName vs entity_name_t confusion.

If I understand what your code does, you're using
"auth_entity_name" to represent whatever "EntityName"
is.  (At this point I'm just looking at the code; I
haven't gone to look at what an entity_name_t or
EntitnyName are...)

In any case, this looks good to me.

Reviewed-by: Alex Elder <elder@linaro.org>

> 
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/auth.h | 2 +-
>  net/ceph/auth.c           | 7 +++++--
>  net/ceph/auth_none.c      | 2 +-
>  3 files changed, 7 insertions(+), 4 deletions(-)
> 
> diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
> index 1563265d2097..374bb1c4ef52 100644
> --- a/include/linux/ceph/auth.h
> +++ b/include/linux/ceph/auth.h
> @@ -104,7 +104,7 @@ extern int ceph_auth_build_hello(struct ceph_auth_client *ac,
>  extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
>  				  void *buf, size_t len,
>  				  void *reply_buf, size_t reply_len);
> -extern int ceph_entity_name_encode(const char *name, void **p, void *end);
> +int ceph_auth_entity_name_encode(const char *name, void **p, void *end);
>  
>  extern int ceph_build_auth(struct ceph_auth_client *ac,
>  		    void *msg_buf, size_t msg_len);
> diff --git a/net/ceph/auth.c b/net/ceph/auth.c
> index 2bc5965fdd1e..78067dda9d3c 100644
> --- a/net/ceph/auth.c
> +++ b/net/ceph/auth.c
> @@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac)
>  	mutex_unlock(&ac->mutex);
>  }
>  
> -int ceph_entity_name_encode(const char *name, void **p, void *end)
> +/*
> + * EntityName, not to be confused with entity_name_t
> + */
> +int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
>  {
>  	int len = strlen(name);
>  
> @@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
>  	for (i = 0; i < num; i++)
>  		ceph_encode_32(&p, supported_protocols[i]);
>  
> -	ret = ceph_entity_name_encode(ac->name, &p, end);
> +	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
>  	if (ret < 0)
>  		goto out;
>  	ceph_decode_need(&p, end, sizeof(u64), bad);
> diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
> index 5f836f02ae36..df45e467c81f 100644
> --- a/net/ceph/auth_none.c
> +++ b/net/ceph/auth_none.c
> @@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
>  	int ret;
>  
>  	ceph_encode_8_safe(&p, end, 1, e_range);
> -	ret = ceph_entity_name_encode(ac->name, &p, end);
> +	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
>  	if (ret < 0)
>  		return ret;
>  
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS
  2016-08-24 13:18 ` [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS Ilya Dryomov
@ 2016-08-24 19:29   ` Alex Elder
  2016-08-24 20:43     ` Ilya Dryomov
  0 siblings, 1 reply; 29+ messages in thread
From: Alex Elder @ 2016-08-24 19:29 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> From: Douglas Fuller <dfuller@redhat.com>
> 
> Add support for this Ceph OSD op, needed to support the RBD exclusive
> lock feature.

It would be nice to provide a short description of the op, or
maybe a reference to something that explains what it is for.

I have a couple comments below.  I'm continuing to review the
code that I see without having a solid knowledge of how
things are supposed to work and what the other end does.

I do have suggestions, but I don't really see anything
wrong with this code.

Reviewed-by: Alex Elder <elder@linaro.org>

> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> [idryomov@gmail.com: refactor, misc fixes throughout]
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/osd_client.h |  15 +++++-
>  net/ceph/osd_client.c           | 117 ++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 131 insertions(+), 1 deletion(-)
> 
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 858932304260..19821a191732 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -121,6 +121,9 @@ struct ceph_osd_req_op {
>  			struct ceph_osd_data response_data;
>  		} notify;
>  		struct {
> +			struct ceph_osd_data response_data;
> +		} list_watchers;
> +		struct {
>  			u64 expected_object_size;
>  			u64 expected_write_size;
>  		} alloc_hint;
> @@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
>  	size_t *preply_len;
>  };
>  
> +struct ceph_watch_item {
> +	struct ceph_entity_name name;
> +	u64 cookie;
> +	struct ceph_entity_addr addr;
> +};
> +
>  struct ceph_osd_client {
>  	struct ceph_client     *client;
>  
> @@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
>  					struct page **pages, u64 length,
>  					u32 alignment, bool pages_from_pool,
>  					bool own_pages);
> -
>  extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
>  					unsigned int which, u16 opcode,
>  					const char *class, const char *method);
> @@ -434,5 +442,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
>  		     size_t *preply_len);
>  int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
>  			  struct ceph_osd_linger_request *lreq);
> +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
> +			    struct ceph_object_id *oid,
> +			    struct ceph_object_locator *oloc,
> +			    struct ceph_watch_item **watchers,
> +			    u32 *num_watchers);
>  #endif
>  
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index a97e7b506612..dd51ec8ce97f 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>  		ceph_osd_data_release(&op->notify.request_data);
>  		ceph_osd_data_release(&op->notify.response_data);
>  		break;
> +	case CEPH_OSD_OP_LIST_WATCHERS:
> +		ceph_osd_data_release(&op->list_watchers.response_data);
> +		break;
>  	default:
>  		break;
>  	}
> @@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>  	case CEPH_OSD_OP_NOTIFY:
>  		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
>  		break;
> +	case CEPH_OSD_OP_LIST_WATCHERS:
> +		break;
>  	case CEPH_OSD_OP_SETALLOCHINT:
>  		dst->alloc_hint.expected_object_size =
>  		    cpu_to_le64(src->alloc_hint.expected_object_size);
> @@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
>  			ceph_osdc_msg_data_add(req->r_reply,
>  					       &op->extent.osd_data);
>  			break;
> +		case CEPH_OSD_OP_LIST_WATCHERS:
> +			ceph_osdc_msg_data_add(req->r_reply,
> +					       &op->list_watchers.response_data);
> +			break;
>  
>  		/* both */
>  		case CEPH_OSD_OP_CALL:
> @@ -3891,6 +3900,114 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
>  	return ret;
>  }
>  
> +static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
> +{
> +	u8 struct_v;
> +	u32 struct_len;
> +	int ret;
> +
> +	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	ceph_decode_copy(p, &item->name, sizeof(item->name));
> +	item->cookie = ceph_decode_64(p);
> +	*p += 4; /* skip timeout_seconds */
> +	if (struct_v >= 2) {
> +		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
> +		ceph_decode_addr(&item->addr);

ceph_decode_addr() is a little ugly in how it swaps between
wire (little-endian) byte order and native byte order in the
same underlying sockaddr_storage structure.  Why isn't
ceph_decode_addr() more like, for example, ceph_decode_timespec()?


> +	}
> +
> +	dout("%s %s%llu cookie %llu addr %s\n", __func__,
> +	     ENTITY_NAME(item->name), item->cookie,
> +	     ceph_pr_addr(&item->addr.in_addr));
> +	return 0;
> +}
> +
> +static int decode_watchers(void **p, void *end,
> +			   struct ceph_watch_item **watchers,
> +			   u32 *num_watchers)
> +{
> +	u8 struct_v;
> +	u32 struct_len;
> +	int i;
> +	int ret;
> +
> +	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	*num_watchers = ceph_decode_32(p);
> +	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
> +	if (!*watchers)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < *num_watchers; i++) {
> +		ret = decode_watcher(p, end, *watchers + i);
> +		if (ret) {
> +			kfree(*watchers);

			*watchers = NULL;

> +			return ret;
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * On success, the caller is responsible for:
> + *
> + *     kfree(watchers);
> + */
> +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
> +			    struct ceph_object_id *oid,
> +			    struct ceph_object_locator *oloc,
> +			    struct ceph_watch_item **watchers,
> +			    u32 *num_watchers)
> +{
> +	struct ceph_osd_request *req;
> +	struct page **pages;
> +	int ret;
> +
> +	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
> +	if (!req)
> +		return -ENOMEM;
> +
> +	ceph_oid_copy(&req->r_base_oid, oid);
> +	ceph_oloc_copy(&req->r_base_oloc, oloc);
> +	req->r_flags = CEPH_OSD_FLAG_READ;
> +
> +	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
> +	if (ret)
> +		goto out_put_req;
> +
> +	pages = ceph_alloc_page_vector(1, GFP_NOIO);

Is there anything that guarantees that the number of watchers
is such that the response will always fit in a single page?
Will the response contain an error in that case?

> +	if (IS_ERR(pages)) {
> +		ret = PTR_ERR(pages);
> +		goto out_put_req;
> +	}
> +
> +	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
> +	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
> +						 response_data),
> +				 pages, PAGE_SIZE, 0, false, true);
> +
> +	ceph_osdc_start_request(osdc, req, false);
> +	ret = ceph_osdc_wait_request(osdc, req);
> +	if (ret >= 0) {
> +		void *p = page_address(pages[0]);
> +		void *const end = p + req->r_ops[0].outdata_len;
> +
> +		ret = decode_watchers(&p, end, watchers, num_watchers);
> +	}
> +
> +out_put_req:
> +	ceph_osdc_put_request(req);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_osdc_list_watchers);
> +
>  /*
>   * Call all pending notify callbacks - for use after a watch is
>   * unregistered, to make sure no more callbacks for it will be invoked
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper
  2016-08-24 13:18 ` [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper Ilya Dryomov
@ 2016-08-24 19:37   ` Alex Elder
  0 siblings, 0 replies; 29+ messages in thread
From: Alex Elder @ 2016-08-24 19:37 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> From: Douglas Fuller <dfuller@redhat.com>
> 
> Add a convenience function to osd_client to send Ceph OSD
> 'class' ops. The interface assumes that the request and
> reply data each consist of single pages.

Looks good.

Reviewed-by: Alex Elder <elder@linaro.org>

> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/osd_client.h |  8 +++++++
>  net/ceph/osd_client.c           | 51 +++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 59 insertions(+)
> 
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 19821a191732..96337b15a60d 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -397,6 +397,14 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
>  extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
>  void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc);
>  
> +int ceph_osdc_call(struct ceph_osd_client *osdc,
> +		   struct ceph_object_id *oid,
> +		   struct ceph_object_locator *oloc,
> +		   const char *class, const char *method,
> +		   unsigned int flags,
> +		   struct page *req_page, size_t req_len,
> +		   struct page *resp_page, size_t *resp_len);
> +
>  extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
>  			       struct ceph_vino vino,
>  			       struct ceph_file_layout *layout,
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index dd51ec8ce97f..fbc6b7090c65 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -4027,6 +4027,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
>  EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
>  
>  /*
> + * Execute an OSD class method on an object.
> + *
> + * @flags: CEPH_OSD_FLAG_*
> + * @resp_len: out param for reply length
> + */
> +int ceph_osdc_call(struct ceph_osd_client *osdc,
> +		   struct ceph_object_id *oid,
> +		   struct ceph_object_locator *oloc,
> +		   const char *class, const char *method,
> +		   unsigned int flags,
> +		   struct page *req_page, size_t req_len,
> +		   struct page *resp_page, size_t *resp_len)
> +{
> +	struct ceph_osd_request *req;
> +	int ret;
> +
> +	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
> +	if (!req)
> +		return -ENOMEM;
> +
> +	ceph_oid_copy(&req->r_base_oid, oid);
> +	ceph_oloc_copy(&req->r_base_oloc, oloc);
> +	req->r_flags = flags;
> +
> +	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
> +	if (ret)
> +		goto out_put_req;
> +
> +	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
> +	if (req_page)
> +		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
> +						  0, false, false);
> +	if (resp_page)
> +		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
> +						   PAGE_SIZE, 0, false, false);
> +
> +	ceph_osdc_start_request(osdc, req, false);
> +	ret = ceph_osdc_wait_request(osdc, req);
> +	if (ret >= 0) {
> +		ret = req->r_ops[0].rval;
> +		if (resp_page)
> +			*resp_len = req->r_ops[0].outdata_len;
> +	}
> +
> +out_put_req:
> +	ceph_osdc_put_request(req);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_osdc_call);
> +
> +/*
>   * init, shutdown
>   */
>  int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 04/16] libceph: support for advisory locking on RADOS objects
  2016-08-24 13:18 ` [PATCH 04/16] libceph: support for advisory locking on RADOS objects Ilya Dryomov
@ 2016-08-24 19:42   ` Alex Elder
  2016-08-24 20:49     ` Ilya Dryomov
  0 siblings, 1 reply; 29+ messages in thread
From: Alex Elder @ 2016-08-24 19:42 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> From: Douglas Fuller <dfuller@redhat.com>
> 
> This patch adds support for rados lock, unlock and break lock.
> 
> Based heavily on code by Mike Christie <michaelc@cs.wisc.edu>.

I have an unrelated comment on ceph_encode_timespec().  But
this looks good to me otherwise.

Reviewed-by: Alex Elder <elder@linaro.org>


> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/cls_lock_client.h |  27 ++++++
>  net/ceph/Makefile                    |   1 +
>  net/ceph/cls_lock_client.c           | 180 +++++++++++++++++++++++++++++++++++
>  3 files changed, 208 insertions(+)
>  create mode 100644 include/linux/ceph/cls_lock_client.h
>  create mode 100644 net/ceph/cls_lock_client.c
> 
> diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
> new file mode 100644
> index 000000000000..4e4dffef22bb
> --- /dev/null
> +++ b/include/linux/ceph/cls_lock_client.h
> @@ -0,0 +1,27 @@
> +#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
> +#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
> +
> +#include <linux/ceph/osd_client.h>
> +
> +enum ceph_cls_lock_type {
> +	CEPH_CLS_LOCK_NONE = 0,
> +	CEPH_CLS_LOCK_EXCLUSIVE = 1,
> +	CEPH_CLS_LOCK_SHARED = 2,
> +};
> +
> +int ceph_cls_lock(struct ceph_osd_client *osdc,
> +		  struct ceph_object_id *oid,
> +		  struct ceph_object_locator *oloc,
> +		  char *lock_name, u8 type, char *cookie,
> +		  char *tag, char *desc, u8 flags);
> +int ceph_cls_unlock(struct ceph_osd_client *osdc,
> +		    struct ceph_object_id *oid,
> +		    struct ceph_object_locator *oloc,
> +		    char *lock_name, char *cookie);
> +int ceph_cls_break_lock(struct ceph_osd_client *osdc,
> +			struct ceph_object_id *oid,
> +			struct ceph_object_locator *oloc,
> +			char *lock_name, char *cookie,
> +			struct ceph_entity_name *locker);
> +
> +#endif
> diff --git a/net/ceph/Makefile b/net/ceph/Makefile
> index 84cbed630c4b..6a5180903e7b 100644
> --- a/net/ceph/Makefile
> +++ b/net/ceph/Makefile
> @@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
>  
>  libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
>  	mon_client.o \
> +	cls_lock_client.o \
>  	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
>  	debugfs.o \
>  	auth.o auth_none.o \
> diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
> new file mode 100644
> index 000000000000..2a314537f958
> --- /dev/null
> +++ b/net/ceph/cls_lock_client.c
> @@ -0,0 +1,180 @@
> +#include <linux/ceph/ceph_debug.h>
> +
> +#include <linux/types.h>
> +#include <linux/slab.h>
> +
> +#include <linux/ceph/cls_lock_client.h>
> +#include <linux/ceph/decode.h>
> +
> +/**
> + * ceph_cls_lock - grab rados lock for object
> + * @oid, @oloc: object to lock
> + * @lock_name: the name of the lock
> + * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
> + * @cookie: user-defined identifier for this instance of the lock
> + * @tag: user-defined tag
> + * @desc: user-defined lock description
> + * @flags: lock flags
> + *
> + * All operations on the same lock should use the same tag.
> + */
> +int ceph_cls_lock(struct ceph_osd_client *osdc,
> +		  struct ceph_object_id *oid,
> +		  struct ceph_object_locator *oloc,
> +		  char *lock_name, u8 type, char *cookie,
> +		  char *tag, char *desc, u8 flags)
> +{
> +	int lock_op_buf_size;
> +	int name_len = strlen(lock_name);
> +	int cookie_len = strlen(cookie);
> +	int tag_len = strlen(tag);
> +	int desc_len = strlen(desc);
> +	void *p, *end;
> +	struct page *lock_op_page;
> +	struct timespec mtime;
> +	int ret;
> +
> +	lock_op_buf_size = name_len + sizeof(__le32) +
> +			   cookie_len + sizeof(__le32) +
> +			   tag_len + sizeof(__le32) +
> +			   desc_len + sizeof(__le32) +
> +			   sizeof(struct ceph_timespec) +
> +			   /* flag and type */
> +			   sizeof(u8) + sizeof(u8) +
> +			   CEPH_ENCODING_START_BLK_LEN;
> +	if (lock_op_buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	lock_op_page = alloc_page(GFP_NOIO);
> +	if (!lock_op_page)
> +		return -ENOMEM;
> +
> +	p = page_address(lock_op_page);
> +	end = p + lock_op_buf_size;
> +
> +	/* encode cls_lock_lock_op struct */
> +	ceph_start_encoding(&p, 1, 1,
> +			    lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
> +	ceph_encode_string(&p, end, lock_name, name_len);
> +	ceph_encode_8(&p, type);
> +	ceph_encode_string(&p, end, cookie, cookie_len);
> +	ceph_encode_string(&p, end, tag, tag_len);
> +	ceph_encode_string(&p, end, desc, desc_len);
> +	/* only support infinite duration */
> +	memset(&mtime, 0, sizeof(mtime));
> +	ceph_encode_timespec(p, &mtime);

This is another unfortunate encoding function.  It should take
the address of the pointer, and advance the pointer internally
like most of the other ceph_encode_*() functions do.

> +	p += sizeof(struct ceph_timespec);
> +	ceph_encode_8(&p, flags);
> +
> +	dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
> +	     __func__, lock_name, type, cookie, tag, desc, flags);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
> +			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
> +			     lock_op_page, lock_op_buf_size, NULL, NULL);
> +
> +	dout("%s: status %d\n", __func__, ret);
> +	__free_page(lock_op_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_lock);
> +
> +/**
> + * ceph_cls_unlock - release rados lock for object
> + * @oid, @oloc: object to lock
> + * @lock_name: the name of the lock
> + * @cookie: user-defined identifier for this instance of the lock
> + */
> +int ceph_cls_unlock(struct ceph_osd_client *osdc,
> +		    struct ceph_object_id *oid,
> +		    struct ceph_object_locator *oloc,
> +		    char *lock_name, char *cookie)
> +{
> +	int unlock_op_buf_size;
> +	int name_len = strlen(lock_name);
> +	int cookie_len = strlen(cookie);
> +	void *p, *end;
> +	struct page *unlock_op_page;
> +	int ret;
> +
> +	unlock_op_buf_size = name_len + sizeof(__le32) +
> +			     cookie_len + sizeof(__le32) +
> +			     CEPH_ENCODING_START_BLK_LEN;
> +	if (unlock_op_buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	unlock_op_page = alloc_page(GFP_NOIO);
> +	if (!unlock_op_page)
> +		return -ENOMEM;
> +
> +	p = page_address(unlock_op_page);
> +	end = p + unlock_op_buf_size;
> +
> +	/* encode cls_lock_unlock_op struct */
> +	ceph_start_encoding(&p, 1, 1,
> +			    unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
> +	ceph_encode_string(&p, end, lock_name, name_len);
> +	ceph_encode_string(&p, end, cookie, cookie_len);
> +
> +	dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
> +			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
> +			     unlock_op_page, unlock_op_buf_size, NULL, NULL);
> +
> +	dout("%s: status %d\n", __func__, ret);
> +	__free_page(unlock_op_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_unlock);
> +
> +/**
> + * ceph_cls_break_lock - release rados lock for object for specified client
> + * @oid, @oloc: object to lock
> + * @lock_name: the name of the lock
> + * @cookie: user-defined identifier for this instance of the lock
> + * @locker: current lock owner
> + */
> +int ceph_cls_break_lock(struct ceph_osd_client *osdc,
> +			struct ceph_object_id *oid,
> +			struct ceph_object_locator *oloc,
> +			char *lock_name, char *cookie,
> +			struct ceph_entity_name *locker)
> +{
> +	int break_op_buf_size;
> +	int name_len = strlen(lock_name);
> +	int cookie_len = strlen(cookie);
> +	struct page *break_op_page;
> +	void *p, *end;
> +	int ret;
> +
> +	break_op_buf_size = name_len + sizeof(__le32) +
> +			    cookie_len + sizeof(__le32) +
> +			    sizeof(u8) + sizeof(__le64) +
> +			    CEPH_ENCODING_START_BLK_LEN;
> +	if (break_op_buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	break_op_page = alloc_page(GFP_NOIO);
> +	if (!break_op_page)
> +		return -ENOMEM;
> +
> +	p = page_address(break_op_page);
> +	end = p + break_op_buf_size;
> +
> +	/* encode cls_lock_break_op struct */
> +	ceph_start_encoding(&p, 1, 1,
> +			    break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
> +	ceph_encode_string(&p, end, lock_name, name_len);
> +	ceph_encode_copy(&p, locker, sizeof(*locker));
> +	ceph_encode_string(&p, end, cookie, cookie_len);
> +
> +	dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
> +	     cookie, ENTITY_NAME(*locker));
> +	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
> +			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
> +			     break_op_page, break_op_buf_size, NULL, NULL);
> +
> +	dout("%s: status %d\n", __func__, ret);
> +	__free_page(break_op_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_break_lock);
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 05/16] libceph: support for lock.lock_info
  2016-08-24 13:18 ` [PATCH 05/16] libceph: support for lock.lock_info Ilya Dryomov
@ 2016-08-24 19:56   ` Alex Elder
  0 siblings, 0 replies; 29+ messages in thread
From: Alex Elder @ 2016-08-24 19:56 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> From: Douglas Fuller <dfuller@redhat.com>
> 
> Add an interface for the Ceph OSD lock.lock_info method and associated
> data structures.
> 
> Based heavily on code by Mike Christie <michaelc@cs.wisc.edu>.

Looks good to me.

Reviewed-by: Alex Elder <elder@linaro.org>

> 
> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> [idryomov@gmail.com: refactor, misc fixes throughout]
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  include/linux/ceph/cls_lock_client.h |  22 ++++++
>  net/ceph/cls_lock_client.c           | 145 +++++++++++++++++++++++++++++++++++
>  2 files changed, 167 insertions(+)
> 
> diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
> index 4e4dffef22bb..84884d8d4710 100644
> --- a/include/linux/ceph/cls_lock_client.h
> +++ b/include/linux/ceph/cls_lock_client.h
> @@ -9,6 +9,20 @@ enum ceph_cls_lock_type {
>  	CEPH_CLS_LOCK_SHARED = 2,
>  };
>  
> +struct ceph_locker_id {
> +	struct ceph_entity_name name;	/* locker's client name */
> +	char *cookie;			/* locker's cookie */
> +};
> +
> +struct ceph_locker_info {
> +	struct ceph_entity_addr addr;	/* locker's address */
> +};
> +
> +struct ceph_locker {
> +	struct ceph_locker_id id;
> +	struct ceph_locker_info info;
> +};
> +
>  int ceph_cls_lock(struct ceph_osd_client *osdc,
>  		  struct ceph_object_id *oid,
>  		  struct ceph_object_locator *oloc,
> @@ -24,4 +38,12 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
>  			char *lock_name, char *cookie,
>  			struct ceph_entity_name *locker);
>  
> +void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
> +
> +int ceph_cls_lock_info(struct ceph_osd_client *osdc,
> +		       struct ceph_object_id *oid,
> +		       struct ceph_object_locator *oloc,
> +		       char *lock_name, u8 *type, char **tag,
> +		       struct ceph_locker **lockers, u32 *num_lockers);
> +
>  #endif
> diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
> index 2a314537f958..50f040fdb2a9 100644
> --- a/net/ceph/cls_lock_client.c
> +++ b/net/ceph/cls_lock_client.c
> @@ -178,3 +178,148 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
>  	return ret;
>  }
>  EXPORT_SYMBOL(ceph_cls_break_lock);
> +
> +void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
> +{
> +	int i;
> +
> +	for (i = 0; i < num_lockers; i++)
> +		kfree(lockers[i].id.cookie);
> +	kfree(lockers);
> +}
> +EXPORT_SYMBOL(ceph_free_lockers);
> +
> +static int decode_locker(void **p, void *end, struct ceph_locker *locker)
> +{
> +	u8 struct_v;
> +	u32 len;
> +	char *s;
> +	int ret;
> +
> +	ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
> +	if (ret)
> +		return ret;
> +
> +	ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
> +	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
> +	if (IS_ERR(s))
> +		return PTR_ERR(s);
> +
> +	locker->id.cookie = s;
> +
> +	ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
> +	if (ret)
> +		return ret;
> +
> +	*p += sizeof(struct ceph_timespec); /* skip expiration */
> +	ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
> +	ceph_decode_addr(&locker->info.addr);
> +	len = ceph_decode_32(p);
> +	*p += len; /* skip description */
> +
> +	dout("%s %s%llu cookie %s addr %s\n", __func__,
> +	     ENTITY_NAME(locker->id.name), locker->id.cookie,
> +	     ceph_pr_addr(&locker->info.addr.in_addr));
> +	return 0;
> +}
> +
> +static int decode_lockers(void **p, void *end, u8 *type, char **tag,
> +			  struct ceph_locker **lockers, u32 *num_lockers)
> +{
> +	u8 struct_v;
> +	u32 struct_len;
> +	char *s;
> +	int i;
> +	int ret;
> +
> +	ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	*num_lockers = ceph_decode_32(p);
> +	*lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
> +	if (!*lockers)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < *num_lockers; i++) {
> +		ret = decode_locker(p, end, *lockers + i);
> +		if (ret)
> +			goto err_free_lockers;
> +	}
> +
> +	*type = ceph_decode_8(p);
> +	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
> +	if (IS_ERR(s)) {
> +		ret = PTR_ERR(s);
> +		goto err_free_lockers;
> +	}
> +
> +	*tag = s;
> +	return 0;
> +
> +err_free_lockers:
> +	ceph_free_lockers(*lockers, *num_lockers);
> +	return ret;
> +}
> +
> +/*
> + * On success, the caller is responsible for:
> + *
> + *     kfree(tag);
> + *     ceph_free_lockers(lockers, num_lockers);
> + */
> +int ceph_cls_lock_info(struct ceph_osd_client *osdc,
> +		       struct ceph_object_id *oid,
> +		       struct ceph_object_locator *oloc,
> +		       char *lock_name, u8 *type, char **tag,
> +		       struct ceph_locker **lockers, u32 *num_lockers)
> +{
> +	int get_info_op_buf_size;
> +	int name_len = strlen(lock_name);
> +	struct page *get_info_op_page, *reply_page;
> +	size_t reply_len;
> +	void *p, *end;
> +	int ret;
> +
> +	get_info_op_buf_size = name_len + sizeof(__le32) +
> +			       CEPH_ENCODING_START_BLK_LEN;
> +	if (get_info_op_buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	get_info_op_page = alloc_page(GFP_NOIO);
> +	if (!get_info_op_page)
> +		return -ENOMEM;
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page) {
> +		__free_page(get_info_op_page);
> +		return -ENOMEM;
> +	}
> +
> +	p = page_address(get_info_op_page);
> +	end = p + get_info_op_buf_size;
> +
> +	/* encode cls_lock_get_info_op struct */
> +	ceph_start_encoding(&p, 1, 1,
> +			    get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
> +	ceph_encode_string(&p, end, lock_name, name_len);
> +
> +	dout("%s lock_name %s\n", __func__, lock_name);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
> +			     CEPH_OSD_FLAG_READ, get_info_op_page,
> +			     get_info_op_buf_size, reply_page, &reply_len);
> +
> +	dout("%s: status %d\n", __func__, ret);
> +	if (ret >= 0) {
> +		p = page_address(reply_page);
> +		end = p + reply_len;
> +
> +		ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
> +	}
> +
> +	__free_page(get_info_op_page);
> +	__free_page(reply_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_lock_info);
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 06/16] libceph: support for blacklisting clients
  2016-08-24 13:18 ` [PATCH 06/16] libceph: support for blacklisting clients Ilya Dryomov
@ 2016-08-24 19:59   ` Alex Elder
  0 siblings, 0 replies; 29+ messages in thread
From: Alex Elder @ 2016-08-24 19:59 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> From: Douglas Fuller <dfuller@redhat.com>
> 
> Reuse ceph_mon_generic_request infrastructure for sending monitor
> commands.  In particular, add support for 'blacklist add' to prevent
> other, non-responsive clients from making further updates.
> 
> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
> [idryomov@gmail.com: refactor, misc fixes throughout]
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

Looks OK to me.

Reviewed-by: Alex Elder <elder@linaro.org>

> ---
>  include/linux/ceph/ceph_fs.h    | 11 ++++++
>  include/linux/ceph/mon_client.h |  3 ++
>  net/ceph/mon_client.c           | 82 +++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 96 insertions(+)
> 
> diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
> index 7868d602c0a0..c086e63dcee1 100644
> --- a/include/linux/ceph/ceph_fs.h
> +++ b/include/linux/ceph/ceph_fs.h
> @@ -138,6 +138,9 @@ struct ceph_dir_layout {
>  #define CEPH_MSG_POOLOP_REPLY           48
>  #define CEPH_MSG_POOLOP                 49
>  
> +/* mon commands */
> +#define CEPH_MSG_MON_COMMAND            50
> +#define CEPH_MSG_MON_COMMAND_ACK        51
>  
>  /* osd */
>  #define CEPH_MSG_OSD_MAP                41
> @@ -176,6 +179,14 @@ struct ceph_mon_statfs_reply {
>  	struct ceph_statfs st;
>  } __attribute__ ((packed));
>  
> +struct ceph_mon_command {
> +	struct ceph_mon_request_header monhdr;
> +	struct ceph_fsid fsid;
> +	__le32 num_strs;         /* always 1 */
> +	__le32 str_len;
> +	char str[];
> +} __attribute__ ((packed));
> +
>  struct ceph_osd_getmap {
>  	struct ceph_mon_request_header monhdr;
>  	struct ceph_fsid fsid;
> diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
> index 24d704d1ea5c..d5a3ecea578d 100644
> --- a/include/linux/ceph/mon_client.h
> +++ b/include/linux/ceph/mon_client.h
> @@ -141,6 +141,9 @@ int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
>  int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
>  				ceph_monc_callback_t cb, u64 private_data);
>  
> +int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
> +			    struct ceph_entity_addr *client_addr);
> +
>  extern int ceph_monc_open_session(struct ceph_mon_client *monc);
>  
>  extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
> diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
> index ef34a02719d7..a8effc8b7280 100644
> --- a/net/ceph/mon_client.c
> +++ b/net/ceph/mon_client.c
> @@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
>  }
>  EXPORT_SYMBOL(ceph_monc_get_version_async);
>  
> +static void handle_command_ack(struct ceph_mon_client *monc,
> +			       struct ceph_msg *msg)
> +{
> +	struct ceph_mon_generic_request *req;
> +	void *p = msg->front.iov_base;
> +	void *const end = p + msg->front_alloc_len;
> +	u64 tid = le64_to_cpu(msg->hdr.tid);
> +
> +	dout("%s msg %p tid %llu\n", __func__, msg, tid);
> +
> +	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
> +							    sizeof(u32), bad);
> +	p += sizeof(struct ceph_mon_request_header);
> +
> +	mutex_lock(&monc->mutex);
> +	req = lookup_generic_request(&monc->generic_request_tree, tid);
> +	if (!req) {
> +		mutex_unlock(&monc->mutex);
> +		return;
> +	}
> +
> +	req->result = ceph_decode_32(&p);
> +	__finish_generic_request(req);
> +	mutex_unlock(&monc->mutex);
> +
> +	complete_generic_request(req);
> +	return;
> +
> +bad:
> +	pr_err("corrupt mon_command ack, tid %llu\n", tid);
> +	ceph_msg_dump(msg);
> +}
> +
> +int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
> +			    struct ceph_entity_addr *client_addr)
> +{
> +	struct ceph_mon_generic_request *req;
> +	struct ceph_mon_command *h;
> +	int ret = -ENOMEM;
> +	int len;
> +
> +	req = alloc_generic_request(monc, GFP_NOIO);
> +	if (!req)
> +		goto out;
> +
> +	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
> +	if (!req->request)
> +		goto out;
> +
> +	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
> +				  true);
> +	if (!req->reply)
> +		goto out;
> +
> +	mutex_lock(&monc->mutex);
> +	register_generic_request(req);
> +	h = req->request->front.iov_base;
> +	h->monhdr.have_version = 0;
> +	h->monhdr.session_mon = cpu_to_le16(-1);
> +	h->monhdr.session_mon_tid = 0;
> +	h->fsid = monc->monmap->fsid;
> +	h->num_strs = cpu_to_le32(1);
> +	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
> +		                 \"blacklistop\": \"add\", \
> +				 \"addr\": \"%pISpc/%u\" }",
> +		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
> +	h->str_len = cpu_to_le32(len);
> +	send_generic_request(monc, req);
> +	mutex_unlock(&monc->mutex);
> +
> +	ret = wait_generic_request(req);
> +out:
> +	put_generic_request(req);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_monc_blacklist_add);
> +
>  /*
>   * Resend pending generic requests.
>   */
> @@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
>  		handle_get_version_reply(monc, msg);
>  		break;
>  
> +	case CEPH_MSG_MON_COMMAND_ACK:
> +		handle_command_ack(monc, msg);
> +		break;
> +
>  	case CEPH_MSG_MON_MAP:
>  		ceph_monc_handle_map(monc, msg);
>  		break;
> @@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
>  		m = ceph_msg_get(monc->m_subscribe_ack);
>  		break;
>  	case CEPH_MSG_STATFS_REPLY:
> +	case CEPH_MSG_MON_COMMAND_ACK:
>  		return get_generic_reply(con, hdr, skip);
>  	case CEPH_MSG_AUTH_REPLY:
>  		m = ceph_msg_get(monc->m_auth_reply);
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid()
  2016-08-24 13:18 ` [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid() Ilya Dryomov
@ 2016-08-24 20:00   ` Alex Elder
  2016-08-24 20:56     ` Ilya Dryomov
  0 siblings, 1 reply; 29+ messages in thread
From: Alex Elder @ 2016-08-24 20:00 UTC (permalink / raw)
  To: Ilya Dryomov, ceph-devel


Why?  There must be at least a one-line explanation.

On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

Looks good.

Reviewed-by: Alex Elder <elder@linaro.org>

> ---
>  drivers/block/rbd.c          | 2 +-
>  include/linux/ceph/libceph.h | 2 +-
>  net/ceph/ceph_common.c       | 7 ++++---
>  3 files changed, 6 insertions(+), 5 deletions(-)
> 
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 6c6519f6492a..e0585e9040f1 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3758,7 +3758,7 @@ static ssize_t rbd_client_id_show(struct device *dev,
>  	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
>  
>  	return sprintf(buf, "client%lld\n",
> -			ceph_client_id(rbd_dev->rbd_client->client));
> +		       ceph_client_gid(rbd_dev->rbd_client->client));
>  }
>  
>  static ssize_t rbd_pool_show(struct device *dev,
> diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
> index 83fc1fff7061..b4cffff70e44 100644
> --- a/include/linux/ceph/libceph.h
> +++ b/include/linux/ceph/libceph.h
> @@ -264,7 +264,7 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
>  					      void *private,
>  					      u64 supported_features,
>  					      u64 required_features);
> -extern u64 ceph_client_id(struct ceph_client *client);
> +u64 ceph_client_gid(struct ceph_client *client);
>  extern void ceph_destroy_client(struct ceph_client *client);
>  extern int __ceph_open_session(struct ceph_client *client,
>  			       unsigned long started);
> diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
> index bddfcf6f09c2..8a7921767308 100644
> --- a/net/ceph/ceph_common.c
> +++ b/net/ceph/ceph_common.c
> @@ -566,11 +566,11 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
>  }
>  EXPORT_SYMBOL(ceph_print_client_options);
>  
> -u64 ceph_client_id(struct ceph_client *client)
> +u64 ceph_client_gid(struct ceph_client *client)
>  {
>  	return client->monc.auth->global_id;
>  }
> -EXPORT_SYMBOL(ceph_client_id);
> +EXPORT_SYMBOL(ceph_client_gid);
>  
>  /*
>   * create a fresh client instance
> @@ -685,7 +685,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
>  			return client->auth_err;
>  	}
>  
> -	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
> +	pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
> +		&client->fsid);
>  	ceph_debugfs_client_init(client);
>  
>  	return 0;
> 


^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode()
  2016-08-24 18:49   ` Alex Elder
@ 2016-08-24 20:15     ` Ilya Dryomov
  0 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 20:15 UTC (permalink / raw)
  To: Alex Elder; +Cc: Ceph Development

On Wed, Aug 24, 2016 at 8:49 PM, Alex Elder <elder@ieee.org> wrote:
> On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
>> Clear up EntityName vs entity_name_t confusion.
>
> If I understand what your code does, you're using
> "auth_entity_name" to represent whatever "EntityName"
> is.  (At this point I'm just looking at the code; I
> haven't gone to look at what an entity_name_t or
> EntitnyName are...)

Correct.  Those are two different types: EntityName is integer type
(osd, mds, etc) + string id ("0", "a", etc), while entity_name_t is
integer type + integer id.  One of Doug's patches indicated that he was
confused about the two and it took me a moment to disambiguate so
I went ahead and renamed the function.

Thanks,

                Ilya

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS
  2016-08-24 19:29   ` Alex Elder
@ 2016-08-24 20:43     ` Ilya Dryomov
  0 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 20:43 UTC (permalink / raw)
  To: Alex Elder; +Cc: Ceph Development

On Wed, Aug 24, 2016 at 9:29 PM, Alex Elder <elder@ieee.org> wrote:
> On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
>> From: Douglas Fuller <dfuller@redhat.com>
>>
>> Add support for this Ceph OSD op, needed to support the RBD exclusive
>> lock feature.
>
> It would be nice to provide a short description of the op, or
> maybe a reference to something that explains what it is for.

Well, it's kind of self-explanatory, especially if you look at the
signature of ceph_osdc_list_watchers(): it returns a list of watchers
(watchers, num_watchers) for a given RADOS object (oid, oloc).

>
> I have a couple comments below.  I'm continuing to review the
> code that I see without having a solid knowledge of how
> things are supposed to work and what the other end does.
>
> I do have suggestions, but I don't really see anything
> wrong with this code.
>
> Reviewed-by: Alex Elder <elder@linaro.org>
>
>> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
>> [idryomov@gmail.com: refactor, misc fixes throughout]
>> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
>> ---
>>  include/linux/ceph/osd_client.h |  15 +++++-
>>  net/ceph/osd_client.c           | 117 ++++++++++++++++++++++++++++++++++++++++
>>  2 files changed, 131 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
>> index 858932304260..19821a191732 100644
>> --- a/include/linux/ceph/osd_client.h
>> +++ b/include/linux/ceph/osd_client.h
>> @@ -121,6 +121,9 @@ struct ceph_osd_req_op {
>>                       struct ceph_osd_data response_data;
>>               } notify;
>>               struct {
>> +                     struct ceph_osd_data response_data;
>> +             } list_watchers;
>> +             struct {
>>                       u64 expected_object_size;
>>                       u64 expected_write_size;
>>               } alloc_hint;
>> @@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
>>       size_t *preply_len;
>>  };
>>
>> +struct ceph_watch_item {
>> +     struct ceph_entity_name name;
>> +     u64 cookie;
>> +     struct ceph_entity_addr addr;
>> +};
>> +
>>  struct ceph_osd_client {
>>       struct ceph_client     *client;
>>
>> @@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
>>                                       struct page **pages, u64 length,
>>                                       u32 alignment, bool pages_from_pool,
>>                                       bool own_pages);
>> -
>>  extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
>>                                       unsigned int which, u16 opcode,
>>                                       const char *class, const char *method);
>> @@ -434,5 +442,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
>>                    size_t *preply_len);
>>  int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
>>                         struct ceph_osd_linger_request *lreq);
>> +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
>> +                         struct ceph_object_id *oid,
>> +                         struct ceph_object_locator *oloc,
>> +                         struct ceph_watch_item **watchers,
>> +                         u32 *num_watchers);
>>  #endif
>>
>> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
>> index a97e7b506612..dd51ec8ce97f 100644
>> --- a/net/ceph/osd_client.c
>> +++ b/net/ceph/osd_client.c
>> @@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
>>               ceph_osd_data_release(&op->notify.request_data);
>>               ceph_osd_data_release(&op->notify.response_data);
>>               break;
>> +     case CEPH_OSD_OP_LIST_WATCHERS:
>> +             ceph_osd_data_release(&op->list_watchers.response_data);
>> +             break;
>>       default:
>>               break;
>>       }
>> @@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
>>       case CEPH_OSD_OP_NOTIFY:
>>               dst->notify.cookie = cpu_to_le64(src->notify.cookie);
>>               break;
>> +     case CEPH_OSD_OP_LIST_WATCHERS:
>> +             break;
>>       case CEPH_OSD_OP_SETALLOCHINT:
>>               dst->alloc_hint.expected_object_size =
>>                   cpu_to_le64(src->alloc_hint.expected_object_size);
>> @@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
>>                       ceph_osdc_msg_data_add(req->r_reply,
>>                                              &op->extent.osd_data);
>>                       break;
>> +             case CEPH_OSD_OP_LIST_WATCHERS:
>> +                     ceph_osdc_msg_data_add(req->r_reply,
>> +                                            &op->list_watchers.response_data);
>> +                     break;
>>
>>               /* both */
>>               case CEPH_OSD_OP_CALL:
>> @@ -3891,6 +3900,114 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
>>       return ret;
>>  }
>>
>> +static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
>> +{
>> +     u8 struct_v;
>> +     u32 struct_len;
>> +     int ret;
>> +
>> +     ret = ceph_start_decoding(p, end, 2, "watch_item_t",
>> +                               &struct_v, &struct_len);
>> +     if (ret)
>> +             return ret;
>> +
>> +     ceph_decode_copy(p, &item->name, sizeof(item->name));
>> +     item->cookie = ceph_decode_64(p);
>> +     *p += 4; /* skip timeout_seconds */
>> +     if (struct_v >= 2) {
>> +             ceph_decode_copy(p, &item->addr, sizeof(item->addr));
>> +             ceph_decode_addr(&item->addr);
>
> ceph_decode_addr() is a little ugly in how it swaps between
> wire (little-endian) byte order and native byte order in the

It doesn't matter here, but wire is big-endian in this case.

> same underlying sockaddr_storage structure.  Why isn't
> ceph_decode_addr() more like, for example, ceph_decode_timespec()?

I suppose because only the address family field needs to be swapped.
It goes way back - you would need to ask Sage ;)

>
>
>> +     }
>> +
>> +     dout("%s %s%llu cookie %llu addr %s\n", __func__,
>> +          ENTITY_NAME(item->name), item->cookie,
>> +          ceph_pr_addr(&item->addr.in_addr));
>> +     return 0;
>> +}
>> +
>> +static int decode_watchers(void **p, void *end,
>> +                        struct ceph_watch_item **watchers,
>> +                        u32 *num_watchers)
>> +{
>> +     u8 struct_v;
>> +     u32 struct_len;
>> +     int i;
>> +     int ret;
>> +
>> +     ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
>> +                               &struct_v, &struct_len);
>> +     if (ret)
>> +             return ret;
>> +
>> +     *num_watchers = ceph_decode_32(p);
>> +     *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
>> +     if (!*watchers)
>> +             return -ENOMEM;
>> +
>> +     for (i = 0; i < *num_watchers; i++) {
>> +             ret = decode_watcher(p, end, *watchers + i);
>> +             if (ret) {
>> +                     kfree(*watchers);
>
>                         *watchers = NULL;
>
>> +                     return ret;
>> +             }
>> +     }
>> +
>> +     return 0;
>> +}
>> +
>> +/*
>> + * On success, the caller is responsible for:
>> + *
>> + *     kfree(watchers);
>> + */
>> +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
>> +                         struct ceph_object_id *oid,
>> +                         struct ceph_object_locator *oloc,
>> +                         struct ceph_watch_item **watchers,
>> +                         u32 *num_watchers)
>> +{
>> +     struct ceph_osd_request *req;
>> +     struct page **pages;
>> +     int ret;
>> +
>> +     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
>> +     if (!req)
>> +             return -ENOMEM;
>> +
>> +     ceph_oid_copy(&req->r_base_oid, oid);
>> +     ceph_oloc_copy(&req->r_base_oloc, oloc);
>> +     req->r_flags = CEPH_OSD_FLAG_READ;
>> +
>> +     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
>> +     if (ret)
>> +             goto out_put_req;
>> +
>> +     pages = ceph_alloc_page_vector(1, GFP_NOIO);
>
> Is there anything that guarantees that the number of watchers
> is such that the response will always fit in a single page?
> Will the response contain an error in that case?

The libceph messenger will drop the reply on the floor.  It's an
annoying problem with the current reply buffer preallocation scheme
that pops up all over the codebase, high on my list of things to fix.

On a practical note, I can't think of a use case with more than
a couple of watchers.  This is _exclusive_ lock, after all...

Thanks,

                Ilya

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 04/16] libceph: support for advisory locking on RADOS objects
  2016-08-24 19:42   ` Alex Elder
@ 2016-08-24 20:49     ` Ilya Dryomov
  0 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 20:49 UTC (permalink / raw)
  To: Alex Elder; +Cc: Ceph Development

On Wed, Aug 24, 2016 at 9:42 PM, Alex Elder <elder@ieee.org> wrote:
> On 08/24/2016 08:18 AM, Ilya Dryomov wrote:
>> From: Douglas Fuller <dfuller@redhat.com>
>>
>> This patch adds support for rados lock, unlock and break lock.
>>
>> Based heavily on code by Mike Christie <michaelc@cs.wisc.edu>.
>
> I have an unrelated comment on ceph_encode_timespec().  But
> this looks good to me otherwise.
>
> Reviewed-by: Alex Elder <elder@linaro.org>
>
>
>> Signed-off-by: Douglas Fuller <dfuller@redhat.com>
>> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
>> ---
>>  include/linux/ceph/cls_lock_client.h |  27 ++++++
>>  net/ceph/Makefile                    |   1 +
>>  net/ceph/cls_lock_client.c           | 180 +++++++++++++++++++++++++++++++++++
>>  3 files changed, 208 insertions(+)
>>  create mode 100644 include/linux/ceph/cls_lock_client.h
>>  create mode 100644 net/ceph/cls_lock_client.c
>>
>> diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
>> new file mode 100644
>> index 000000000000..4e4dffef22bb
>> --- /dev/null
>> +++ b/include/linux/ceph/cls_lock_client.h
>> @@ -0,0 +1,27 @@
>> +#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
>> +#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
>> +
>> +#include <linux/ceph/osd_client.h>
>> +
>> +enum ceph_cls_lock_type {
>> +     CEPH_CLS_LOCK_NONE = 0,
>> +     CEPH_CLS_LOCK_EXCLUSIVE = 1,
>> +     CEPH_CLS_LOCK_SHARED = 2,
>> +};
>> +
>> +int ceph_cls_lock(struct ceph_osd_client *osdc,
>> +               struct ceph_object_id *oid,
>> +               struct ceph_object_locator *oloc,
>> +               char *lock_name, u8 type, char *cookie,
>> +               char *tag, char *desc, u8 flags);
>> +int ceph_cls_unlock(struct ceph_osd_client *osdc,
>> +                 struct ceph_object_id *oid,
>> +                 struct ceph_object_locator *oloc,
>> +                 char *lock_name, char *cookie);
>> +int ceph_cls_break_lock(struct ceph_osd_client *osdc,
>> +                     struct ceph_object_id *oid,
>> +                     struct ceph_object_locator *oloc,
>> +                     char *lock_name, char *cookie,
>> +                     struct ceph_entity_name *locker);
>> +
>> +#endif
>> diff --git a/net/ceph/Makefile b/net/ceph/Makefile
>> index 84cbed630c4b..6a5180903e7b 100644
>> --- a/net/ceph/Makefile
>> +++ b/net/ceph/Makefile
>> @@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
>>
>>  libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
>>       mon_client.o \
>> +     cls_lock_client.o \
>>       osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
>>       debugfs.o \
>>       auth.o auth_none.o \
>> diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
>> new file mode 100644
>> index 000000000000..2a314537f958
>> --- /dev/null
>> +++ b/net/ceph/cls_lock_client.c
>> @@ -0,0 +1,180 @@
>> +#include <linux/ceph/ceph_debug.h>
>> +
>> +#include <linux/types.h>
>> +#include <linux/slab.h>
>> +
>> +#include <linux/ceph/cls_lock_client.h>
>> +#include <linux/ceph/decode.h>
>> +
>> +/**
>> + * ceph_cls_lock - grab rados lock for object
>> + * @oid, @oloc: object to lock
>> + * @lock_name: the name of the lock
>> + * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
>> + * @cookie: user-defined identifier for this instance of the lock
>> + * @tag: user-defined tag
>> + * @desc: user-defined lock description
>> + * @flags: lock flags
>> + *
>> + * All operations on the same lock should use the same tag.
>> + */
>> +int ceph_cls_lock(struct ceph_osd_client *osdc,
>> +               struct ceph_object_id *oid,
>> +               struct ceph_object_locator *oloc,
>> +               char *lock_name, u8 type, char *cookie,
>> +               char *tag, char *desc, u8 flags)
>> +{
>> +     int lock_op_buf_size;
>> +     int name_len = strlen(lock_name);
>> +     int cookie_len = strlen(cookie);
>> +     int tag_len = strlen(tag);
>> +     int desc_len = strlen(desc);
>> +     void *p, *end;
>> +     struct page *lock_op_page;
>> +     struct timespec mtime;
>> +     int ret;
>> +
>> +     lock_op_buf_size = name_len + sizeof(__le32) +
>> +                        cookie_len + sizeof(__le32) +
>> +                        tag_len + sizeof(__le32) +
>> +                        desc_len + sizeof(__le32) +
>> +                        sizeof(struct ceph_timespec) +
>> +                        /* flag and type */
>> +                        sizeof(u8) + sizeof(u8) +
>> +                        CEPH_ENCODING_START_BLK_LEN;
>> +     if (lock_op_buf_size > PAGE_SIZE)
>> +             return -E2BIG;
>> +
>> +     lock_op_page = alloc_page(GFP_NOIO);
>> +     if (!lock_op_page)
>> +             return -ENOMEM;
>> +
>> +     p = page_address(lock_op_page);
>> +     end = p + lock_op_buf_size;
>> +
>> +     /* encode cls_lock_lock_op struct */
>> +     ceph_start_encoding(&p, 1, 1,
>> +                         lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
>> +     ceph_encode_string(&p, end, lock_name, name_len);
>> +     ceph_encode_8(&p, type);
>> +     ceph_encode_string(&p, end, cookie, cookie_len);
>> +     ceph_encode_string(&p, end, tag, tag_len);
>> +     ceph_encode_string(&p, end, desc, desc_len);
>> +     /* only support infinite duration */
>> +     memset(&mtime, 0, sizeof(mtime));
>> +     ceph_encode_timespec(p, &mtime);
>
> This is another unfortunate encoding function.  It should take
> the address of the pointer, and advance the pointer internally
> like most of the other ceph_encode_*() functions do.

A large number of users encode into another structure and not a p/end
buffer.  Though in general I agree - unifying all encode/decode
functions to use consistent signatures and naming scheme would be
a good cleanup project.

Thanks,

                Ilya

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid()
  2016-08-24 20:00   ` Alex Elder
@ 2016-08-24 20:56     ` Ilya Dryomov
  0 siblings, 0 replies; 29+ messages in thread
From: Ilya Dryomov @ 2016-08-24 20:56 UTC (permalink / raw)
  To: Alex Elder; +Cc: Ceph Development

On Wed, Aug 24, 2016 at 10:00 PM, Alex Elder <elder@ieee.org> wrote:
>
> Why?  There must be at least a one-line explanation.

It's gid / global_id in other places and one of the next commits
introduces a struct rbd_client_id.  I'll update the commit message.

Thanks,

                Ilya

^ permalink raw reply	[flat|nested] 29+ messages in thread

end of thread, other threads:[~2016-08-24 21:25 UTC | newest]

Thread overview: 29+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-08-24 13:18 [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Ilya Dryomov
2016-08-24 13:18 ` [PATCH 01/16] libceph: rename ceph_entity_name_encode() -> ceph_auth_entity_name_encode() Ilya Dryomov
2016-08-24 18:49   ` Alex Elder
2016-08-24 20:15     ` Ilya Dryomov
2016-08-24 13:18 ` [PATCH 02/16] libceph: support for CEPH_OSD_OP_LIST_WATCHERS Ilya Dryomov
2016-08-24 19:29   ` Alex Elder
2016-08-24 20:43     ` Ilya Dryomov
2016-08-24 13:18 ` [PATCH 03/16] libceph: add ceph_osdc_call() single-page helper Ilya Dryomov
2016-08-24 19:37   ` Alex Elder
2016-08-24 13:18 ` [PATCH 04/16] libceph: support for advisory locking on RADOS objects Ilya Dryomov
2016-08-24 19:42   ` Alex Elder
2016-08-24 20:49     ` Ilya Dryomov
2016-08-24 13:18 ` [PATCH 05/16] libceph: support for lock.lock_info Ilya Dryomov
2016-08-24 19:56   ` Alex Elder
2016-08-24 13:18 ` [PATCH 06/16] libceph: support for blacklisting clients Ilya Dryomov
2016-08-24 19:59   ` Alex Elder
2016-08-24 13:18 ` [PATCH 07/16] libceph: rename ceph_client_id() -> ceph_client_gid() Ilya Dryomov
2016-08-24 20:00   ` Alex Elder
2016-08-24 20:56     ` Ilya Dryomov
2016-08-24 13:18 ` [PATCH 08/16] rbd: introduce a per-device ordered workqueue Ilya Dryomov
2016-08-24 13:18 ` [PATCH 09/16] rbd: retry watch re-registration periodically Ilya Dryomov
2016-08-24 13:18 ` [PATCH 10/16] rbd: support for exclusive-lock feature Ilya Dryomov
2016-08-24 13:18 ` [PATCH 11/16] rbd: print capacity in decimal and features in hex Ilya Dryomov
2016-08-24 13:18 ` [PATCH 12/16] rbd: add 'client_addr' sysfs rbd device attribute Ilya Dryomov
2016-08-24 13:18 ` [PATCH 13/16] rbd: add 'cluster_fsid' " Ilya Dryomov
2016-08-24 13:18 ` [PATCH 14/16] rbd: add 'snap_id' " Ilya Dryomov
2016-08-24 13:18 ` [PATCH 15/16] rbd: add 'config_info' " Ilya Dryomov
2016-08-24 13:18 ` [PATCH 16/16] rbd: add force close option Ilya Dryomov
2016-08-24 18:34 ` [PATCH 00/16] rbd: support for exclusive-lock + mpath remap bits Mike Christie

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.