From: Jeff Layton <jlayton@redhat.com>
To: ceph-devel@vger.kernel.org
Cc: jspray@redhat.com, idryomov@gmail.com, zyan@redhat.com, sage@redhat.com
Subject: [PATCH v1 4/7] ceph: handle new osdmap epoch updates in CLIENT_CAPS and WRITE codepaths
Date: Fri, 20 Jan 2017 10:17:35 -0500	[thread overview]
Message-ID: <20170120151738.9584-5-jlayton@redhat.com> (raw)
In-Reply-To: <20170120151738.9584-1-jlayton@redhat.com>

This patch is heavily inspired by John Spray's earlier work, but
implemented in a different way.

Create and register a new map_cb for cephfs, to allow it to handle
changes to the osdmap.
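
The registration itself is a one-liner at client init time (taken from the
ceph_mdsc_init() hunk further down):

	ceph_osdc_register_map_cb(&fsc->client->osdc, handle_osd_map, (void *)mdsc);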

In version 5 of CLIENT_CAPS messages, a barrier field was added as an
instruction to clients that they may not use the attached capabilities
until they have seen a particular OSD map epoch.

When we get a message with such a field set and don't yet have the requisite
map epoch, we put that message on a per-session list, to be processed once
the map comes in.
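
In rough outline, the check in ceph_handle_caps() looks like this (a
simplified sketch of the caps.c hunk below, with the locking elided):

	if (le16_to_cpu(msg->hdr.version) >= 5) {
		ceph_decode_32_safe(&p, end, epoch_barrier, bad);

		/* remember the highest barrier we have been told about */
		if (epoch_barrier > mdsc->cap_epoch_barrier)
			mdsc->cap_epoch_barrier = epoch_barrier;

		/* map too old? park the message and ask for a newer map */
		if (osdc->osdmap->epoch < epoch_barrier) {
			ceph_msg_get(msg);
			list_add(&msg->list_head, &session->s_delayed_caps);
			__ceph_osdc_maybe_request_map(osdc);
			return;
		}
	}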

When a new map update comes in, the map_cb routine first checks whether
there is an OSD or pool full condition. If so, we walk the list of
outstanding OSD requests and cancel any writes to full OSDs or pools with
-ENOSPC. While cancelling, we record the latest OSD epoch seen in those
requests; that epoch will be used later in the CAPRELEASE messages.
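
Stripped of locking and session refcounting, the callback boils down to
something like this (condensed from the handle_osd_map() hunk below):

	static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
	{
		struct ceph_mds_client *mdsc = p;
		u32 cancelled_epoch = 0;
		int mds_id;

		if ((osdc->osdmap->flags & CEPH_OSDMAP_FULL) ||
		    ceph_osdc_have_pool_full(osdc))
			cancelled_epoch = ceph_osdc_complete_writes(osdc, -ENOSPC);

		if (cancelled_epoch)
			mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
						      mdsc->cap_epoch_barrier);

		/* kick the workqueue job for every session with delayed caps */
		for (mds_id = 0; mds_id < mdsc->max_sessions; ++mds_id) {
			struct ceph_mds_session *s = mdsc->sessions[mds_id];

			if (s && !list_empty(&s->s_delayed_caps))
				schedule_work(&s->s_delayed_caps_work);
		}
	}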

Then it walks the session list and queues the workqueue job for each session
that has delayed cap messages. When the workqueue job runs, it walks the
list of delayed caps and tries to rerun each one. If the epoch is still not
high enough, the message simply goes back on the delay list until the
required map comes in.
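
The workqueue job itself is just a splice-and-replay loop (a slightly
condensed version of run_delayed_caps() from the hunk below):

	LIST_HEAD(delayed);

	spin_lock(&session->s_cap_lock);
	list_splice_init(&session->s_delayed_caps, &delayed);
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&delayed)) {
		struct ceph_msg *msg = list_first_entry(&delayed,
						struct ceph_msg, list_head);

		list_del_init(&msg->list_head);
		/* re-runs the barrier check; may re-queue the message */
		ceph_handle_caps(session, msg);
		ceph_msg_put(msg);
	}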

Suggested-by: John Spray <john.spray@redhat.com>
Signed-off-by: Jeff Layton <jlayton@redhat.com>
---
 fs/ceph/caps.c       | 43 +++++++++++++++++++++++++++---
 fs/ceph/debugfs.c    |  3 +++
 fs/ceph/mds_client.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/mds_client.h |  3 +++
 4 files changed, 120 insertions(+), 4 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d941c48e8bff..f33d424b5e12 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1077,7 +1077,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
 	/* inline data size */
 	ceph_encode_32(&p, 0);
 	/* osd_epoch_barrier (version 5) */
-	ceph_encode_32(&p, 0);
+	ceph_encode_32(&p, arg->session->s_mdsc->cap_epoch_barrier);
 	/* oldest_flush_tid (version 6) */
 	ceph_encode_64(&p, arg->oldest_flush_tid);
 
@@ -3577,9 +3577,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	void *snaptrace;
 	size_t snaptrace_len;
 	void *p, *end;
+	u32 epoch_barrier = 0;
 
 	dout("handle_caps from mds%d\n", mds);
 
+	WARN_ON_ONCE(!list_empty(&msg->list_head));
+
 	/* decode */
 	end = msg->front.iov_base + msg->front.iov_len;
 	tid = le64_to_cpu(msg->hdr.tid);
@@ -3625,13 +3628,45 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		p += inline_len;
 	}
 
+	if (le16_to_cpu(msg->hdr.version) >= 5) {
+		struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
+
+		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+
+		/* Do lockless check first to avoid mutex if we can */
+		if (epoch_barrier > mdsc->cap_epoch_barrier) {
+			mutex_lock(&mdsc->mutex);
+			if (epoch_barrier > mdsc->cap_epoch_barrier)
+				mdsc->cap_epoch_barrier = epoch_barrier;
+			mutex_unlock(&mdsc->mutex);
+		}
+
+		down_read(&osdc->lock);
+		if (osdc->osdmap->epoch < epoch_barrier) {
+			dout("handle_caps delaying message until OSD epoch %d\n", epoch_barrier);
+			ceph_msg_get(msg);
+			spin_lock(&session->s_cap_lock);
+			list_add(&msg->list_head, &session->s_delayed_caps);
+			spin_unlock(&session->s_cap_lock);
+
+			// Kick OSD client to get the latest map
+			__ceph_osdc_maybe_request_map(osdc);
+
+			up_read(&osdc->lock);
+			return;
+		}
+
+		dout("handle_caps barrier %d already satisfied (%d)\n", epoch_barrier, osdc->osdmap->epoch);
+		up_read(&osdc->lock);
+	}
+
+	dout("handle_caps v=%d barrier=%d\n", le16_to_cpu(msg->hdr.version), epoch_barrier);
+
 	if (le16_to_cpu(msg->hdr.version) >= 8) {
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
-		u32 osd_epoch_barrier;
 		u32 pool_ns_len;
-		/* version >= 5 */
-		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
+
 		/* version >= 6 */
 		ceph_decode_64_safe(&p, end, flush_tid, bad);
 		/* version >= 7 */
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 39ff678e567f..825df757fba5 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -172,6 +172,9 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
 	/* The -o name mount argument */
 	seq_printf(s, "name \"%s\"\n", opt->name ? opt->name : "");
 
+	/* The latest OSD epoch barrier known to this client */
+	seq_printf(s, "osd_epoch_barrier \"%d\"\n", mdsc->cap_epoch_barrier);
+
 	/* The list of MDS session rank+state */
 	for (mds = 0; mds < mdsc->max_sessions; mds++) {
 		struct ceph_mds_session *session =
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 176512960b14..7055b499c08b 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -393,6 +393,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
 	dout("mdsc put_session %p %d -> %d\n", s,
 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
 	if (atomic_dec_and_test(&s->s_ref)) {
+		WARN_ON_ONCE(cancel_work_sync(&s->s_delayed_caps_work));
 		if (s->s_auth.authorizer)
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 		kfree(s);
@@ -432,6 +433,74 @@ static int __verify_registered_session(struct ceph_mds_client *mdsc,
 	return 0;
 }
 
+static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
+{
+	struct ceph_mds_client *mdsc = (struct ceph_mds_client*)p;
+	u32 cancelled_epoch = 0;
+	int mds_id;
+
+	lockdep_assert_held(&osdc->lock);
+
+	if ((osdc->osdmap->flags & CEPH_OSDMAP_FULL) ||
+	    ceph_osdc_have_pool_full(osdc))
+		cancelled_epoch = ceph_osdc_complete_writes(osdc, -ENOSPC);
+
+	dout("handle_osd_map: epoch=%d\n", osdc->osdmap->epoch);
+
+	mutex_lock(&mdsc->mutex);
+	if (cancelled_epoch)
+		mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
+					      mdsc->cap_epoch_barrier);
+
+	/* Schedule the workqueue job for any sessions */
+	for (mds_id = 0; mds_id < mdsc->max_sessions; ++mds_id) {
+		struct ceph_mds_session *session = mdsc->sessions[mds_id];
+		bool empty;
+
+		if (session == NULL)
+			continue;
+
+		/* Any delayed messages? */
+		spin_lock(&session->s_cap_lock);
+		empty = list_empty(&session->s_delayed_caps);
+		spin_unlock(&session->s_cap_lock);
+		if (empty)
+			continue;
+
+		/* take a reference -- if we can't get one, move on */
+		if (!get_session(session))
+			continue;
+
+		/*
+		 * Try to schedule work. If it's already queued, then just
+		 * drop the session reference.
+		 */
+		if (!schedule_work(&session->s_delayed_caps_work))
+			ceph_put_mds_session(session);
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
+static void
+run_delayed_caps(struct work_struct *work)
+{
+	struct ceph_mds_session *session = container_of(work,
+			struct ceph_mds_session, s_delayed_caps_work);
+	LIST_HEAD(delayed);
+
+	spin_lock(&session->s_cap_lock);
+	list_splice_init(&session->s_delayed_caps, &delayed);
+	spin_unlock(&session->s_cap_lock);
+
+	while (!list_empty(&delayed)) {
+		struct ceph_msg *msg = list_first_entry(&delayed,
+						struct ceph_msg, list_head);
+		list_del_init(&msg->list_head);
+		ceph_handle_caps(session, msg);
+		ceph_msg_put(msg);
+	}
+}
+
 /*
  * create+register a new session for given mds.
  * called under mdsc->mutex.
@@ -469,11 +538,13 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	atomic_set(&s->s_ref, 1);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
+	INIT_LIST_HEAD(&s->s_delayed_caps);
 	s->s_num_cap_releases = 0;
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
+	INIT_WORK(&s->s_delayed_caps_work, run_delayed_caps);
 
 	dout("register_session mds%d\n", mds);
 	if (mds >= mdsc->max_sessions) {
@@ -3480,6 +3551,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
 	ceph_caps_init(mdsc);
 	ceph_adjust_min_caps(mdsc, fsc->min_caps);
+	mdsc->cap_epoch_barrier = 0;
+
+	ceph_osdc_register_map_cb(&fsc->client->osdc,
+				  handle_osd_map, (void*)mdsc);
 
 	init_rwsem(&mdsc->pool_perm_rwsem);
 	mdsc->pool_perm_tree = RB_ROOT;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3c6f77b7bb02..eb8144ab4995 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -159,6 +159,8 @@ struct ceph_mds_session {
 	atomic_t          s_ref;
 	struct list_head  s_waiting;  /* waiting requests */
 	struct list_head  s_unsafe;   /* unsafe requests */
+	struct list_head	s_delayed_caps;
+	struct work_struct	s_delayed_caps_work;
 };
 
 /*
@@ -331,6 +333,7 @@ struct ceph_mds_client {
 	int               num_cap_flushing; /* # caps we are flushing */
 	spinlock_t        cap_dirty_lock;   /* protects above items */
 	wait_queue_head_t cap_flushing_wq;
+	u32               cap_epoch_barrier;
 
 	/*
 	 * Cap reservations
-- 
2.9.3



Thread overview: 16+ messages
2017-01-20 15:17 [PATCH v1 0/7] ceph: implement new-style ENOSPC handling in kcephfs Jeff Layton
2017-01-20 15:17 ` [PATCH v1 1/7] libceph: add ceph_osdc_cancel_writes Jeff Layton
2017-01-20 15:17 ` [PATCH v1 2/7] libceph: rename and export have_pool_full Jeff Layton
2017-01-20 15:17 ` [PATCH v1 3/7] libceph: rename and export maybe_request_map Jeff Layton
2017-01-20 15:17 ` Jeff Layton [this message]
2017-01-22  9:40   ` [PATCH v1 4/7] ceph: handle new osdmap epoch updates in CLIENT_CAPS and WRITE codepaths Yan, Zheng
2017-01-22 15:38     ` Jeff Layton
2017-01-23  1:38       ` Yan, Zheng
2017-02-01 19:50     ` Jeff Layton
2017-02-01 19:55       ` John Spray
2017-02-01 20:55         ` Jeff Layton
2017-02-02 16:07         ` Jeff Layton
2017-02-02 16:35           ` John Spray
2017-01-20 15:17 ` [PATCH v1 5/7] ceph: update CAPRELEASE message format Jeff Layton
2017-01-20 15:17 ` [PATCH v1 6/7] ceph: clean out delayed caps when destroying session Jeff Layton
2017-01-20 15:17 ` [PATCH v1 7/7] libceph: allow requests to return immediately on full conditions if caller wishes Jeff Layton
