From: James Simmons <jsimmons@infradead.org>
To: Andreas Dilger <adilger@whamcloud.com>,
	Oleg Drokin <green@whamcloud.com>, NeilBrown <neilb@suse.de>
Cc: Lustre Development List <lustre-devel@lists.lustre.org>
Subject: [lustre-devel] [PATCH 07/25] lustre: llite: revert 'simplify callback handling for async getattr'
Date: Mon,  2 Aug 2021 15:50:27 -0400	[thread overview]
Message-ID: <1627933851-7603-8-git-send-email-jsimmons@infradead.org> (raw)
In-Reply-To: <1627933851-7603-1-git-send-email-jsimmons@infradead.org>

From: Andreas Dilger <adilger@whamcloud.com>

This reverts commit 248f68f27de7d18c58a44114a46259141ca53115.

The reverted commit is causing process hangs and timeouts during file removal.

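For reviewers skimming the revert, the behaviour being restored is the deferred
handling of async stat replies: ll_statahead_interpret() runs in ptlrpcd
context and only parks the completed entry on sai_interim_entries, and the
statahead thread later drains that list through sa_handle_callback(), where it
is safe to instantiate the inode. The user-space program below is only a
hypothetical, simplified sketch of that producer/consumer split; every name in
it (interim_list, rpc_callback, statahead_thread, ...) is invented for the
illustration and none of it is Lustre code.

/*
 * Hypothetical sketch of the deferred-callback pattern this revert restores:
 * the RPC completion handler only queues work, and a dedicated thread does
 * the expensive instantiation later.  Not Lustre code.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct entry {
	struct entry *next;
	int reply;			/* stands in for the getattr reply */
};

static struct entry *interim_list;	/* plays the role of sai_interim_entries */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wake = PTHREAD_COND_INITIALIZER;
static int done;

/* plays the role of ll_statahead_interpret(): runs in callback context, must be cheap */
static void rpc_callback(int reply)
{
	struct entry *e = malloc(sizeof(*e));

	if (!e)
		return;
	e->reply = reply;
	pthread_mutex_lock(&lock);
	e->next = interim_list;		/* park it, do not instantiate here */
	interim_list = e;
	pthread_cond_signal(&wake);	/* like wake_up_process(sai_task) */
	pthread_mutex_unlock(&lock);
}

/* plays the role of sa_handle_callback(): drain the list, do the heavy work */
static void *statahead_thread(void *arg)
{
	pthread_mutex_lock(&lock);
	while (!done || interim_list) {
		while (interim_list) {
			struct entry *e = interim_list;

			interim_list = e->next;
			pthread_mutex_unlock(&lock);
			/* "instantiate": safe to block or allocate here */
			printf("instantiated reply %d\n", e->reply);
			free(e);
			pthread_mutex_lock(&lock);
		}
		if (!done)
			pthread_cond_wait(&wake, &lock);
	}
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t tid;
	int i;

	pthread_create(&tid, NULL, statahead_thread, NULL);
	for (i = 0; i < 4; i++)
		rpc_callback(i);	/* pretend four RPCs completed */
	pthread_mutex_lock(&lock);
	done = 1;
	pthread_cond_signal(&wake);
	pthread_mutex_unlock(&lock);
	pthread_join(tid, NULL);
	return 0;
}
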
Fixes: 248f68f27d ("lustre: llite: simplify callback handling for async getattr")
WC-bug-id: https://jira.whamcloud.com/browse/LU-14868
Lustre-commit: e90794af4bfac3a5 ("LU-14868 llite: revert 'simplify callback handling for async getattr'")
Reviewed-on: https://review.whamcloud.com/44371
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
---
 fs/lustre/include/obd.h          |  32 ++--
 fs/lustre/include/obd_class.h    |   4 +-
 fs/lustre/llite/llite_internal.h |   7 +-
 fs/lustre/llite/statahead.c      | 319 ++++++++++++++++++++++++++-------------
 fs/lustre/lmv/lmv_obd.c          |   6 +-
 fs/lustre/mdc/mdc_internal.h     |   3 +-
 fs/lustre/mdc/mdc_locks.c        |  31 ++--
 7 files changed, 252 insertions(+), 150 deletions(-)

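Note, not part of the patch itself: the core interface change in this revert is
the shape of the async getattr callback, visible in the obd.h and mdc_locks.c
hunks below. The toy C snippet here only restates those two signatures with
opaque placeholder types so the before/after is easy to compare; it is a
compilable sketch, not Lustre code.

/* Sketch: the two callback shapes this revert toggles between. */
struct req_capsule;		/* reply "pill" used by the reverted API */
struct ptlrpc_request;		/* full RPC request used by the restored API */
struct md_op_item;		/* removed by this revert */
struct md_enqueue_info;		/* restored by this revert */

/* Reverted (simplified) style: callback receives only the reply pill. */
typedef int (*md_op_cb_t)(struct req_capsule *pill,
			  struct md_op_item *item, int rc);

/* Restored style: callback receives the whole ptlrpc_request. */
typedef int (*md_enqueue_cb_t)(struct ptlrpc_request *req,
			       struct md_enqueue_info *minfo, int rc);

int main(void)
{
	return 0;	/* declarations only; nothing to run */
}
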
diff --git a/fs/lustre/include/obd.h b/fs/lustre/include/obd.h
index eeb6262..f619342 100644
--- a/fs/lustre/include/obd.h
+++ b/fs/lustre/include/obd.h
@@ -818,24 +818,18 @@ struct md_callback {
 			       void *data, int flag);
 };
 
-enum md_opcode {
-	MD_OP_NONE	= 0,
-	MD_OP_GETATTR	= 1,
-	MD_OP_MAX,
-};
-
-struct md_op_item {
-	enum md_opcode			mop_opc;
-	struct md_op_data		mop_data;
-	struct lookup_intent		mop_it;
-	struct lustre_handle		mop_lockh;
-	struct ldlm_enqueue_info	mop_einfo;
-	int (*mop_cb)(struct req_capsule *pill,
-		      struct md_op_item *item,
-		      int rc);
-	void			       *mop_cbdata;
-	struct inode		       *mop_dir;
-	u64				mop_lock_flags;
+struct md_enqueue_info;
+/* metadata stat-ahead */
+
+struct md_enqueue_info {
+	struct md_op_data		mi_data;
+	struct lookup_intent		mi_it;
+	struct lustre_handle		mi_lockh;
+	struct inode		       *mi_dir;
+	struct ldlm_enqueue_info	mi_einfo;
+	int (*mi_cb)(struct ptlrpc_request *req,
+		     struct md_enqueue_info *minfo, int rc);
+	void			       *mi_cbdata;
 };
 
 struct obd_ops {
@@ -1067,7 +1061,7 @@ struct md_ops {
 				struct lu_fid *fid);
 
 	int (*intent_getattr_async)(struct obd_export *exp,
-				    struct md_op_item *item);
+				    struct md_enqueue_info *minfo);
 
 	int (*revalidate_lock)(struct obd_export *, struct lookup_intent *,
 			       struct lu_fid *, u64 *bits);
diff --git a/fs/lustre/include/obd_class.h b/fs/lustre/include/obd_class.h
index ad9b2fc..f2a3d2b 100644
--- a/fs/lustre/include/obd_class.h
+++ b/fs/lustre/include/obd_class.h
@@ -1594,7 +1594,7 @@ static inline int md_init_ea_size(struct obd_export *exp, u32 easize,
 }
 
 static inline int md_intent_getattr_async(struct obd_export *exp,
-					  struct md_op_item *item)
+					  struct md_enqueue_info *minfo)
 {
 	int rc;
 
@@ -1605,7 +1605,7 @@ static inline int md_intent_getattr_async(struct obd_export *exp,
 	lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
 			     LPROC_MD_INTENT_GETATTR_ASYNC);
 
-	return MDP(exp->exp_obd, intent_getattr_async)(exp, item);
+	return MDP(exp->exp_obd, intent_getattr_async)(exp, minfo);
 }
 
 static inline int md_revalidate_lock(struct obd_export *exp,
diff --git a/fs/lustre/llite/llite_internal.h b/fs/lustre/llite/llite_internal.h
index 6cae741..2247806 100644
--- a/fs/lustre/llite/llite_internal.h
+++ b/fs/lustre/llite/llite_internal.h
@@ -1480,12 +1480,17 @@ struct ll_statahead_info {
 					     * is not a hidden one
 					     */
 	unsigned int	    sai_skip_hidden;/* skipped hidden dentry count */
-	unsigned int	    sai_ls_all:1;   /* "ls -al", do stat-ahead for
+	unsigned int	    sai_ls_all:1,   /* "ls -al", do stat-ahead for
 					     * hidden entries
 					     */
+				sai_in_readpage:1;/* statahead in readdir() */
 	wait_queue_head_t	sai_waitq;      /* stat-ahead wait queue */
 	struct task_struct     *sai_task;       /* stat-ahead thread */
 	struct task_struct     *sai_agl_task;   /* AGL thread */
+	struct list_head	sai_interim_entries; /* entries which got async
+						      * stat reply, but not
+						      * instantiated
+						      */
 	struct list_head	sai_entries;	/* completed entries */
 	struct list_head	sai_agls;	/* AGLs to be sent */
 	struct list_head	sai_cache[LL_SA_CACHE_SIZE];
diff --git a/fs/lustre/llite/statahead.c b/fs/lustre/llite/statahead.c
index becd0e1..8930f61 100644
--- a/fs/lustre/llite/statahead.c
+++ b/fs/lustre/llite/statahead.c
@@ -32,6 +32,7 @@
 
 #include <linux/fs.h>
 #include <linux/sched.h>
+#include <linux/kthread.h>
 #include <linux/mm.h>
 #include <linux/highmem.h>
 #include <linux/pagemap.h>
@@ -55,12 +56,13 @@ enum se_stat {
 
 /*
  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
- * and in async stat callback ll_statahead_interpret() will prepare the inode
- * and set lock data in the ptlrpcd context. Then the scanner process will be
- * woken up if this entry is the waiting one, can access and free it.
+ * and in the async stat callback ll_statahead_interpret() will add it to
+ * sai_interim_entries; later the statahead thread calls sa_handle_callback()
+ * to instantiate the entry and move it to sai_entries, after which only the
+ * scanner process can access and free it.
  */
 struct sa_entry {
-	/* link into sai_entries */
+	/* link into sai_interim_entries or sai_entries */
 	struct list_head	se_list;
 	/* link into sai hash table locally */
 	struct list_head	se_hash;
@@ -72,6 +74,10 @@ struct sa_entry {
 	enum se_stat		se_state;
 	/* entry size, contains name */
 	int			se_size;
+	/* pointer to async getattr enqueue info */
+	struct md_enqueue_info	*se_minfo;
+	/* pointer to the async getattr request */
+	struct ptlrpc_request	*se_req;
 	/* pointer to the target inode */
 	struct inode		*se_inode;
 	/* entry name */
@@ -131,6 +137,12 @@ static inline int sa_sent_full(struct ll_statahead_info *sai)
 	return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 }
 
+/* got async stat replies */
+static inline int sa_has_callback(struct ll_statahead_info *sai)
+{
+	return !list_empty(&sai->sai_interim_entries);
+}
+
 static inline int agl_list_empty(struct ll_statahead_info *sai)
 {
 	return list_empty(&sai->sai_agls);
@@ -316,55 +328,55 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 }
 
 /* finish async stat RPC arguments */
-static void sa_fini_data(struct md_op_item *item)
+static void sa_fini_data(struct md_enqueue_info *minfo)
 {
-	ll_unlock_md_op_lsm(&item->mop_data);
-	iput(item->mop_dir);
-	kfree(item);
+	ll_unlock_md_op_lsm(&minfo->mi_data);
+	iput(minfo->mi_dir);
+	kfree(minfo);
 }
 
-static int ll_statahead_interpret(struct req_capsule *pill,
-				  struct md_op_item *item, int rc);
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+				  struct md_enqueue_info *minfo, int rc);
 
 /*
  * prepare arguments for async stat RPC.
  */
-static struct md_op_item *
+static struct md_enqueue_info *
 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
 {
-	struct md_op_item *item;
+	struct md_enqueue_info   *minfo;
 	struct ldlm_enqueue_info *einfo;
-	struct md_op_data *op_data;
+	struct md_op_data        *op_data;
 
-	item = kzalloc(sizeof(*item), GFP_NOFS);
-	if (!item)
+	minfo = kzalloc(sizeof(*minfo), GFP_NOFS);
+	if (!minfo)
 		return ERR_PTR(-ENOMEM);
 
-	op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
+	op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
 				     entry->se_qstr.name, entry->se_qstr.len, 0,
 				     LUSTRE_OPC_ANY, NULL);
 	if (IS_ERR(op_data)) {
-		kfree(item);
-		return ERR_CAST(item);
+		kfree(minfo);
+		return (struct md_enqueue_info *)op_data;
 	}
 
 	if (!child)
 		op_data->op_fid2 = entry->se_fid;
 
-	item->mop_it.it_op = IT_GETATTR;
-	item->mop_dir = igrab(dir);
-	item->mop_cb = ll_statahead_interpret;
-	item->mop_cbdata = entry;
-
-	einfo = &item->mop_einfo;
-	einfo->ei_type = LDLM_IBITS;
-	einfo->ei_mode = it_to_lock_mode(&item->mop_it);
-	einfo->ei_cb_bl = ll_md_blocking_ast;
-	einfo->ei_cb_cp = ldlm_completion_ast;
-	einfo->ei_cb_gl = NULL;
+	minfo->mi_it.it_op = IT_GETATTR;
+	minfo->mi_dir = igrab(dir);
+	minfo->mi_cb = ll_statahead_interpret;
+	minfo->mi_cbdata = entry;
+
+	einfo = &minfo->mi_einfo;
+	einfo->ei_type   = LDLM_IBITS;
+	einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+	einfo->ei_cb_bl  = ll_md_blocking_ast;
+	einfo->ei_cb_cp  = ldlm_completion_ast;
+	einfo->ei_cb_gl  = NULL;
 	einfo->ei_cbdata = NULL;
 
-	return item;
+	return minfo;
 }
 
 /*
@@ -375,8 +387,22 @@ static int ll_statahead_interpret(struct req_capsule *pill,
 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 {
 	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
+	struct md_enqueue_info *minfo = entry->se_minfo;
+	struct ptlrpc_request *req = entry->se_req;
 	bool wakeup;
 
+	/* release resources used in RPC */
+	if (minfo) {
+		entry->se_minfo = NULL;
+		ll_intent_release(&minfo->mi_it);
+		sa_fini_data(minfo);
+	}
+
+	if (req) {
+		entry->se_req = NULL;
+		ptlrpc_req_finished(req);
+	}
+
 	spin_lock(&lli->lli_sa_lock);
 	wakeup = __sa_make_ready(sai, entry, ret);
 	spin_unlock(&lli->lli_sa_lock);
@@ -433,6 +459,7 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 	sai->sai_index = 1;
 	init_waitqueue_head(&sai->sai_waitq);
 
+	INIT_LIST_HEAD(&sai->sai_interim_entries);
 	INIT_LIST_HEAD(&sai->sai_entries);
 	INIT_LIST_HEAD(&sai->sai_agls);
 
@@ -495,6 +522,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 		LASSERT(sai->sai_task == NULL);
 		LASSERT(sai->sai_agl_task == NULL);
 		LASSERT(sai->sai_sent == sai->sai_replied);
+		LASSERT(!sa_has_callback(sai));
 
 		list_for_each_entry_safe(entry, next, &sai->sai_entries,
 					 se_list)
@@ -585,63 +613,26 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
 }
 
 /*
- * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
- * the inode and set lock data directly in the ptlrpcd context. It will wake up
- * the directory listing process if the dentry is the waiting one.
+ * prepare the inode for the sa_entry and add it to the AGL list; the
+ * sa_entry is then ready to be used by the scanner process.
  */
-static int ll_statahead_interpret(struct req_capsule *pill,
-				  struct md_op_item *item, int rc)
+static void sa_instantiate(struct ll_statahead_info *sai,
+			   struct sa_entry *entry)
 {
-	struct lookup_intent *it = &item->mop_it;
-	struct inode *dir = item->mop_dir;
-	struct ll_inode_info *lli = ll_i2info(dir);
-	struct ll_statahead_info *sai = lli->lli_sai;
-	struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
-	struct mdt_body	*body;
+	struct inode *dir = sai->sai_dentry->d_inode;
 	struct inode *child;
-	u64 handle = 0;
-
-	if (it_disposition(it, DISP_LOOKUP_NEG))
-		rc = -ENOENT;
-
-	/*
-	 * because statahead thread will wait for all inflight RPC to finish,
-	 * sai should be always valid, no need to refcount
-	 */
-	LASSERT(sai);
-	LASSERT(entry);
-
-	CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
-	       entry->se_qstr.len, entry->se_qstr.name, rc);
-
-	if (rc != 0) {
-		ll_intent_release(it);
-		sa_fini_data(item);
-	} else {
-		/*
-		 * release ibits lock ASAP to avoid deadlock when statahead
-		 * thread enqueues lock on parent in readdir and another
-		 * process enqueues lock on child with parent lock held, eg.
-		 * unlink.
-		 */
-		handle = it->it_lock_handle;
-		ll_intent_drop_lock(it);
-		ll_unlock_md_op_lsm(&item->mop_data);
-	}
-
-	if (rc != 0) {
-		spin_lock(&lli->lli_sa_lock);
-		if (__sa_make_ready(sai, entry, rc))
-			wake_up(&sai->sai_waitq);
-
-		sai->sai_replied++;
-		spin_unlock(&lli->lli_sa_lock);
+	struct md_enqueue_info *minfo;
+	struct lookup_intent *it;
+	struct ptlrpc_request *req;
+	struct mdt_body	*body;
+	int rc = 0;
 
-		return rc;
-	}
+	LASSERT(entry->se_handle != 0);
 
-	entry->se_handle = handle;
-	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+	minfo = entry->se_minfo;
+	it = &minfo->mi_it;
+	req = entry->se_req;
+	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 	if (!body) {
 		rc = -EFAULT;
 		goto out;
@@ -649,7 +640,7 @@ static int ll_statahead_interpret(struct req_capsule *pill,
 
 	child = entry->se_inode;
 	/* revalidate; unlinked and re-created with the same name */
-	if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+	if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
 		if (child) {
 			entry->se_inode = NULL;
 			iput(child);
@@ -666,7 +657,7 @@ static int ll_statahead_interpret(struct req_capsule *pill,
 		goto out;
 	}
 
-	rc = ll_prep_inode(&child, pill, dir->i_sb, it);
+	rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
 	if (rc)
 		goto out;
 
@@ -679,18 +670,107 @@ static int ll_statahead_interpret(struct req_capsule *pill,
 
 	if (agl_should_run(sai, child))
 		ll_agl_add(sai, child, entry->se_index);
+
 out:
 	/*
-	 * First it will drop ldlm ibits lock refcount by calling
+	 * sa_make_ready() will drop ldlm ibits lock refcount by calling
 	 * ll_intent_drop_lock() in spite of failures. Do not worry about
 	 * calling ll_intent_drop_lock() more than once.
 	 */
-	ll_intent_release(&item->mop_it);
-	sa_fini_data(item);
 	sa_make_ready(sai, entry, rc);
+}
+
+/* once there are async stat replies, instantiate sa_entry from replies */
+static void sa_handle_callback(struct ll_statahead_info *sai)
+{
+	struct ll_inode_info *lli;
+
+	lli = ll_i2info(sai->sai_dentry->d_inode);
 
 	spin_lock(&lli->lli_sa_lock);
+	while (sa_has_callback(sai)) {
+		struct sa_entry *entry;
+
+		entry = list_first_entry(&sai->sai_interim_entries,
+					 struct sa_entry, se_list);
+		list_del_init(&entry->se_list);
+		spin_unlock(&lli->lli_sa_lock);
+
+		sa_instantiate(sai, entry);
+		spin_lock(&lli->lli_sa_lock);
+	}
+	spin_unlock(&lli->lli_sa_lock);
+}
+
+/*
+ * callback for async stat RPC: because this is called in ptlrpcd context, we
+ * only put the sa_entry on sai_interim_entries and wake up the statahead
+ * thread to prepare the inode and instantiate the sa_entry later.
+ */
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+				  struct md_enqueue_info *minfo, int rc)
+{
+	struct lookup_intent *it = &minfo->mi_it;
+	struct inode *dir = minfo->mi_dir;
+	struct ll_inode_info *lli = ll_i2info(dir);
+	struct ll_statahead_info *sai = lli->lli_sai;
+	struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
+	u64 handle = 0;
+
+	if (it_disposition(it, DISP_LOOKUP_NEG))
+		rc = -ENOENT;
+
+	/*
+	 * because statahead thread will wait for all inflight RPC to finish,
+	 * sai should be always valid, no need to refcount
+	 */
+	LASSERT(sai);
+	LASSERT(entry);
+
+	CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
+	       entry->se_qstr.len, entry->se_qstr.name, rc);
+
+	if (rc) {
+		ll_intent_release(it);
+		sa_fini_data(minfo);
+	} else {
+		/*
+		 * release ibits lock ASAP to avoid deadlock when statahead
+		 * thread enqueues lock on parent in readdir and another
+		 * process enqueues lock on child with parent lock held, eg.
+		 * unlink.
+		 */
+		handle = it->it_lock_handle;
+		ll_intent_drop_lock(it);
+		ll_unlock_md_op_lsm(&minfo->mi_data);
+	}
+
+	spin_lock(&lli->lli_sa_lock);
+	if (rc) {
+		if (__sa_make_ready(sai, entry, rc))
+			wake_up(&sai->sai_waitq);
+	} else {
+		int first = 0;
+
+		entry->se_minfo = minfo;
+		entry->se_req = ptlrpc_request_addref(req);
+		/*
+		 * Release the async ibits lock ASAP to avoid deadlock
+		 * when statahead thread tries to enqueue lock on parent
+		 * for readpage and other tries to enqueue lock on child
+		 * with parent's lock held, for example: unlink.
+		 */
+		entry->se_handle = handle;
+		if (!sa_has_callback(sai))
+			first = 1;
+
+		list_add_tail(&entry->se_list, &sai->sai_interim_entries);
+
+		if (first && sai->sai_task)
+			wake_up_process(sai->sai_task);
+	}
 	sai->sai_replied++;
+
 	spin_unlock(&lli->lli_sa_lock);
 
 	return rc;
@@ -699,16 +779,16 @@ static int ll_statahead_interpret(struct req_capsule *pill,
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
-	struct md_op_item *item;
+	struct md_enqueue_info *minfo;
 	int rc;
 
-	item = sa_prep_data(dir, NULL, entry);
-	if (IS_ERR(item))
-		return PTR_ERR(item);
+	minfo = sa_prep_data(dir, NULL, entry);
+	if (IS_ERR(minfo))
+		return PTR_ERR(minfo);
 
-	rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
+	rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
 	if (rc)
-		sa_fini_data(item);
+		sa_fini_data(minfo);
 
 	return rc;
 }
@@ -728,7 +808,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 		.it_op = IT_GETATTR,
 		.it_lock_handle = 0
 	};
-	struct md_op_item *item;
+	struct md_enqueue_info *minfo;
 	int rc;
 
 	if (unlikely(!inode))
@@ -737,9 +817,9 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 	if (d_mountpoint(dentry))
 		return 1;
 
-	item = sa_prep_data(dir, inode, entry);
-	if (IS_ERR(item))
-		return PTR_ERR(item);
+	minfo = sa_prep_data(dir, inode, entry);
+	if (IS_ERR(minfo))
+		return PTR_ERR(minfo);
 
 	entry->se_inode = igrab(inode);
 	rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
@@ -747,15 +827,15 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 	if (rc == 1) {
 		entry->se_handle = it.it_lock_handle;
 		ll_intent_release(&it);
-		sa_fini_data(item);
+		sa_fini_data(minfo);
 		return 1;
 	}
 
-	rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
+	rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
 	if (rc) {
 		entry->se_inode = NULL;
 		iput(inode);
-		sa_fini_data(item);
+		sa_fini_data(minfo);
 	}
 
 	return rc;
@@ -815,6 +895,9 @@ static int ll_agl_thread(void *arg)
 	while (({set_current_state(TASK_IDLE);
 		 !kthread_should_stop(); })) {
 		spin_lock(&plli->lli_agl_lock);
+		/* The statahead thread may help process AGL entries,
+		 * so check again whether the list is empty.
+		 */
 		clli = list_first_entry_or_null(&sai->sai_agls,
 						struct ll_inode_info,
 						lli_agl_list);
@@ -852,10 +935,9 @@ static void ll_stop_agl(struct ll_statahead_info *sai)
 	kthread_stop(agl_task);
 
 	spin_lock(&plli->lli_agl_lock);
-	clli = list_first_entry_or_null(&sai->sai_agls,
-					struct ll_inode_info,
-					lli_agl_list);
-	if (clli) {
+	while ((clli = list_first_entry_or_null(&sai->sai_agls,
+						struct ll_inode_info,
+						lli_agl_list)) != NULL) {
 		list_del_init(&clli->lli_agl_list);
 		spin_unlock(&plli->lli_agl_lock);
 		clli->lli_agl_index = 0;
@@ -928,8 +1010,10 @@ static int ll_statahead_thread(void *arg)
 			break;
 		}
 
+		sai->sai_in_readpage = 1;
 		page = ll_get_dir_page(dir, op_data, pos);
 		ll_unlock_md_op_lsm(op_data);
+		sai->sai_in_readpage = 0;
 		if (IS_ERR(page)) {
 			rc = PTR_ERR(page);
 			CDEBUG(D_READA,
@@ -993,9 +1077,14 @@ static int ll_statahead_thread(void *arg)
 
 			while (({set_current_state(TASK_IDLE);
 				 sai->sai_task; })) {
+				if (sa_has_callback(sai)) {
+					__set_current_state(TASK_RUNNING);
+					sa_handle_callback(sai);
+				}
+
 				spin_lock(&lli->lli_agl_lock);
 				while (sa_sent_full(sai) &&
-				       !list_empty(&sai->sai_agls)) {
+				       !agl_list_empty(sai)) {
 					struct ll_inode_info *clli;
 
 					__set_current_state(TASK_RUNNING);
@@ -1047,11 +1136,16 @@ static int ll_statahead_thread(void *arg)
 
 	/*
 	 * statahead is finished, but statahead entries need to be cached, wait
-	 * for file release closedir() call to stop me.
+	 * for file release to stop me.
 	 */
 	while (({set_current_state(TASK_IDLE);
 		 sai->sai_task; })) {
-		schedule();
+		if (sa_has_callback(sai)) {
+			__set_current_state(TASK_RUNNING);
+			sa_handle_callback(sai);
+		} else {
+			schedule();
+		}
 	}
 	__set_current_state(TASK_RUNNING);
 out:
@@ -1061,9 +1155,13 @@ static int ll_statahead_thread(void *arg)
 	 * wait for inflight statahead RPCs to finish, and then we can free sai
 	 * safely because statahead RPC will access sai data
 	 */
-	while (sai->sai_sent != sai->sai_replied)
+	while (sai->sai_sent != sai->sai_replied) {
 		/* in case we're not woken up, timeout wait */
 		msleep(125);
+	}
+
+	/* release resources held by statahead RPCs */
+	sa_handle_callback(sai);
 
 	CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
 	       sai, parent);
@@ -1325,6 +1423,10 @@ static int revalidate_statahead_dentry(struct inode *dir,
 		goto out_unplug;
 	}
 
+	/* if statahead is busy in readdir, help it do post-work */
+	if (!sa_ready(entry) && sai->sai_in_readpage)
+		sa_handle_callback(sai);
+
 	if (!sa_ready(entry)) {
 		spin_lock(&lli->lli_sa_lock);
 		sai->sai_index_wait = entry->se_index;
@@ -1497,7 +1599,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
 	sai->sai_task = task;
 
 	wake_up_process(task);
-
 	/*
 	 * We don't stat-ahead for the first dirent since we are already in
 	 * lookup.
diff --git a/fs/lustre/lmv/lmv_obd.c b/fs/lustre/lmv/lmv_obd.c
index 1d9b830..71bf7811 100644
--- a/fs/lustre/lmv/lmv_obd.c
+++ b/fs/lustre/lmv/lmv_obd.c
@@ -3438,9 +3438,9 @@ static int lmv_clear_open_replay_data(struct obd_export *exp,
 }
 
 static int lmv_intent_getattr_async(struct obd_export *exp,
-				    struct md_op_item *item)
+				    struct md_enqueue_info *minfo)
 {
-	struct md_op_data *op_data = &item->mop_data;
+	struct md_op_data *op_data = &minfo->mi_data;
 	struct obd_device *obd = exp->exp_obd;
 	struct lmv_obd *lmv = &obd->u.lmv;
 	struct lmv_tgt_desc *ptgt = NULL;
@@ -3464,7 +3464,7 @@ static int lmv_intent_getattr_async(struct obd_export *exp,
 	if (ctgt != ptgt)
 		return -EREMOTE;
 
-	return md_intent_getattr_async(ptgt->ltd_exp, item);
+	return md_intent_getattr_async(ptgt->ltd_exp, minfo);
 }
 
 static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
diff --git a/fs/lustre/mdc/mdc_internal.h b/fs/lustre/mdc/mdc_internal.h
index 2416607..fab40bd 100644
--- a/fs/lustre/mdc/mdc_internal.h
+++ b/fs/lustre/mdc/mdc_internal.h
@@ -130,7 +130,8 @@ int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
 			struct lu_fid *fid, u64 *bits);
 
-int mdc_intent_getattr_async(struct obd_export *exp, struct md_op_item *item);
+int mdc_intent_getattr_async(struct obd_export *exp,
+			     struct md_enqueue_info *minfo);
 
 enum ldlm_mode mdc_lock_match(struct obd_export *exp, u64 flags,
 			      const struct lu_fid *fid, enum ldlm_type type,
diff --git a/fs/lustre/mdc/mdc_locks.c b/fs/lustre/mdc/mdc_locks.c
index a0fcab0..4135c3a 100644
--- a/fs/lustre/mdc/mdc_locks.c
+++ b/fs/lustre/mdc/mdc_locks.c
@@ -49,7 +49,7 @@
 
 struct mdc_getattr_args {
 	struct obd_export	*ga_exp;
-	struct md_op_item	*ga_item;
+	struct md_enqueue_info	*ga_minfo;
 };
 
 int it_open_error(int phase, struct lookup_intent *it)
@@ -1360,10 +1360,10 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
 {
 	struct mdc_getattr_args *ga = args;
 	struct obd_export *exp = ga->ga_exp;
-	struct md_op_item *item = ga->ga_item;
-	struct ldlm_enqueue_info *einfo = &item->mop_einfo;
-	struct lookup_intent *it = &item->mop_it;
-	struct lustre_handle *lockh = &item->mop_lockh;
+	struct md_enqueue_info *minfo = ga->ga_minfo;
+	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
+	struct lookup_intent *it = &minfo->mi_it;
+	struct lustre_handle *lockh = &minfo->mi_lockh;
 	struct ldlm_reply *lockrep;
 	u64 flags = LDLM_FL_HAS_INTENT;
 
@@ -1388,17 +1388,18 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
 	if (rc)
 		goto out;
 
-	rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
+	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+
 out:
-	item->mop_cb(&req->rq_pill, item, rc);
+	minfo->mi_cb(req, minfo, rc);
 	return 0;
 }
 
 int mdc_intent_getattr_async(struct obd_export *exp,
-			     struct md_op_item *item)
+			     struct md_enqueue_info *minfo)
 {
-	struct md_op_data *op_data = &item->mop_data;
-	struct lookup_intent *it = &item->mop_it;
+	struct md_op_data *op_data = &minfo->mi_data;
+	struct lookup_intent *it = &minfo->mi_it;
 	struct ptlrpc_request *req;
 	struct mdc_getattr_args *ga;
 	struct ldlm_res_id res_id;
@@ -1427,11 +1428,11 @@ int mdc_intent_getattr_async(struct obd_export *exp,
 	 * to avoid possible races. It is safe to have glimpse handler
 	 * for non-DOM locks and costs nothing.
 	 */
-	if (!item->mop_einfo.ei_cb_gl)
-		item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
+	if (!minfo->mi_einfo.ei_cb_gl)
+		minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
 
-	rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
-			      &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
+	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
+			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
 	if (rc < 0) {
 		ptlrpc_req_finished(req);
 		return rc;
@@ -1439,7 +1440,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
 
 	ga = ptlrpc_req_async_args(ga, req);
 	ga->ga_exp = exp;
-	ga->ga_item = item;
+	ga->ga_minfo = minfo;
 
 	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
 	ptlrpcd_add_req(req);
-- 
1.8.3.1

_______________________________________________
lustre-devel mailing list
lustre-devel@lists.lustre.org
http://lists.lustre.org/listinfo.cgi/lustre-devel-lustre.org

Thread overview: 32+ messages
2021-08-02 19:50 [lustre-devel] [PATCH 00/25] Sync to OpenSFS tree as of Aug 2, 2021 James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 01/25] lustre: llite: avoid stale data reading James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 02/25] lustre: llite: No locked parallel DIO James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 03/25] lnet: discard lnet_current_net_count James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 04/25] lnet: convert kiblnd/ksocknal_thread_start to vararg James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 05/25] lnet: print device status in net show command James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 06/25] lustre: lmv: getattr_name("..") under striped directory James Simmons
2021-08-02 19:50 ` James Simmons [this message]
2021-08-02 19:50 ` [lustre-devel] [PATCH 08/25] lnet: Protect lpni deref in lnet_health_check James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 09/25] lustre: uapi: remove MDS_SETATTR_PORTAL and service James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 10/25] lustre: llite: Modify AIO/DIO reference counting James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 11/25] lustre: llite: Remove transient page counting James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 12/25] lustre: lov: Improve DIO submit James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 13/25] lustre: llite: Adjust dio refcounting James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 14/25] lustre: clio: Skip prep for transients James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 15/25] lustre: osc: Improve osc_queue_sync_pages James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 16/25] lustre: llite: avoid project quota overflow James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 17/25] lnet: check memdup_user_nul using IS_ERR James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 18/25] lustre: osc: Remove lockless truncate James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 19/25] lustre: osc: Remove client contention support James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 20/25] lustre: osc: osc: Do not flush on lockless cancel James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 21/26] lustre: pcc: add LCM_FL_PCC_RDONLY layout flag James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 21/25] lustre: update version to 2.14.53 James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 22/25] lustre: mdc: set default LMV on ROOT James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 22/26] lustre: update version to 2.14.53 James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 23/25] lustre: llite: enable filesystem-wide default LMV James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 23/26] lustre: mdc: set default LMV on ROOT James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 24/25] lnet: o2iblnd: clear fatal error on successful failover James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 24/26] lustre: llite: enable filesystem-wide default LMV James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 25/25] lnet: add "stats reset" to lnetctl James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 25/26] lnet: o2iblnd: clear fatal error on successful failover James Simmons
2021-08-02 19:50 ` [lustre-devel] [PATCH 26/26] lnet: add "stats reset" to lnetctl James Simmons
