From: James Simmons <jsimmons@infradead.org>
To: Andreas Dilger <adilger@whamcloud.com>,
	Oleg Drokin <green@whamcloud.com>, NeilBrown <neilb@suse.de>
Cc: Vitaly Fertman <vitaly.fertman@hpe.com>,
	Lustre Development List <lustre-devel@lists.lustre.org>
Subject: [lustre-devel] [PATCH 16/22] lustre: ldlm: group lock unlock fix
Date: Sun, 20 Nov 2022 09:17:02 -0500	[thread overview]
Message-ID: <1668953828-10909-17-git-send-email-jsimmons@infradead.org> (raw)
In-Reply-To: <1668953828-10909-1-git-send-email-jsimmons@infradead.org>

From: Vitaly Fertman <vitaly.fertman@hpe.com>

The original LU-9964 fix had a problem: with many cached pages in
memory, group lock unlock could take 10+ seconds just to discard them.

This patch makes the group lock unlock path non-atomic again; instead,
a new group lock enqueue waits until the previous CBPENDING lock gets
destroyed.
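
For illustration, a minimal sketch of the wait-and-retry logic this adds
to ldlm_lock_match_with_skip() (simplified from the patch below;
resource refcounting and error handling are trimmed):

	repeat:
		group_lock = NULL;
		lock_res(res);
		lock = search_queue(&res->lr_granted, &data);
		/* lock_matches() now pins a CBPENDING group lock instead
		 * of matching it, so the caller can wait for it to die.
		 */
		if (lock && ldlm_is_cbpending(lock) &&
		    (data.lmd_match & LDLM_MATCH_GROUP))
			group_lock = lock;
		unlock_res(res);

		if (group_lock) {
			/* ldlm_lock_destroy_internal() wakes l_waitq */
			l_wait_event_abortable(group_lock->l_waitq,
					       ldlm_is_destroyed(group_lock));
			LDLM_LOCK_RELEASE(group_lock);
			goto repeat;
		}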

HPE-bug-id: LUS-10644

WC-bug-id: https://jira.whamcloud.com/browse/LU-16046
Lustre-commit: 3dc261c06434eceee ("LU-16046 ldlm: group lock unlock fix")
Lustre-commit: 62fd8f9b498ae3d16 ("Revert "LU-16046 revert: "LU-9964 llite: prevent mulitple group locks"")
Lustre-commit: dd609c6f31adeadab ("Revert "LU-16046 ldlm: group lock fix")
Signed-off-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Reviewed-on: https://es-gerrit.dev.cray.com/161411
Reviewed-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Tested-by: Alexander Lezhoev <alexander.lezhoev@hpe.com>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/49008
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
---
 fs/lustre/include/lustre_dlm.h   |   1 +
 fs/lustre/include/lustre_osc.h   |  15 ----
 fs/lustre/ldlm/ldlm_lock.c       |  28 ++++++-
 fs/lustre/llite/file.c           |  76 ++++++++++++-------
 fs/lustre/llite/llite_internal.h |   3 +
 fs/lustre/llite/llite_lib.c      |   3 +
 fs/lustre/mdc/mdc_dev.c          |  58 ++++-----------
 fs/lustre/osc/osc_lock.c         | 157 ++-------------------------------------
 fs/lustre/osc/osc_object.c       |  16 ----
 fs/lustre/osc/osc_request.c      |  14 ++--
 10 files changed, 110 insertions(+), 261 deletions(-)

diff --git a/fs/lustre/include/lustre_dlm.h b/fs/lustre/include/lustre_dlm.h
index 6053e01..d08c48f 100644
--- a/fs/lustre/include/lustre_dlm.h
+++ b/fs/lustre/include/lustre_dlm.h
@@ -855,6 +855,7 @@ enum ldlm_match_flags {
 	LDLM_MATCH_AST		= BIT(1),
 	LDLM_MATCH_AST_ANY	= BIT(2),
 	LDLM_MATCH_RIGHT	= BIT(3),
+	LDLM_MATCH_GROUP	= BIT(4),
 };
 
 /**
diff --git a/fs/lustre/include/lustre_osc.h b/fs/lustre/include/lustre_osc.h
index a0f1afc..d15f46b 100644
--- a/fs/lustre/include/lustre_osc.h
+++ b/fs/lustre/include/lustre_osc.h
@@ -319,11 +319,6 @@ struct osc_object {
 
 	const struct osc_object_operations *oo_obj_ops;
 	bool			oo_initialized;
-
-	wait_queue_head_t	oo_group_waitq;
-	struct mutex		oo_group_mutex;
-	u64			oo_group_users;
-	unsigned long		oo_group_gid;
 };
 
 static inline void osc_build_res_name(struct osc_object *osc,
@@ -660,16 +655,6 @@ int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
 int osc_object_find_cbdata(const struct lu_env *env, struct cl_object *obj,
 			   ldlm_iterator_t iter, void *data);
 int osc_object_prune(const struct lu_env *env, struct cl_object *obj);
-void osc_grouplock_inc_locked(struct osc_object *osc, struct ldlm_lock *lock);
-void osc_grouplock_dec(struct osc_object *osc, struct ldlm_lock *lock);
-int osc_grouplock_enqueue_init(const struct lu_env *env,
-			       struct osc_object *obj,
-			       struct osc_lock *oscl,
-			       struct lustre_handle *lh);
-void osc_grouplock_enqueue_fini(const struct lu_env *env,
-				struct osc_object *obj,
-				struct osc_lock *oscl,
-				struct lustre_handle *lh);
 
 /* osc_request.c */
 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
diff --git a/fs/lustre/ldlm/ldlm_lock.c b/fs/lustre/ldlm/ldlm_lock.c
index 39ab2a0..8659aa5 100644
--- a/fs/lustre/ldlm/ldlm_lock.c
+++ b/fs/lustre/ldlm/ldlm_lock.c
@@ -324,6 +324,7 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
 		return 0;
 	}
 	ldlm_set_destroyed(lock);
+	wake_up(&lock->l_waitq);
 
 	ldlm_lock_remove_from_lru(lock);
 	class_handle_unhash(&lock->l_handle);
@@ -1067,10 +1068,12 @@ static bool lock_matches(struct ldlm_lock *lock, void *vdata)
 	 * can still happen.
 	 */
 	if (ldlm_is_cbpending(lock) &&
-	    !(data->lmd_flags & LDLM_FL_CBPENDING))
+	    !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+	    !(data->lmd_match & LDLM_MATCH_GROUP))
 		return false;
 
-	if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
+	if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+	    ldlm_is_cbpending(lock) &&
 	    !lock->l_readers && !lock->l_writers)
 		return false;
 
@@ -1136,7 +1139,12 @@ static bool lock_matches(struct ldlm_lock *lock, void *vdata)
 		return false;
 
 matched:
-	if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+	/**
+	 * In case the lock is a CBPENDING grouplock, just pin it and return,
+	 * we need to wait until it gets to DESTROYED.
+	 */
+	if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+	    (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
 		LDLM_LOCK_GET(lock);
 		ldlm_lock_touch_in_lru(lock);
 	} else {
@@ -1296,6 +1304,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
 	};
 	struct ldlm_resource *res;
 	struct ldlm_lock *lock;
+	struct ldlm_lock *group_lock;
 	int matched;
 
 	if (!ns) {
@@ -1314,6 +1323,8 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
 		return 0;
 	}
 
+repeat:
+	group_lock = NULL;
 	LDLM_RESOURCE_ADDREF(res);
 	lock_res(res);
 	if (res->lr_type == LDLM_EXTENT)
@@ -1323,8 +1334,19 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
 	if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
 		lock = search_queue(&res->lr_waiting, &data);
 	matched = lock ? mode : 0;
+
+	if (lock && ldlm_is_cbpending(lock) &&
+	    (data.lmd_match & LDLM_MATCH_GROUP))
+		group_lock = lock;
 	unlock_res(res);
 	LDLM_RESOURCE_DELREF(res);
+
+	if (group_lock) {
+		l_wait_event_abortable(group_lock->l_waitq,
+				       ldlm_is_destroyed(lock));
+		LDLM_LOCK_RELEASE(lock);
+		goto repeat;
+	}
 	ldlm_resource_putref(res);
 
 	if (lock) {
diff --git a/fs/lustre/llite/file.c b/fs/lustre/llite/file.c
index 34a449e..dac829f 100644
--- a/fs/lustre/llite/file.c
+++ b/fs/lustre/llite/file.c
@@ -2522,15 +2522,30 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
 	if (ll_file_nolock(file))
 		return -EOPNOTSUPP;
 
-	read_lock(&lli->lli_lock);
+retry:
+	if (file->f_flags & O_NONBLOCK) {
+		if (!mutex_trylock(&lli->lli_group_mutex))
+			return -EAGAIN;
+	} else
+		mutex_lock(&lli->lli_group_mutex);
+
 	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
 		CWARN("group lock already existed with gid %lu\n",
 		      fd->fd_grouplock.lg_gid);
-		read_unlock(&lli->lli_lock);
-		return -EINVAL;
+		rc = -EINVAL;
+		goto out;
+	}
+	if (arg != lli->lli_group_gid && lli->lli_group_users != 0) {
+		if (file->f_flags & O_NONBLOCK) {
+			rc = -EAGAIN;
+			goto out;
+		}
+		mutex_unlock(&lli->lli_group_mutex);
+		wait_var_event(&lli->lli_group_users, !lli->lli_group_users);
+		rc = 0;
+		goto retry;
 	}
 	LASSERT(!fd->fd_grouplock.lg_lock);
-	read_unlock(&lli->lli_lock);
 
 	/**
 	 * XXX: group lock needs to protect all OST objects while PFL
@@ -2549,8 +2564,10 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
 		u16 refcheck;
 
 		env = cl_env_get(&refcheck);
-		if (IS_ERR(env))
-			return PTR_ERR(env);
+		if (IS_ERR(env)) {
+			rc = PTR_ERR(env);
+			goto out;
+		}
 
 		rc = cl_object_layout_get(env, obj, &cl);
 		if (rc >= 0 && cl.cl_is_composite)
@@ -2559,28 +2576,26 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
 
 		cl_env_put(env, &refcheck);
 		if (rc < 0)
-			return rc;
+			goto out;
 	}
 
 	rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
 			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
-	if (rc)
-		return rc;
 
-	write_lock(&lli->lli_lock);
-	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
-		write_unlock(&lli->lli_lock);
-		CERROR("another thread just won the race\n");
-		cl_put_grouplock(&grouplock);
-		return -EINVAL;
-	}
+	if (rc)
+		goto out;
 
 	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
 	fd->fd_grouplock = grouplock;
-	write_unlock(&lli->lli_lock);
+	if (lli->lli_group_users == 0)
+		lli->lli_group_gid = grouplock.lg_gid;
+	lli->lli_group_users++;
 
 	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
-	return 0;
+out:
+	mutex_unlock(&lli->lli_group_mutex);
+
+	return rc;
 }
 
 static int ll_put_grouplock(struct inode *inode, struct file *file,
@@ -2589,31 +2604,40 @@ static int ll_put_grouplock(struct inode *inode, struct file *file,
 	struct ll_inode_info *lli = ll_i2info(inode);
 	struct ll_file_data *fd = file->private_data;
 	struct ll_grouplock grouplock;
+	int rc;
 
-	write_lock(&lli->lli_lock);
+	mutex_lock(&lli->lli_group_mutex);
 	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-		write_unlock(&lli->lli_lock);
 		CWARN("no group lock held\n");
-		return -EINVAL;
+		rc = -EINVAL;
+		goto out;
 	}
-
 	LASSERT(fd->fd_grouplock.lg_lock);
 
 	if (fd->fd_grouplock.lg_gid != arg) {
 		CWARN("group lock %lu doesn't match current id %lu\n",
 		      arg, fd->fd_grouplock.lg_gid);
-		write_unlock(&lli->lli_lock);
-		return -EINVAL;
+		rc = -EINVAL;
+		goto out;
 	}
 
 	grouplock = fd->fd_grouplock;
 	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
 	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
-	write_unlock(&lli->lli_lock);
 
 	cl_put_grouplock(&grouplock);
+
+	lli->lli_group_users--;
+	if (lli->lli_group_users == 0) {
+		lli->lli_group_gid = 0;
+		wake_up_var(&lli->lli_group_users);
+	}
 	CDEBUG(D_INFO, "group lock %lu released\n", arg);
-	return 0;
+	rc = 0;
+out:
+	mutex_unlock(&lli->lli_group_mutex);
+
+	return rc;
 }
 
 /**
diff --git a/fs/lustre/llite/llite_internal.h b/fs/lustre/llite/llite_internal.h
index d245dd8..998eed8 100644
--- a/fs/lustre/llite/llite_internal.h
+++ b/fs/lustre/llite/llite_internal.h
@@ -253,6 +253,9 @@ struct ll_inode_info {
 			u64				lli_pcc_generation;
 			enum pcc_dataset_flags		lli_pcc_dsflags;
 			struct pcc_inode		*lli_pcc_inode;
+			struct mutex			lli_group_mutex;
+			u64				lli_group_users;
+			unsigned long			lli_group_gid;
 
 			u64				lli_attr_valid;
 			u64				lli_lazysize;
diff --git a/fs/lustre/llite/llite_lib.c b/fs/lustre/llite/llite_lib.c
index 3dc0030..176e61b5 100644
--- a/fs/lustre/llite/llite_lib.c
+++ b/fs/lustre/llite/llite_lib.c
@@ -1194,6 +1194,9 @@ void ll_lli_init(struct ll_inode_info *lli)
 		lli->lli_pcc_inode = NULL;
 		lli->lli_pcc_dsflags = PCC_DATASET_INVALID;
 		lli->lli_pcc_generation = 0;
+		mutex_init(&lli->lli_group_mutex);
+		lli->lli_group_users = 0;
+		lli->lli_group_gid = 0;
 	}
 	mutex_init(&lli->lli_layout_mutex);
 	memset(lli->lli_jobid, 0, sizeof(lli->lli_jobid));
diff --git a/fs/lustre/mdc/mdc_dev.c b/fs/lustre/mdc/mdc_dev.c
index 978fee3..e0f5b45 100644
--- a/fs/lustre/mdc/mdc_dev.c
+++ b/fs/lustre/mdc/mdc_dev.c
@@ -330,7 +330,6 @@ static int mdc_dlm_canceling(const struct lu_env *env,
 	 */
 	if (obj) {
 		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-		void *data;
 
 		/* Destroy pages covered by the extent of the DLM lock */
 		result = mdc_lock_flush(env, cl2osc(obj), cl_index(obj, 0),
@@ -340,17 +339,12 @@ static int mdc_dlm_canceling(const struct lu_env *env,
 		 */
 		/* losing a lock, update kms */
 		lock_res_and_lock(dlmlock);
-		data = dlmlock->l_ast_data;
 		dlmlock->l_ast_data = NULL;
 		cl_object_attr_lock(obj);
 		attr->cat_kms = 0;
 		cl_object_attr_update(env, obj, attr, CAT_KMS);
 		cl_object_attr_unlock(obj);
 		unlock_res_and_lock(dlmlock);
-
-		/* Skip dec in case mdc_object_ast_clear() did it */
-		if (data && dlmlock->l_req_mode == LCK_GROUP)
-			osc_grouplock_dec(cl2osc(obj), dlmlock);
 		cl_object_put(env, obj);
 	}
 	return result;
@@ -457,7 +451,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
 }
 
 static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-			     struct lustre_handle *lockh, int errcode)
+			     struct lustre_handle *lockh)
 {
 	struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
 	struct ldlm_lock *dlmlock;
@@ -510,9 +504,6 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
 	LASSERT(oscl->ols_state != OLS_GRANTED);
 	oscl->ols_state = OLS_GRANTED;
-
-	if (errcode != ELDLM_LOCK_MATCHED && dlmlock->l_req_mode == LCK_GROUP)
-		osc_grouplock_inc_locked(osc, dlmlock);
 }
 
 /**
@@ -544,7 +535,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 
 	CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode);
 	if (rc == 0)
-		mdc_lock_granted(env, oscl, lockh, errcode);
+		mdc_lock_granted(env, oscl, lockh);
 
 	/* Error handling, some errors are tolerable. */
 	if (oscl->ols_glimpse && rc == -ENAVAIL) {
@@ -706,7 +697,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
 	struct ldlm_intent *lit;
 	enum ldlm_mode mode;
 	bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-	u64 match_flags = *flags;
+	u64 search_flags = *flags;
+	u64 match_flags = 0;
 	LIST_HEAD(cancels);
 	int rc, count;
 	int lvb_size;
@@ -716,11 +708,14 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
 	if (einfo->ei_mode == LCK_PR)
 		mode |= LCK_PW;
 
-	match_flags |= LDLM_FL_LVB_READY;
+	search_flags |= LDLM_FL_LVB_READY;
 	if (glimpse)
-		match_flags |= LDLM_FL_BLOCK_GRANTED;
-	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-			       einfo->ei_type, policy, mode, &lockh);
+		search_flags |= LDLM_FL_BLOCK_GRANTED;
+	if (mode == LCK_GROUP)
+		match_flags = LDLM_MATCH_GROUP;
+	mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+					 res_id, einfo->ei_type, policy, mode,
+					 &lockh, match_flags);
 	if (mode) {
 		struct ldlm_lock *matched;
 
@@ -833,9 +828,9 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
  *
  * This function does not wait for the network communication to complete.
  */
-static int __mdc_lock_enqueue(const struct lu_env *env,
-			      const struct cl_lock_slice *slice,
-			      struct cl_io *unused, struct cl_sync_io *anchor)
+static int mdc_lock_enqueue(const struct lu_env *env,
+			    const struct cl_lock_slice *slice,
+			    struct cl_io *unused, struct cl_sync_io *anchor)
 {
 	struct osc_thread_info *info = osc_env_info(env);
 	struct osc_io *oio = osc_env_io(env);
@@ -921,28 +916,6 @@ static int __mdc_lock_enqueue(const struct lu_env *env,
 	return result;
 }
 
-static int mdc_lock_enqueue(const struct lu_env *env,
-			    const struct cl_lock_slice *slice,
-			    struct cl_io *unused, struct cl_sync_io *anchor)
-{
-	struct osc_object *obj = cl2osc(slice->cls_obj);
-	struct osc_lock	*oscl = cl2osc_lock(slice);
-	struct lustre_handle lh = { 0 };
-	int rc;
-
-	if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP) {
-		rc = osc_grouplock_enqueue_init(env, obj, oscl, &lh);
-		if (rc < 0)
-			return rc;
-	}
-
-	rc = __mdc_lock_enqueue(env, slice, unused, anchor);
-
-	if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP)
-		osc_grouplock_enqueue_fini(env, obj, oscl, &lh);
-	return rc;
-}
-
 static const struct cl_lock_operations mdc_lock_lockless_ops = {
 	.clo_fini	= osc_lock_fini,
 	.clo_enqueue	= mdc_lock_enqueue,
@@ -1468,9 +1441,6 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 		memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
 		cl_object_attr_unlock(&osc->oo_cl);
 		ldlm_clear_lvb_cached(lock);
-
-		if (lock->l_req_mode == LCK_GROUP)
-			osc_grouplock_dec(osc, lock);
 	}
 	return LDLM_ITER_CONTINUE;
 }
diff --git a/fs/lustre/osc/osc_lock.c b/fs/lustre/osc/osc_lock.c
index a3e72a6..3b22688 100644
--- a/fs/lustre/osc/osc_lock.c
+++ b/fs/lustre/osc/osc_lock.c
@@ -198,7 +198,7 @@ void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-			     struct lustre_handle *lockh, int errcode)
+			     struct lustre_handle *lockh)
 {
 	struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
 	struct ldlm_lock *dlmlock;
@@ -254,126 +254,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
 	LASSERT(oscl->ols_state != OLS_GRANTED);
 	oscl->ols_state = OLS_GRANTED;
-
-	if (errcode != ELDLM_LOCK_MATCHED && dlmlock->l_req_mode == LCK_GROUP)
-		osc_grouplock_inc_locked(osc, dlmlock);
-}
-
-void osc_grouplock_inc_locked(struct osc_object *osc, struct ldlm_lock *lock)
-{
-	LASSERT(lock->l_req_mode == LCK_GROUP);
-
-	if (osc->oo_group_users == 0)
-		osc->oo_group_gid = lock->l_policy_data.l_extent.gid;
-	osc->oo_group_users++;
-
-	LDLM_DEBUG(lock, "users %llu gid %llu\n",
-		   osc->oo_group_users,
-		   lock->l_policy_data.l_extent.gid);
-}
-EXPORT_SYMBOL(osc_grouplock_inc_locked);
-
-void osc_grouplock_dec(struct osc_object *osc, struct ldlm_lock *lock)
-{
-	LASSERT(lock->l_req_mode == LCK_GROUP);
-
-	mutex_lock(&osc->oo_group_mutex);
-
-	LASSERT(osc->oo_group_users > 0);
-	osc->oo_group_users--;
-	if (osc->oo_group_users == 0) {
-		osc->oo_group_gid = 0;
-		wake_up_all(&osc->oo_group_waitq);
-	}
-	mutex_unlock(&osc->oo_group_mutex);
-
-	LDLM_DEBUG(lock, "users %llu gid %lu\n",
-		   osc->oo_group_users, osc->oo_group_gid);
 }
-EXPORT_SYMBOL(osc_grouplock_dec);
-
-int osc_grouplock_enqueue_init(const struct lu_env *env,
-			       struct osc_object *obj,
-			       struct osc_lock *oscl,
-			       struct lustre_handle *lh)
-{
-	struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
-	int rc = 0;
-
-	LASSERT(need->cld_mode == CLM_GROUP);
-
-	while (true) {
-		bool check_gid = true;
-
-		if (oscl->ols_flags & LDLM_FL_BLOCK_NOWAIT) {
-			if (!mutex_trylock(&obj->oo_group_mutex))
-				return -EAGAIN;
-		} else {
-			mutex_lock(&obj->oo_group_mutex);
-		}
-
-		/**
-		 * If a grouplock of the same gid already exists, match it
-		 * here in advance. Otherwise, if that lock is being cancelled
-		 * there is a chance to get 2 grouplocks for the same file.
-		 */
-		if (obj->oo_group_users &&
-		    obj->oo_group_gid == need->cld_gid) {
-			struct osc_thread_info *info = osc_env_info(env);
-			struct ldlm_res_id *resname = &info->oti_resname;
-			union ldlm_policy_data *policy = &info->oti_policy;
-			struct cl_lock *lock = oscl->ols_cl.cls_lock;
-			u64 flags = oscl->ols_flags | LDLM_FL_BLOCK_GRANTED;
-			struct ldlm_namespace *ns;
-			enum ldlm_mode mode;
-
-			ns = osc_export(obj)->exp_obd->obd_namespace;
-			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
-			osc_lock_build_policy(env, lock, policy);
-			mode = ldlm_lock_match(ns, flags, resname,
-					       oscl->ols_einfo.ei_type, policy,
-					       oscl->ols_einfo.ei_mode, lh);
-			if (mode)
-				oscl->ols_flags |= LDLM_FL_MATCH_LOCK;
-			else
-				check_gid = false;
-		}
-
-		/**
-		 * If a grouplock exists but cannot be matched, let it to flush
-		 * and wait just for zero users for now.
-		 */
-		if (obj->oo_group_users == 0 ||
-		    (check_gid && obj->oo_group_gid == need->cld_gid))
-			break;
-
-		mutex_unlock(&obj->oo_group_mutex);
-		if (oscl->ols_flags & LDLM_FL_BLOCK_NOWAIT)
-			return -EAGAIN;
-
-		rc = l_wait_event_abortable(obj->oo_group_waitq,
-					    !obj->oo_group_users);
-		if (rc)
-			return rc;
-	}
-
-	return 0;
-}
-EXPORT_SYMBOL(osc_grouplock_enqueue_init);
-
-void osc_grouplock_enqueue_fini(const struct lu_env *env,
-				struct osc_object *obj,
-				struct osc_lock *oscl,
-				struct lustre_handle *lh)
-{
-	LASSERT(oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP);
-
-	/* If a user was added on enqueue_init, decref it */
-	if (lustre_handle_is_used(lh))
-		ldlm_lock_decref(lh, oscl->ols_einfo.ei_mode);
-	mutex_unlock(&obj->oo_group_mutex);
-}
-EXPORT_SYMBOL(osc_grouplock_enqueue_fini);
 
 /**
  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
@@ -403,7 +284,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 	}
 
 	if (rc == 0)
-		osc_lock_granted(env, oscl, lockh, errcode);
+		osc_lock_granted(env, oscl, lockh);
 
 	/* Error handling, some errors are tolerable. */
 	if (oscl->ols_glimpse && rc == -ENAVAIL) {
@@ -540,7 +421,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
 		struct ldlm_extent *extent = &dlmlock->l_policy_data.l_extent;
 		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
 		u64 old_kms;
-		void *data;
 
 		/* Destroy pages covered by the extent of the DLM lock */
 		result = osc_lock_flush(cl2osc(obj),
@@ -553,7 +433,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
 		/* clearing l_ast_data after flushing data,
 		 * to let glimpse ast find the lock and the object
 		 */
-		data = dlmlock->l_ast_data;
 		dlmlock->l_ast_data = NULL;
 		cl_object_attr_lock(obj);
 		/* Must get the value under the lock to avoid race. */
@@ -567,9 +446,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
 		cl_object_attr_unlock(obj);
 		unlock_res_and_lock(dlmlock);
 
-		/* Skip dec in case osc_object_ast_clear() did it */
-		if (data && dlmlock->l_req_mode == LCK_GROUP)
-			osc_grouplock_dec(cl2osc(obj), dlmlock);
 		cl_object_put(env, obj);
 	}
 	return result;
@@ -1055,9 +931,9 @@ int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
  *
  * This function does not wait for the network communication to complete.
  */
-static int __osc_lock_enqueue(const struct lu_env *env,
-			      const struct cl_lock_slice *slice,
-			      struct cl_io *unused, struct cl_sync_io *anchor)
+static int osc_lock_enqueue(const struct lu_env *env,
+			    const struct cl_lock_slice *slice,
+			    struct cl_io *unused, struct cl_sync_io *anchor)
 {
 	struct osc_thread_info *info = osc_env_info(env);
 	struct osc_io *oio = osc_env_io(env);
@@ -1177,29 +1053,6 @@ static int __osc_lock_enqueue(const struct lu_env *env,
 	return result;
 }
 
-static int osc_lock_enqueue(const struct lu_env *env,
-			    const struct cl_lock_slice *slice,
-			    struct cl_io *unused, struct cl_sync_io *anchor)
-{
-	struct osc_object *obj = cl2osc(slice->cls_obj);
-	struct osc_lock	*oscl = cl2osc_lock(slice);
-	struct lustre_handle lh = { 0 };
-	int rc;
-
-	if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP) {
-		rc = osc_grouplock_enqueue_init(env, obj, oscl, &lh);
-		if (rc < 0)
-			return rc;
-	}
-
-	rc = __osc_lock_enqueue(env, slice, unused, anchor);
-
-	if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP)
-		osc_grouplock_enqueue_fini(env, obj, oscl, &lh);
-
-	return rc;
-}
-
 /**
  * Breaks a link between osc_lock and dlm_lock.
  */
diff --git a/fs/lustre/osc/osc_object.c b/fs/lustre/osc/osc_object.c
index c3667a3..efb0533 100644
--- a/fs/lustre/osc/osc_object.c
+++ b/fs/lustre/osc/osc_object.c
@@ -74,10 +74,6 @@ int osc_object_init(const struct lu_env *env, struct lu_object *obj,
 
 	atomic_set(&osc->oo_nr_ios, 0);
 	init_waitqueue_head(&osc->oo_io_waitq);
-	init_waitqueue_head(&osc->oo_group_waitq);
-	mutex_init(&osc->oo_group_mutex);
-	osc->oo_group_users = 0;
-	osc->oo_group_gid = 0;
 
 	osc->oo_root.rb_node = NULL;
 	INIT_LIST_HEAD(&osc->oo_hp_exts);
@@ -117,7 +113,6 @@ void osc_object_free(const struct lu_env *env, struct lu_object *obj)
 	LASSERT(atomic_read(&osc->oo_nr_writes) == 0);
 	LASSERT(list_empty(&osc->oo_ol_list));
 	LASSERT(!atomic_read(&osc->oo_nr_ios));
-	LASSERT(!osc->oo_group_users);
 
 	lu_object_fini(obj);
 	/* osc doen't contain an lu_object_header, so we don't need call_rcu */
@@ -230,17 +225,6 @@ static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
 		memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
 		cl_object_attr_unlock(&osc->oo_cl);
 		ldlm_clear_lvb_cached(lock);
-
-		/**
-		 * Object is being destroyed and gets unlinked from the lock,
-		 * IO is finished and no cached data is left under the lock. As
-		 * grouplock is immediately marked CBPENDING it is not reused.
-		 * It will also be not possible to flush data later due to a
-		 * NULL l_ast_data - enough conditions to let new grouplocks to
-		 * be enqueued even if the lock still exists on client.
-		 */
-		if (lock->l_req_mode == LCK_GROUP)
-			osc_grouplock_dec(osc, lock);
 	}
 	return LDLM_ITER_CONTINUE;
 }
diff --git a/fs/lustre/osc/osc_request.c b/fs/lustre/osc/osc_request.c
index 7577fad..5a3f418 100644
--- a/fs/lustre/osc/osc_request.c
+++ b/fs/lustre/osc/osc_request.c
@@ -3009,7 +3009,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 	struct lustre_handle lockh = { 0 };
 	struct ptlrpc_request *req = NULL;
 	int intent = *flags & LDLM_FL_HAS_INTENT;
-	u64 match_flags = *flags;
+	u64 search_flags = *flags;
+	u64 match_flags = 0;
 	enum ldlm_mode mode;
 	int rc;
 
@@ -3040,11 +3041,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 	 * because they will not actually use the lock.
 	 */
 	if (!speculative)
-		match_flags |= LDLM_FL_LVB_READY;
+		search_flags |= LDLM_FL_LVB_READY;
 	if (intent != 0)
-		match_flags |= LDLM_FL_BLOCK_GRANTED;
-	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-			       einfo->ei_type, policy, mode, &lockh);
+		search_flags |= LDLM_FL_BLOCK_GRANTED;
+	if (mode == LCK_GROUP)
+		match_flags = LDLM_MATCH_GROUP;
+	mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+					 res_id, einfo->ei_type, policy, mode,
+					 &lockh, match_flags);
 	if (mode) {
 		struct ldlm_lock *matched;
 
-- 
1.8.3.1

