* [PATCH -next 0/5] ipc/sem: semop(2) improvements
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel

Hi,

Here are a few updates around the semop syscall handling that I noticed while
reviewing Manfred's simple vs complex ops fixes. Changes are on top of -next,
which means that Manfred's pending patches to ipc/sem.c that remove the redundant
barrier(s) would probably have to be rebased.

The patchset has survived the following test cases:
- ltp
- ipcsemtest (https://github.com/manfred-colorfu/ipcsemtest)
- ipcscale (https://github.com/manfred-colorfu/ipcscale)

Details are in each individual patch. Please consider for v4.9.

Thanks!

Davidlohr Bueso (5):
  ipc/sem: do not call wake_sem_queue_do() prematurely
  ipc/sem: rework task wakeups
  ipc/sem: optimize perform_atomic_semop()
  ipc/sem: explicitly inline check_restart
  ipc/sem: use proper list api for pending_list wakeups

 ipc/sem.c | 404 ++++++++++++++++++++++++++++----------------------------------
 1 file changed, 185 insertions(+), 219 deletions(-)

-- 
2.6.6


* [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

... as this call should obviously be paired with its _prepare()
counterpart, at least whenever possible; there is no harm in calling
it spuriously, as we currently do in a few places. The immediate-error
semop(2) paths, which are nowhere near having the task block, can be
simplified and avoid a few unnecessary loads on their way out of the
call, as they are not deeply nested.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 5e318c5f749d..a4e8bb2fae38 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -1887,16 +1887,22 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	}
 
 	error = -EFBIG;
-	if (max >= sma->sem_nsems)
-		goto out_rcu_wakeup;
+	if (max >= sma->sem_nsems) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = -EACCES;
-	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO))
-		goto out_rcu_wakeup;
+	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO)) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = security_sem_semop(sma, sops, nsops, alter);
-	if (error)
-		goto out_rcu_wakeup;
+	if (error) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = -EIDRM;
 	locknum = sem_lock(sma, sops, nsops);
@@ -2039,7 +2045,6 @@ sleep_again:
 
 out_unlock_free:
 	sem_unlock(sma, locknum);
-out_rcu_wakeup:
 	rcu_read_unlock();
 	wake_up_sem_queue_do(&tasks);
 out_free:
-- 
2.6.6


* [PATCH 2/5] ipc/sem: rework task wakeups
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

Our sysv sems have been using the notion of lockless wakeups for a while,
ever since commit 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the
spinlock section), in order to reduce the sem_lock hold times. This in-house
pending queue can be replaced by wake_q (just like the rest of ipc now), as
it provides the following advantages:

o It simplifies and gets rid of unnecessary code.

o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs a
reference to the task, we cannot race with sys_exit and the imminent call to
wake_up_process() if the task is awoken by an unrelated event within the
wake_q_add()/wake_up_q() window.

o By not spinning on IN_WAKEUP, we no longer need to disable preemption.

In consequence, the wakeup paths (after schedule(), that is) must acknowledge
an external signal/event, as well as spurious wakeups occurring during the
pending-wakeup window. There are obviously no changes in semantics visible to
the user. The fastpath is taken _only_ when we know for sure that we were
awoken due to the waker's successful semop call (queue.status is not -EINTR).
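
For reference, the wakeup pattern this converts to looks roughly as follows
(a minimal sketch only; the complete conversion is in the diff below):

        WAKE_Q(wake_q);                         /* on-stack, statically initialized head */

        /* with sem_lock held: for every queue entry that can now complete */
        wake_q_add(&wake_q, q->sleeper);        /* takes a reference on the task */
        q->status = error;                      /* final status, no IN_WAKEUP intermediate */

        sem_unlock(sma, locknum);
        rcu_read_unlock();
        wake_up_q(&wake_q);                     /* the actual wake_up_process() calls */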

On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
is seen with increasing thread counts:

                               v4.8-rc5                v4.8-rc5
                                                        semopv1
Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)
  
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 268 +++++++++++++++++++-------------------------------------------
 1 file changed, 83 insertions(+), 185 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index a4e8bb2fae38..86467b5b78ad 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -13,6 +13,7 @@
  * (c) 2003 Manfred Spraul <manfred@colorfullife.com>
  * Further wakeup optimizations, documentation
  * (c) 2010 Manfred Spraul <manfred@colorfullife.com>
+ * (c) 2016 Davidlohr Bueso <dave@stgolabs.net>
  *
  * support for audit of ipc object properties and permission changes
  * Dustin Kirkland <dustin.kirkland@us.ibm.com>
@@ -53,15 +54,11 @@
  *   Semaphores are actively given to waiting tasks (necessary for FIFO).
  *   (see update_queue())
  * - To improve the scalability, the actual wake-up calls are performed after
- *   dropping all locks. (see wake_up_sem_queue_prepare(),
- *   wake_up_sem_queue_do())
+ *   dropping all locks. (see wake_up_sem_queue_prepare())
  * - All work is done by the waker, the woken up task does not have to do
  *   anything - not even acquiring a lock or dropping a refcount.
  * - A woken up task may not even touch the semaphore array anymore, it may
  *   have been destroyed already by a semctl(RMID).
- * - The synchronizations between wake-ups due to a timeout/signal and a
- *   wake-up due to a completed semaphore operation is achieved by using an
- *   intermediate state (IN_WAKEUP).
  * - UNDO values are stored in an array (one per process and per
  *   semaphore array, lazily allocated). For backwards compatibility, multiple
  *   modes for the UNDO variables are supported (per process, per thread)
@@ -471,40 +468,6 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
 	ipc_rmid(&sem_ids(ns), &s->sem_perm);
 }
 
-/*
- * Lockless wakeup algorithm:
- * Without the check/retry algorithm a lockless wakeup is possible:
- * - queue.status is initialized to -EINTR before blocking.
- * - wakeup is performed by
- *	* unlinking the queue entry from the pending list
- *	* setting queue.status to IN_WAKEUP
- *	  This is the notification for the blocked thread that a
- *	  result value is imminent.
- *	* call wake_up_process
- *	* set queue.status to the final value.
- * - the previously blocked thread checks queue.status:
- *	* if it's IN_WAKEUP, then it must wait until the value changes
- *	* if it's not -EINTR, then the operation was completed by
- *	  update_queue. semtimedop can return queue.status without
- *	  performing any operation on the sem array.
- *	* otherwise it must acquire the spinlock and check what's up.
- *
- * The two-stage algorithm is necessary to protect against the following
- * races:
- * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and try (and fail) to acquire sma->lock
- *   before update_queue had a chance to set queue.status
- * - if queue.status is written before wake_up_process and if the
- *   blocked process is woken up by a signal between writing
- *   queue.status and the wake_up_process, then the woken up
- *   process could return from semtimedop and die by calling
- *   sys_exit before wake_up_process is called. Then wake_up_process
- *   will oops, because the task structure is already invalid.
- *   (yes, this happened on s390 with sysv msg).
- *
- */
-#define IN_WAKEUP	1
-
 /**
  * newary - Create a new semaphore set
  * @ns: namespace
@@ -703,51 +666,11 @@ undo:
 	return result;
 }
 
-/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
- * @q: queue entry that must be signaled
- * @error: Error value for the signal
- *
- * Prepare the wake-up of the queue entry q.
- */
-static void wake_up_sem_queue_prepare(struct list_head *pt,
-				struct sem_queue *q, int error)
-{
-	if (list_empty(pt)) {
-		/*
-		 * Hold preempt off so that we don't get preempted and have the
-		 * wakee busy-wait until we're scheduled back on.
-		 */
-		preempt_disable();
-	}
-	q->status = IN_WAKEUP;
-	q->pid = error;
-
-	list_add_tail(&q->list, pt);
-}
-
-/**
- * wake_up_sem_queue_do - do the actual wake-up
- * @pt: list of tasks to be woken up
- *
- * Do the actual wake-up.
- * The function is called without any locks held, thus the semaphore array
- * could be destroyed already and the tasks can disappear as soon as the
- * status is set to the actual return code.
- */
-static void wake_up_sem_queue_do(struct list_head *pt)
+static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
+					     struct wake_q_head *wake_q)
 {
-	struct sem_queue *q, *t;
-	int did_something;
-
-	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, list) {
-		wake_up_process(q->sleeper);
-		/* q can disappear immediately after writing q->status. */
-		smp_wmb();
-		q->status = q->pid;
-	}
-	if (did_something)
-		preempt_enable();
+	wake_q_add(wake_q, q->sleeper);
+	q->status = error;
 }
 
 static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -795,18 +718,18 @@ static int check_restart(struct sem_array *sma, struct sem_queue *q)
  * wake_const_ops - wake up non-alter tasks
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * wake_const_ops must be called after a semaphore in a semaphore array
  * was set to 0. If complex const operations are pending, wake_const_ops must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int wake_const_ops(struct sem_array *sma, int semnum,
-				struct list_head *pt)
+			  struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -832,7 +755,7 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 
 			unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(pt, q, error);
+			wake_up_sem_queue_prepare(q, error, wake_q);
 			if (error == 0)
 				semop_completed = 1;
 		}
@@ -845,14 +768,14 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
  * @sma: semaphore array
  * @sops: operations that were performed
  * @nsops: number of operations
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * Checks all required queue for wait-for-zero operations, based
  * on the actual changes that were performed on the semaphore array.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
-					int nsops, struct list_head *pt)
+				int nsops, struct wake_q_head *wake_q)
 {
 	int i;
 	int semop_completed = 0;
@@ -865,7 +788,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 
 			if (sma->sem_base[num].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, num, pt);
+				semop_completed |= wake_const_ops(sma, num, wake_q);
 			}
 		}
 	} else {
@@ -876,7 +799,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 		for (i = 0; i < sma->sem_nsems; i++) {
 			if (sma->sem_base[i].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, i, pt);
+				semop_completed |= wake_const_ops(sma, i, wake_q);
 			}
 		}
 	}
@@ -885,7 +808,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 	 * then check the global queue, too.
 	 */
 	if (got_zero)
-		semop_completed |= wake_const_ops(sma, -1, pt);
+		semop_completed |= wake_const_ops(sma, -1, wake_q);
 
 	return semop_completed;
 }
@@ -895,19 +818,19 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  * update_queue - look for tasks that can be completed.
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * update_queue must be called after a semaphore in a semaphore array
  * was modified. If multiple semaphores were modified, update_queue must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function internally checks if const operations can now succeed.
  *
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
+static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -949,11 +872,11 @@ again:
 			restart = 0;
 		} else {
 			semop_completed = 1;
-			do_smart_wakeup_zero(sma, q->sops, q->nsops, pt);
+			do_smart_wakeup_zero(sma, q->sops, q->nsops, wake_q);
 			restart = check_restart(sma, q);
 		}
 
-		wake_up_sem_queue_prepare(pt, q, error);
+		wake_up_sem_queue_prepare(q, error, wake_q);
 		if (restart)
 			goto again;
 	}
@@ -984,24 +907,24 @@ static void set_semotime(struct sem_array *sma, struct sembuf *sops)
  * @sops: operations that were performed
  * @nsops: number of operations
  * @otime: force setting otime
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * do_smart_update() does the required calls to update_queue and wakeup_zero,
  * based on the actual changes that were performed on the semaphore array.
  * Note that the function does not do the actual wake-up: the caller is
- * responsible for calling wake_up_sem_queue_do(@pt).
+ * responsible for calling wake_up_q().
  * It is safe to perform this call after dropping all locks.
  */
 static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
-			int otime, struct list_head *pt)
+			    int otime, struct wake_q_head *wake_q)
 {
 	int i;
 
-	otime |= do_smart_wakeup_zero(sma, sops, nsops, pt);
+	otime |= do_smart_wakeup_zero(sma, sops, nsops, wake_q);
 
 	if (!list_empty(&sma->pending_alter)) {
 		/* semaphore array uses the global queue - just process it. */
-		otime |= update_queue(sma, -1, pt);
+		otime |= update_queue(sma, -1, wake_q);
 	} else {
 		if (!sops) {
 			/*
@@ -1009,7 +932,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			 * known. Check all.
 			 */
 			for (i = 0; i < sma->sem_nsems; i++)
-				otime |= update_queue(sma, i, pt);
+				otime |= update_queue(sma, i, wake_q);
 		} else {
 			/*
 			 * Check the semaphores that were increased:
@@ -1023,7 +946,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			for (i = 0; i < nsops; i++) {
 				if (sops[i].sem_op > 0) {
 					otime |= update_queue(sma,
-							sops[i].sem_num, pt);
+							      sops[i].sem_num, wake_q);
 				}
 			}
 		}
@@ -1111,8 +1034,8 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
-	struct list_head tasks;
 	int i;
+	WAKE_Q(wake_q);
 
 	/* Free the existing undo structures for this semaphore set.  */
 	ipc_assert_locked_object(&sma->sem_perm);
@@ -1126,25 +1049,24 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	}
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
-	INIT_LIST_HEAD(&tasks);
 	list_for_each_entry_safe(q, tq, &sma->pending_const, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 
 	list_for_each_entry_safe(q, tq, &sma->pending_alter, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 	for (i = 0; i < sma->sem_nsems; i++) {
 		struct sem *sem = sma->sem_base + i;
 		list_for_each_entry_safe(q, tq, &sem->pending_const, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 		list_for_each_entry_safe(q, tq, &sem->pending_alter, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 	}
 
@@ -1153,7 +1075,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	ns->used_sems -= sma->sem_nsems;
 	ipc_rcu_putref(sma, sem_rcu_free);
 }
@@ -1292,9 +1214,9 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	struct sem_undo *un;
 	struct sem_array *sma;
 	struct sem *curr;
-	int err;
-	struct list_head tasks;
-	int val;
+	int err, val;
+	WAKE_Q(wake_q);
+
 #if defined(CONFIG_64BIT) && defined(__BIG_ENDIAN)
 	/* big-endian 64bit */
 	val = arg >> 32;
@@ -1306,8 +1228,6 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	if (val > SEMVMX || val < 0)
 		return -ERANGE;
 
-	INIT_LIST_HEAD(&tasks);
-
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
 	if (IS_ERR(sma)) {
@@ -1350,10 +1270,10 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	curr->sempid = task_tgid_vnr(current);
 	sma->sem_ctime = get_seconds();
 	/* maybe some queued-up processes were waiting for this */
-	do_smart_update(sma, NULL, 0, 0, &tasks);
+	do_smart_update(sma, NULL, 0, 0, &wake_q);
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	return 0;
 }
 
@@ -1365,9 +1285,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	int err, nsems;
 	ushort fast_sem_io[SEMMSL_FAST];
 	ushort *sem_io = fast_sem_io;
-	struct list_head tasks;
-
-	INIT_LIST_HEAD(&tasks);
+	WAKE_Q(wake_q);
 
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
@@ -1478,7 +1396,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_q);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1514,7 +1432,7 @@ out_unlock:
 	sem_unlock(sma, -1);
 out_rcu_wakeup:
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 out_free:
 	if (sem_io != fast_sem_io)
 		ipc_free(sem_io);
@@ -1787,32 +1705,6 @@ out:
 	return un;
 }
 
-
-/**
- * get_queue_result - retrieve the result code from sem_queue
- * @q: Pointer to queue structure
- *
- * Retrieve the return code from the pending queue. If IN_WAKEUP is found in
- * q->status, then we must loop until the value is replaced with the final
- * value: This may happen if a task is woken up by an unrelated event (e.g.
- * signal) and in parallel the task is woken up by another task because it got
- * the requested semaphores.
- *
- * The function can be called with or without holding the semaphore spinlock.
- */
-static int get_queue_result(struct sem_queue *q)
-{
-	int error;
-
-	error = q->status;
-	while (unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = q->status;
-	}
-
-	return error;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1825,7 +1717,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
-	struct list_head tasks;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1865,7 +1756,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			alter = 1;
 	}
 
-	INIT_LIST_HEAD(&tasks);
 
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
@@ -1933,22 +1823,32 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.alter = alter;
 
 	error = perform_atomic_semop(sma, &queue);
-	if (error == 0) {
-		/* If the operation was successful, then do
+	if (error <= 0) { /* non-blocking path */
+		WAKE_Q(wake_q);
+
+		/*
+		 * If the operation was successful, then do
 		 * the required updates.
 		 */
-		if (alter)
-			do_smart_update(sma, sops, nsops, 1, &tasks);
-		else
-			set_semotime(sma, sops);
+		if (error == 0) {
+			if (alter)
+				do_smart_update(sma, sops, nsops, 1, &wake_q);
+			else
+				set_semotime(sma, sops);
+
+		}
+
+		sem_unlock(sma, locknum);
+		rcu_read_unlock();
+		wake_up_q(&wake_q);
+
+		goto out_free;
 	}
-	if (error <= 0)
-		goto out_unlock_free;
 
-	/* We need to sleep on this operation, so we put the current
+	/*
+	 * We need to sleep on this operation, so we put the current
 	 * task into the pending queue and go to sleep.
 	 */
-
 	if (nsops == 1) {
 		struct sem *curr;
 		curr = &sma->sem_base[sops->sem_num];
@@ -1977,10 +1877,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		sma->complex_count++;
 	}
 
+sleep_again:
 	queue.status = -EINTR;
 	queue.sleeper = current;
 
-sleep_again:
 	__set_current_state(TASK_INTERRUPTIBLE);
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
@@ -1990,28 +1890,29 @@ sleep_again:
 	else
 		schedule();
 
-	error = get_queue_result(&queue);
-
-	if (error != -EINTR) {
-		/* fast path: update_queue already obtained all requested
-		 * resources.
-		 * Perform a smp_mb(): User space could assume that semop()
-		 * is a memory barrier: Without the mb(), the cpu could
-		 * speculatively read in user space stale data that was
-		 * overwritten by the previous owner of the semaphore.
+	/*
+	 * fastpath: the semop has completed, either successfully or not, from
+	 * the syscall pov, is quite irrelevant to us at this point; we're done.
+	 *
+	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
+	 * The queue.status is checked again in the slowpath (aka after taking
+	 * sem_lock), such that we can detect scenarios where we were awakened
+	 * externally, during the window between wake_q_add() and wake_up_q().
+	 */
+	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
+		/*
+		 * User space could assume that semop() is a memory barrier:
+		 * Without the mb(), the cpu could speculatively read in user
+		 * space stale data that was overwritten by the previous owner
+		 * of the semaphore.
 		 */
 		smp_mb();
-
 		goto out_free;
 	}
 
 	rcu_read_lock();
 	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
-
-	/*
-	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
-	 */
-	error = get_queue_result(&queue);
+	error = queue.status;
 
 	/*
 	 * Array removed? If yes, leave without sem_unlock().
@@ -2021,7 +1922,6 @@ sleep_again:
 		goto out_free;
 	}
 
-
 	/*
 	 * If queue.status != -EINTR we are woken up by another process.
 	 * Leave without unlink_queue(), but with sem_unlock().
@@ -2030,13 +1930,13 @@ sleep_again:
 		goto out_unlock_free;
 
 	/*
-	 * If an interrupt occurred we have to clean up the queue
+	 * If an interrupt occurred we have to clean up the queue.
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
 
 	/*
-	 * If the wakeup was spurious, just retry
+	 * If the wakeup was spurious, just retry.
 	 */
 	if (error == -EINTR && !signal_pending(current))
 		goto sleep_again;
@@ -2046,7 +1946,6 @@ sleep_again:
 out_unlock_free:
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
 out_free:
 	if (sops != fast_sops)
 		kfree(sops);
@@ -2107,8 +2006,8 @@ void exit_sem(struct task_struct *tsk)
 	for (;;) {
 		struct sem_array *sma;
 		struct sem_undo *un;
-		struct list_head tasks;
 		int semid, i;
+		WAKE_Q(wake_q);
 
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
@@ -2194,11 +2093,10 @@ void exit_sem(struct task_struct *tsk)
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
-		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks);
+		do_smart_update(sma, NULL, 0, 1, &wake_q);
 		sem_unlock(sma, -1);
 		rcu_read_unlock();
-		wake_up_sem_queue_do(&tasks);
+		wake_up_q(&wake_q);
 
 		kfree_rcu(un, rcu);
 	}
--
2.6.6


* [PATCH 3/5] ipc/sem: optimize perform_atomic_semop()
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

perform_atomic_semop() is the main workhorse that deals with semop
user calls, determining whether the wait-for-zero or semval-update
operations on the set can complete as the sma currently stands.
Currently, the set is iterated twice (setting semval, then going
backwards for the sempid value). Slowpaths, and particularly
SEM_UNDO calls, must undo any altered sem when it is detected that
the caller must block or has errored out.

With larger sets, there are situations where this burns a lot of
cycles and is an obviously suboptimal use of cached resources in
shared memory; i.e. invalidating the caches of CPUs that are also
calling semop and have the sembuf cached (and could complete), while
the current lock holder's semop will block, error out, or perform a
wait-for-zero operation.

This patch still iterates the set twice, but the first scan is
read-only, and the actual updates are performed afterward, once we
know that the call will succeed. In order to not suffer the overhead
of dealing with sops that act on the same sem_num, such (rare) cases
use perform_atomic_semop_slow(), which is exactly what we have now.
Duplicates are detected before grabbing sem_lock, using a simple
64-bit variable in which the sem_num-th bit is set. Of course, this
means that semop calls with a sem_num larger than 64 (SEMOPM_FAST,
for now, as this is really about nsops) will take the _slow()
alternative; but many real-world workloads only work on a handful of
semaphores in a given set, so this is good enough for the common case.

In addition, add some comments about when we expect the caller to
block.
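
Schematically, the new non-duplicate path becomes (a condensed sketch; the
SEM_UNDO range checking is omitted here, the full version is in the hunk
below):

        /* First pass: read-only, check that the entire operation can succeed. */
        for (sop = sops; sop < sops + nsops; sop++) {
                curr = sma->sem_base + sop->sem_num;
                result = curr->semval + sop->sem_op;

                if (!sop->sem_op && curr->semval)
                        goto would_block;       /* wait-for-zero, but semval != 0 */
                if (result < 0)
                        goto would_block;       /* would drop below zero */
                if (result > SEMVMX)
                        return -ERANGE;
        }

        /* Second pass: guaranteed to succeed, now apply the semval/sempid updates. */
        for (sop = sops; sop < sops + nsops; sop++) {
                curr = sma->sem_base + sop->sem_num;
                curr->semval += sop->sem_op;
                curr->sempid = q->pid;
        }
        return 0;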

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 82 insertions(+), 7 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 86467b5b78ad..d9c743ac17ff 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -115,7 +115,8 @@ struct sem_queue {
 	struct sembuf		*sops;	 /* array of pending operations */
 	struct sembuf		*blocking; /* the operation that blocked */
 	int			nsops;	 /* number of operations */
-	int			alter;	 /* does *sops alter the array? */
+	bool			alter;   /* does *sops alter the array? */
+	bool                    dupsop;	 /* sops on more than one sem_num */
 };
 
 /* Each task has a list of undo requests. They are executed automatically
@@ -595,7 +596,8 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
  * Returns 1 if the operation is impossible, the caller must sleep.
  * Negative values are error codes.
  */
-static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+static int perform_atomic_semop_slow(struct sem_array *sma,
+				     struct sem_queue *q)
 {
 	int result, sem_op, nsops, pid;
 	struct sembuf *sop;
@@ -666,6 +668,72 @@ undo:
 	return result;
 }
 
+static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+{
+	int result, sem_op, nsops;
+	struct sembuf *sop;
+	struct sem *curr;
+	struct sembuf *sops;
+	struct sem_undo *un;
+
+	sops = q->sops;
+	nsops = q->nsops;
+	un = q->undo;
+
+	if (q->dupsop)
+		return perform_atomic_semop_slow(sma, q);
+
+	/*
+	 * We scan the semaphore set twice, first to ensure that the entire
+	 * operation can succeed, therefore avoiding any pointless writes
+	 * to shared memory and having to undo such changes in order to block
+	 * until the operations can go through.
+	 */
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (!sem_op && result)
+			goto would_block; /* wait-for-zero */
+
+		result += sem_op;
+		if (result < 0)
+			goto would_block;
+
+		if (result > SEMVMX)
+			return -ERANGE;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+
+			/* Exceeding the undo range is an error. */
+			if (undo < (-SEMAEM - 1) || undo > SEMAEM)
+				return -ERANGE;
+		}
+	}
+
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+			un->semadj[sop->sem_num] = undo;
+		}
+
+		curr->semval += sem_op;
+		curr->sempid = q->pid;
+	}
+
+	return 0;
+
+would_block:
+	q->blocking = sop;
+	return sop->sem_flg & IPC_NOWAIT? -EAGAIN : 1;
+}
+
 static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
 					     struct wake_q_head *wake_q)
 {
@@ -1713,9 +1781,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sembuf fast_sops[SEMOPM_FAST];
 	struct sembuf *sops = fast_sops, *sop;
 	struct sem_undo *un;
-	int undos = 0, alter = 0, max, locknum;
+	int max, locknum;
+	bool undos = false, alter = false, dupsop = false;
 	struct sem_queue queue;
-	unsigned long jiffies_left = 0;
+	unsigned long dup = 0, jiffies_left = 0;
 	struct ipc_namespace *ns;
 
 	ns = current->nsproxy->ipc_ns;
@@ -1751,12 +1820,17 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		if (sop->sem_num >= max)
 			max = sop->sem_num;
 		if (sop->sem_flg & SEM_UNDO)
-			undos = 1;
+			undos = true;
 		if (sop->sem_op != 0)
-			alter = 1;
+			alter = true;
+		if (sop->sem_num < SEMOPM_FAST && !dupsop) {
+			if (dup & (1 << sop->sem_num))
+				dupsop = 1;
+			else
+				dup |= 1 << sop->sem_num;
+		}
 	}
 
-
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
 		un = find_alloc_undo(ns, semid);
@@ -1821,6 +1895,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.undo = un;
 	queue.pid = task_tgid_vnr(current);
 	queue.alter = alter;
+	queue.dupsop = dupsop;
 
 	error = perform_atomic_semop(sma, &queue);
 	if (error <= 0) { /* non-blocking path */
-- 
2.6.6


* [PATCH 4/5] ipc/sem: explicitly inline check_restart
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

The compiler already does this, but make it explicit. This helper is
really small and is also used in update_queue's main loop, which does
O(N^2) scanning. Inline it and avoid the function call overhead.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index d9c743ac17ff..3774b21c54d4 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -758,7 +758,7 @@ static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
  * modified the array.
  * Note that wait-for-zero operations are handled without restart.
  */
-static int check_restart(struct sem_array *sma, struct sem_queue *q)
+static inline int check_restart(struct sem_array *sma, struct sem_queue *q)
 {
 	/* pending complex alter operations are too difficult to analyse */
 	if (!list_empty(&sma->pending_alter))
-- 
2.6.6


* [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

... this saves some LoC and looks cleaner than open-coding the list walk.
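
I.e., the open-coded iteration turns into the standard safe-walk idiom,
roughly (sketch; the complete hunks are below):

        struct sem_queue *q, *tmp;

        list_for_each_entry_safe(q, tmp, pending_list, list) {
                int error = perform_atomic_semop(sma, q);

                if (error > 0)
                        continue;               /* still blocked, leave it queued */

                /* safe to unlink: 'tmp' already points at the next entry */
                unlink_queue(sma, q);
                wake_up_sem_queue_prepare(q, error, wake_q);
                if (error == 0)
                        semop_completed = 1;
        }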

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 38 +++++++++++++-------------------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 3774b21c54d4..64c9d143b300 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -799,8 +799,7 @@ static inline int check_restart(struct sem_array *sma, struct sem_queue *q)
 static int wake_const_ops(struct sem_array *sma, int semnum,
 			  struct wake_q_head *wake_q)
 {
-	struct sem_queue *q;
-	struct list_head *walk;
+	struct sem_queue *q, *tmp;
 	struct list_head *pending_list;
 	int semop_completed = 0;
 
@@ -809,25 +808,19 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 	else
 		pending_list = &sma->sem_base[semnum].pending_const;
 
-	walk = pending_list->next;
-	while (walk != pending_list) {
-		int error;
-
-		q = container_of(walk, struct sem_queue, list);
-		walk = walk->next;
-
-		error = perform_atomic_semop(sma, q);
-
-		if (error <= 0) {
-			/* operation completed, remove from queue & wakeup */
+	list_for_each_entry_safe(q, tmp, pending_list, list) {
+		int error = perform_atomic_semop(sma, q);
 
-			unlink_queue(sma, q);
+		if (error > 0)
+			continue;
+		/* operation completed, remove from queue & wakeup */
+		unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(q, error, wake_q);
-			if (error == 0)
-				semop_completed = 1;
-		}
+		wake_up_sem_queue_prepare(q, error, wake_q);
+		if (error == 0)
+			semop_completed = 1;
 	}
+
 	return semop_completed;
 }
 
@@ -900,8 +893,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  */
 static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
-	struct sem_queue *q;
-	struct list_head *walk;
+	struct sem_queue *q, *tmp;
 	struct list_head *pending_list;
 	int semop_completed = 0;
 
@@ -911,13 +903,9 @@ static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *w
 		pending_list = &sma->sem_base[semnum].pending_alter;
 
 again:
-	walk = pending_list->next;
-	while (walk != pending_list) {
+	list_for_each_entry_safe(q, tmp, pending_list, list) {
 		int error, restart;
 
-		q = container_of(walk, struct sem_queue, list);
-		walk = walk->next;
-
 		/* If we are scanning the single sop, per-semaphore list of
 		 * one semaphore and that semaphore is 0, then it is not
 		 * necessary to scan further: simple increments
-- 
2.6.6


* Re: [PATCH 3/5] ipc/sem: optimize perform_atomic_semop()
From: Manfred Spraul @ 2016-09-12 17:56 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> This is the main workhorse that deals with semop user calls
> such that the waitforzero or semval update operations, on the
> set, can complete on not as the sma currently stands. Currently,
> the set is iterated twice (setting semval, then backwards for
> the sempid value). Slowpaths, and particularly SEM_UNDO calls,
> must undo any altered sem when it is detected that the caller
> must block or has errored-out.
>
> With larger sets, there can occur situations where this involves
> a lot of cycles and can obviously be a suboptimal use of cached
> resources in shared memory. Ie, discarding CPU caches that are
> also calling semop and have the sembuf cached (and can complete),
> while the current lock holder doing the semop will block, error,
> or does a waitforzero operation.
>
> This patch proposes still iterating the set twice, but the first
> scan is read-only, and we perform the actual updates afterward,
> once we know that the call will succeed. In order to not suffer
> from the overhead of dealing with sops that act on the same sem_num,
> such (rare )cases use perform_atomic_semop_slow(), which is exactly
> what we have now. Duplicates are detected before grabbing sem_lock,
> and uses simple a 64-bit variable to enable the sem_num-th bit.
> Of course, this means that semops calls with a sem_num larger than
> 64 (SEMOPM_FAST, for now, as this is really about the nsops), will
> take the _slow() alternative; but many real-world workloads only
> work on a handful of semaphores in a given set, thus good enough
> for the common case.
Can you create a 2nd definition, instead of reusing SEMOPM_FAST?
SEMOPM_FAST is about nsops, to limit stack usage.
Now you introduce a limit regarding sem_num.


> In addition add some comments to when we expect to the caller
> to block.
>
> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
> ---
>   ipc/sem.c | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>   1 file changed, 82 insertions(+), 7 deletions(-)
>
> diff --git a/ipc/sem.c b/ipc/sem.c
> index 86467b5b78ad..d9c743ac17ff 100644
> --- a/ipc/sem.c
> +++ b/ipc/sem.c
> @@ -115,7 +115,8 @@ struct sem_queue {
>   	struct sembuf		*sops;	 /* array of pending operations */
>   	struct sembuf		*blocking; /* the operation that blocked */
>   	int			nsops;	 /* number of operations */
> -	int			alter;	 /* does *sops alter the array? */
> +	bool			alter;   /* does *sops alter the array? */
> +	bool                    dupsop;	 /* sops on more than one sem_num */
>   };
>   
>   /* Each task has a list of undo requests. They are executed automatically
> @@ -595,7 +596,8 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
>    * Returns 1 if the operation is impossible, the caller must sleep.
>    * Negative values are error codes.
>    */
> -static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
> +static int perform_atomic_semop_slow(struct sem_array *sma,
> +				     struct sem_queue *q)
>   {
>   	int result, sem_op, nsops, pid;
>   	struct sembuf *sop;
> @@ -666,6 +668,72 @@ undo:
>   	return result;
>   }
>   
> +static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
> +{
Do we really have to copy the whole function? Would it be possible to 
leave it as one function, with tests inside?

> @@ -1751,12 +1820,17 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
>   		if (sop->sem_num >= max)
>   			max = sop->sem_num;
>   		if (sop->sem_flg & SEM_UNDO)
> -			undos = 1;
> +			undos = true;
>   		if (sop->sem_op != 0)
> -			alter = 1;
> +			alter = true;
> +		if (sop->sem_num < SEMOPM_FAST && !dupsop) {
> +			if (dup & (1 << sop->sem_num))
> +				dupsop = 1;
> +			else
> +				dup |= 1 << sop->sem_num;
> +		}
>   	}
At least for nsops=2, sops[0].sem_num != sops[1].sem_num can detect the
absence of duplicated ops regardless of the array size.
Should we support that?

--
     Manfred


* Re: [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely
From: Manfred Spraul @ 2016-09-13  4:17 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> ... as this call should obviously be paired with its _prepare()
> counterpart. At least whenever possible, as there is no harm in
> calling it bogusly as we do now in a few places.
I would define the interface differently:
WAKE_Q creates an initialized wake queue. There is no need to track
whether any tasks were added to the wake queue; it is always safe to call
wake_up_q(). So, especially for error paths, there is no need to optimize
out calls to wake_up_q().
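
I.e., it is fine to keep one common exit path even for the early errors,
something like (sketch):

        WAKE_Q(wake_q);         /* statically initialized, always valid */
        ...
        if (max >= sma->sem_nsems)
                goto out_rcu_wakeup;    /* nothing queued yet, that's fine */
        ...
out_rcu_wakeup:
        rcu_read_unlock();
        wake_up_q(&wake_q);     /* a no-op on an empty queue */
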
>   Immediate error
> semop(2) paths that are far from ever having the task block can
> be simplified and avoid a few unnecessary loads on their way out
> of the call as it is not deeply nested.
> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
> ---
>   ipc/sem.c | 19 ++++++++++++-------
>   1 file changed, 12 insertions(+), 7 deletions(-)
>
> diff --git a/ipc/sem.c b/ipc/sem.c
> index 5e318c5f749d..a4e8bb2fae38 100644
> --- a/ipc/sem.c
> +++ b/ipc/sem.c
> @@ -1887,16 +1887,22 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
>   	}
>   
>   	error = -EFBIG;
> -	if (max >= sma->sem_nsems)
> -		goto out_rcu_wakeup;
> +	if (max >= sma->sem_nsems) {
> +		rcu_read_unlock();
> +		goto out_free;
> +	}
>   
>   	error = -EACCES;
> -	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO))
> -		goto out_rcu_wakeup;
> +	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO)) {
> +		rcu_read_unlock();
> +		goto out_free;
> +	}
>   
Is this really better/simpler?
You replace "if (error) goto cleanup" with "if (error) { cleanup_1();
goto cleanup_2(); }".

From my point of view, this just increases the risk that some cleanup
steps are forgotten.

--
     Manfred


* Re: [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely
From: Davidlohr Bueso @ 2016-09-13  8:14 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Tue, 13 Sep 2016, Manfred Spraul wrote:

>>-	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO))
>>-		goto out_rcu_wakeup;
>>+	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO)) {
>>+		rcu_read_unlock();
>>+		goto out_free;
>>+	}
>Is this really better/simpler?
>You replace "if (error) goto cleanup" with "if (error) {cleanup_1(); 
>goto cleanup_2()}".

I believe it is better, as it clearly separates the blocking from the
non-blocking exit paths. The overhead is of course irrelevant in both the
in-house wake_up_sem_queue_do() and wake_up_q().

Thanks,
Davidlohr


* Re: [PATCH 3/5] ipc/sem: optimize perform_atomic_semop()
From: Davidlohr Bueso @ 2016-09-13  8:33 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Mon, 12 Sep 2016, Manfred Spraul wrote:

>>This patch proposes still iterating the set twice, but the first
>>scan is read-only, and we perform the actual updates afterward,
>>once we know that the call will succeed. In order to not suffer
>>from the overhead of dealing with sops that act on the same sem_num,
>>such (rare )cases use perform_atomic_semop_slow(), which is exactly
>>what we have now. Duplicates are detected before grabbing sem_lock,
>>and uses simple a 64-bit variable to enable the sem_num-th bit.
>>Of course, this means that semops calls with a sem_num larger than
>>64 (SEMOPM_FAST, for now, as this is really about the nsops), will
>>take the _slow() alternative; but many real-world workloads only
>>work on a handful of semaphores in a given set, thus good enough
>>for the common case.

>Can you create a 2nd definition, instead of reusing SEMOPM_FAST?
>SEMOPM_FAST is about nsops, to limit stack usage.
>Now you introduce a limit regarding sem_num.

Sure, I didn't really like using SEMOPM_FAST anyway (hence the 'for
now'); it was just handy at the time. I can do something like:

#define SEMNUM_FAST_MAX 64

>>+static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
>>+{
>Do we really have to copy the whole function? Would it be possible to 
>leave it as one function, with tests inside?

I think that having two perform_atomic_semop() variants actually keeps things
simpler, as for the common case we need not worry about the undo stuff. That
said, the tests are the same for both, so let me see how I can factor them
out, maybe using callbacks...

>
>>@@ -1751,12 +1820,17 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
>>  		if (sop->sem_num >= max)
>>  			max = sop->sem_num;
>>  		if (sop->sem_flg & SEM_UNDO)
>>-			undos = 1;
>>+			undos = true;
>>  		if (sop->sem_op != 0)
>>-			alter = 1;
>>+			alter = true;
>>+		if (sop->sem_num < SEMOPM_FAST && !dupsop) {
>>+			if (dup & (1 << sop->sem_num))
>>+				dupsop = 1;
>>+			else
>>+				dup |= 1 << sop->sem_num;
>>+		}
>>  	}
>At least for nsops=2, sops[0].sem_num !=sops[1].sem_num can detect 
>absense of duplicated ops regardless of the array size.
>Should we support that?

There are various individual cases like that (i.e. obviously nsops == 1,
alter == 0, etc.) where the dup detection would be unnecessary, but it seems
like a stretch to handle them one by one. The above will work for the common
case (assuming low sem_num values, of course), so I'm not particularly
worried about being overly smart about the dup detection.

Thanks,
Davidlohr


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
From: Manfred Spraul @ 2016-09-13 18:04 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)

[...]

> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
> ---
>   ipc/sem.c | 268 +++++++++++++++++++-------------------------------------------
>   1 file changed, 83 insertions(+), 185 deletions(-)
Nice speedup, nice amount of removed duplicated code.
> diff --git a/ipc/sem.c b/ipc/sem.c
> index a4e8bb2fae38..86467b5b78ad 100644
[...]
>   
> -	error = get_queue_result(&queue);
> -
> -	if (error != -EINTR) {
> -		/* fast path: update_queue already obtained all requested
> -		 * resources.
> -		 * Perform a smp_mb(): User space could assume that semop()
> -		 * is a memory barrier: Without the mb(), the cpu could
> -		 * speculatively read in user space stale data that was
> -		 * overwritten by the previous owner of the semaphore.
> +	/*
> +	 * fastpath: the semop has completed, either successfully or not, from
> +	 * the syscall pov, is quite irrelevant to us at this point; we're done.
> +	 *
> +	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
> +	 * The queue.status is checked again in the slowpath (aka after taking
> +	 * sem_lock), such that we can detect scenarios where we were awakened
> +	 * externally, during the window between wake_q_add() and wake_up_q().
> +	 */
> +	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
> +		/*
> +		 * User space could assume that semop() is a memory barrier:
> +		 * Without the mb(), the cpu could speculatively read in user
> +		 * space stale data that was overwritten by the previous owner
> +		 * of the semaphore.
>   		 */
>   		smp_mb();
> -
>   		goto out_free;
>   	}
>   
>   	rcu_read_lock();
>   	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
What is the purpose of the !signal_pending()?
Even if there is a signal: if someone has set queue.status, then our
semaphore operations completed and we must return that result code.
Obviously, at syscall return the pending signal will be noticed and the
signal handler called immediately, but I don't see how that prevents us
from using the fast path.

And, at least in my opinion: I would avoid placing the error assignment
inside the if().
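
I.e. (sketch):

        error = queue.status;
        if (error != -EINTR) {
                smp_mb();
                goto out_free;
        }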

--
     Manfred


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
From: Davidlohr Bueso @ 2016-09-14 15:45 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Tue, 13 Sep 2016, Manfred Spraul wrote:

>>+	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
>>+		/*
>>+		 * User space could assume that semop() is a memory barrier:
>>+		 * Without the mb(), the cpu could speculatively read in user
>>+		 * space stale data that was overwritten by the previous owner
>>+		 * of the semaphore.
>>  		 */
>>  		smp_mb();
>>-
>>  		goto out_free;
>>  	}
>>  	rcu_read_lock();
>>  	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);

>What is the purpose of the !signal_pending()?
>Even if there is a signal: If someone has set queue.status, then our 
>semaphore operations completed and we must return that result code.

It was a way of detecting that we were awoken by an unrelated event while
the task is marked for wakeup but wake_up_process() has not yet been called,
and of forcing the slowpath in that case; the same window that
get_queue_result() deals with by busy-waiting on IN_WAKEUP.

>Obviously: At syscall return, the syscall return code will notice the 
>pending signal and immediately the signal handler is called, but I 
>don't see that this prevents us from using the fast path.

Right, and we cannot race with sys_exit. Will drop the signal check.

>And, at least my opinion: I would avoid placing the error= into the if().

Sure, agreed.

Thanks,
Davidlohr


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
From: Manfred Spraul @ 2016-09-18 14:37 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> @@ -1933,22 +1823,32 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
>   	queue.alter = alter;
>   
>   	error = perform_atomic_semop(sma, &queue);
> -	if (error == 0) {
> -		/* If the operation was successful, then do
> +	if (error <= 0) { /* non-blocking path */
> +		WAKE_Q(wake_q);
> +
> +		/*
> +		 * If the operation was successful, then do
>   		 * the required updates.
>   		 */
> -		if (alter)
> -			do_smart_update(sma, sops, nsops, 1, &tasks);
> -		else
> -			set_semotime(sma, sops);
> +		if (error == 0) {
> +			if (alter)
> +				do_smart_update(sma, sops, nsops, 1, &wake_q);
> +			else
> +				set_semotime(sma, sops);
> + <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Why this empty line?
> +		}
> +
> +		sem_unlock(sma, locknum);
> +		rcu_read_unlock();
> +		wake_up_q(&wake_q);
> +
> +		goto out_free;
>   	}
> -	if (error <= 0)
> -		goto out_unlock_free;
I don't see the strategy:
I've used the approach of doing the cleanup at the end, to reduce duplicated
code, even if it means that error code paths unnecessarily call wakeup for
an empty list and that the list is always initialized.

With patch 1 of the series, you start to optimize that away.
Now this patch reintroduces some wake_up_q() calls for error paths.

So: What is the aim?
I would propose to skip patch 1 and leave the wake_up_q at the end.

Or, if we really want to avoid the wakeup calls, then do it entirely.
Perhaps:
 > if (error == 0) { /* nonblocking codepath 1, with wakeups */
 > [...]
 > }
 > if (error < 0) goto out_unlock_free;
 >
This would have the advantage that the WAKE_Q would only be initialized
when needed.

--
     Manfred


* Re: [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups
From: Manfred Spraul @ 2016-09-18 17:51 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> ... saves some LoC and looks cleaner than re-implementing the
> calls.
>
> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Acked-by: Manfred Spraul <manfred@colorfullife.com>

--
     Manfred


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
From: Davidlohr Bueso @ 2016-09-18 18:26 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Sun, 18 Sep 2016, Manfred Spraul wrote:
>>+ <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Why this empty line?

That's my fat fingers, will remove it.

>>+		}
>>+
>>+		sem_unlock(sma, locknum);
>>+		rcu_read_unlock();
>>+		wake_up_q(&wake_q);
>>+
>>+		goto out_free;
>>  	}
>>-	if (error <= 0)
>>-		goto out_unlock_free;
>I don't see the strategy:
>I've used the approach that cleanup is at the end, to reduce 
>duplicated code, even if it means that error codepaths unnecessarily 
>call wakeup for an empty list and that the list is always initialized.
>
>With patch 1 of the series, you start to optimize for that.
>Now this patch reintroduces some wake_up_q calls for error paths.

Well yes, but this is much more self-contained than what we currently have,
in that at least perform_atomic_semop() was called. Yes, an error path will
still call wake_up_q() unnecessarily, but it's pretty obvious what's going on
within that error <= 0 condition. I really don't think this is a big deal. In
addition, the general exit path of the function is also slightly cleaned up
as a consequence.

>So: What is the aim?
>I would propose to skip patch 1 and leave the wake_up_q at the end.
>
>Or, if we really want to avoid the wakeup calls, then do it entirely.
>Perhaps:
>> if(error == 0) { /* nonblocking codepath 1, with wakeups */
>> [...]
>> }
>> if (error < 0) goto out_unlock_free;
>>
>This would have the advantage that the WAKE_Q would only be initialized
>when needed.

Sure. Note that we can even get picky with this in semctl calls, but I'm
ok with some unnecessary initialization and wake_up_q paths. Please shout
if you really want me to change them and I can add followup patches, although
I suspect you'll agree.

Thanks,
Davidlohr

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [PATCH 3/5] ipc/sem: optimize perform_atomic_semop()
  2016-09-13  8:33     ` Davidlohr Bueso
@ 2016-09-19  4:41       ` Manfred Spraul
  0 siblings, 0 replies; 18+ messages in thread
From: Manfred Spraul @ 2016-09-19  4:41 UTC (permalink / raw)
  To: Davidlohr Bueso; +Cc: akpm, linux-kernel, Davidlohr Bueso

[-- Attachment #1: Type: text/plain, Size: 1248 bytes --]

Hi Davidlohr,

On 09/13/2016 10:33 AM, Davidlohr Bueso wrote:
>
>>
>>> @@ -1751,12 +1820,17 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
>>> struct sembuf __user *, tsops,
>>>          if (sop->sem_num >= max)
>>>              max = sop->sem_num;
>>>          if (sop->sem_flg & SEM_UNDO)
>>> -            undos = 1;
>>> +            undos = true;
>>>          if (sop->sem_op != 0)
>>> -            alter = 1;
>>> +            alter = true;
>>> +        if (sop->sem_num < SEMOPM_FAST && !dupsop) {
>>> +            if (dup & (1 << sop->sem_num))
>>> +                dupsop = 1;
>>> +            else
>>> +                dup |= 1 << sop->sem_num;
>>> +        }
>>>      }
>> At least for nsops=2, sops[0].sem_num != sops[1].sem_num can detect the
>> absence of duplicated ops regardless of the array size.
>> Should we support that?
>
> There are various individual cases like that (i.e. obviously nsops == 1,
> alter == 0, etc.) where the dup detection would be unnecessary, but it
> seems like a stretch to go at it like this. The above will work in the
> common case (assuming low sem_num values, of course). So I'm not
> particularly worried about being too smart at the dup detection.
>
What about the attached dup detection?
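
The core idea, as a stand-alone user-space toy (hypothetical demo, not the
kernel code, and it ignores the refinement that only altering ops set the
bit): hash sem_num modulo 64 into a uint64_t bitmap, so duplicates are never
missed, at the price of occasional false positives (e.g. 0 and 64):

#include <stdint.h>
#include <stdio.h>

/* Returns 1 if two entries of sem_num[] *may* refer to the same semaphore. */
static int may_have_dups(const unsigned short *sem_num, int nsops)
{
	uint64_t dup = 0;
	int i;

	for (i = 0; i < nsops; i++) {
		uint64_t mask = 1ULL << (sem_num[i] % 64);

		if (dup & mask)
			return 1;	/* duplicate, or a modulo-64 collision */
		dup |= mask;
	}
	return 0;
}

int main(void)
{
	unsigned short a[] = { 1, 7, 130 };	/* 130 % 64 == 2: no collision */
	unsigned short b[] = { 1, 65 };		/* 65 % 64 == 1: flagged */

	printf("%d %d\n", may_have_dups(a, 3), may_have_dups(b, 2));
	return 0;
}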

--
     Manfred

[-- Attachment #2: 0001-ipc-sem-Update-duplicate-sop-detection.patch --]
[-- Type: text/x-patch, Size: 2246 bytes --]

>From 140340a358dbf66b3bc6f848ca9b860e3e957e84 Mon Sep 17 00:00:00 2001
From: Manfred Spraul <manfred@colorfullife.com>
Date: Mon, 19 Sep 2016 06:25:20 +0200
Subject: [PATCH] ipc/sem: Update duplicate sop detection

The duplicated sop detection can be improved:
- use uint64_t instead of unsigned long for the bit array
  storage, otherwise we break 32-bit archs
- support large arrays, just interpret the bit array
  as a hash array (i.e.: an operation that accesses semaphore
  0 and 64 would trigger the dupsop code, but that is
  far better than not trying at all for semnum >=64)
- support test-for-zero-and-increase, this case can use the
  fast codepath.

Untested! S-O-B only for the code, needs testing.

Signed-off-by: Manfred Spraul <manfred@colorfullife.com>
---
 ipc/sem.c | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index d9c743a..eda9e46 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -1784,7 +1784,8 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	int max, locknum;
 	bool undos = false, alter = false, dupsop = false;
 	struct sem_queue queue;
-	unsigned long dup = 0, jiffies_left = 0;
+	unsigned long jiffies_left = 0;
+	uint64_t dup;
 	struct ipc_namespace *ns;
 
 	ns = current->nsproxy->ipc_ns;
@@ -1816,18 +1817,32 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		jiffies_left = timespec_to_jiffies(&_timeout);
 	}
 	max = 0;
+
+	dup = 0;
 	for (sop = sops; sop < sops + nsops; sop++) {
+		uint64_t mask;
+
 		if (sop->sem_num >= max)
 			max = sop->sem_num;
 		if (sop->sem_flg & SEM_UNDO)
 			undos = true;
-		if (sop->sem_op != 0)
+
+		/* 64: BITS_PER_UNIT64 */
+		mask = 1ULL << (sop->sem_num % 64);
+
+		if (dup & mask) {
+			/*
+			 * There was a previous alter access that appears
+			 * to have accessed the same semaphore, thus
+			 * use the dupsop logic.
+			 * "appears", because the detection can only check
+			 * % BITS_PER_UNIT64.
+			 */
+			dupsop = 1;
+		}
+		if (sop->sem_op != 0) {
 			alter = true;
-		if (sop->sem_num < SEMOPM_FAST && !dupsop) {
-			if (dup & (1 << sop->sem_num))
-				dupsop = 1;
-			else
-				dup |= 1 << sop->sem_num;
+			dup |= mask;
 		}
 	}
 
-- 
2.7.4


^ permalink raw reply related	[flat|nested] 18+ messages in thread

* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-18 19:11 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
@ 2016-09-19 18:26   ` Manfred Spraul
  0 siblings, 0 replies; 18+ messages in thread
From: Manfred Spraul @ 2016-09-19 18:26 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

On 09/18/2016 09:11 PM, Davidlohr Bueso wrote:
> Our sysv sems have been using the notion of lockless wakeups for a while,
> ever since 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the spinlock
> section), in order to reduce the sem_lock hold times. This in-house pending
> queue can be replaced by wake_q (just like all the rest of ipc now), in that
> it provides the following advantages:
>
> o Simplifies and gets rid of unnecessary code.
>
> o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs a
> reference to the task, even if it is awoken by an unrelated event in the
> window between wake_q_add() and wake_up_q(), we cannot race with sys_exit
> and the imminent call to wake_up_process().
>
> o By not spinning IN_WAKEUP, we no longer need to disable preemption.
>
> As a consequence, the wakeup paths (after schedule(), that is) must
> acknowledge an external signal/event, as well as spurious wakeups occurring
> during the pending wakeup window. Obviously there are no changes in
> semantics that could be visible to the user. The fastpath is _only_ taken
> when we know for sure that we were awoken due to the waker's successful
> semop call (queue.status is not -EINTR).
>
> On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
> is seen with increasing thread counts:
>
>                                 v4.8-rc5                v4.8-rc5
>                                                          semopv2
> Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
> Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
> Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
> Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
> Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
> Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
> Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
> Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
> Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
> Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
> Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
> Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
> Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
> Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
> Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)
>
> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Acked-by: Manfred Spraul <manfred@colorfullife.com>

--
     Manfred

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-19 18:26   ` Manfred Spraul
  0 siblings, 1 reply; 18+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

Our sysv sems have been using the notion of lockless wakeups for a while,
ever since 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the spinlock
section), in order to reduce the sem_lock hold times. This in-house pending
queue can be replaced by wake_q (just like all the rest of ipc now), in that
it provides the following advantages:

o Simplifies and gets rid of unnecessary code.

o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs a
reference to the task, even if it is awoken by an unrelated event in the
window between wake_q_add() and wake_up_q(), we cannot race with sys_exit
and the imminent call to wake_up_process().

o By not spinning IN_WAKEUP, we no longer need to disable preemption.

As a consequence, the wakeup paths (after schedule(), that is) must acknowledge
an external signal/event, as well as spurious wakeups occurring during the
pending wakeup window. Obviously there are no changes in semantics that could
be visible to the user. The fastpath is _only_ taken when we know for sure that
we were awoken due to the waker's successful semop call (queue.status is not
-EINTR).
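
In other words, the waker/sleeper interplay becomes (sketch only, heavily
simplified from the hunks below; locking details and error handling omitted):

	/* waker, called with sem_lock held */
	WAKE_Q(wake_q);
	unlink_queue(sma, q);
	wake_up_sem_queue_prepare(q, error, &wake_q);	/* wake_q_add() + q->status */
	sem_unlock(sma, locknum);
	rcu_read_unlock();
	wake_up_q(&wake_q);		/* actual wakeups, no locks held */

	/* sleeper, after returning from schedule() */
	error = READ_ONCE(queue.status);
	if (error != -EINTR)
		goto out_free;		/* fastpath: a waker completed our semop */
	/* otherwise: signal, timeout or spurious wakeup -> retake sem_lock */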

On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
is seen with increasing thread counts:

                               v4.8-rc5                v4.8-rc5
                                                        semopv2
Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 265 ++++++++++++++++++++------------------------------------------
 1 file changed, 85 insertions(+), 180 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index a4e8bb2fae38..4d836035f9bb 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -11,6 +11,7 @@
  * (c) 2001 Red Hat Inc
  * Lockless wakeup
  * (c) 2003 Manfred Spraul <manfred@colorfullife.com>
+ * (c) 2016 Davidlohr Bueso <dave@stgolabs.net>
  * Further wakeup optimizations, documentation
  * (c) 2010 Manfred Spraul <manfred@colorfullife.com>
  *
@@ -53,15 +54,11 @@
  *   Semaphores are actively given to waiting tasks (necessary for FIFO).
  *   (see update_queue())
  * - To improve the scalability, the actual wake-up calls are performed after
- *   dropping all locks. (see wake_up_sem_queue_prepare(),
- *   wake_up_sem_queue_do())
+ *   dropping all locks. (see wake_up_sem_queue_prepare())
  * - All work is done by the waker, the woken up task does not have to do
  *   anything - not even acquiring a lock or dropping a refcount.
  * - A woken up task may not even touch the semaphore array anymore, it may
  *   have been destroyed already by a semctl(RMID).
- * - The synchronizations between wake-ups due to a timeout/signal and a
- *   wake-up due to a completed semaphore operation is achieved by using an
- *   intermediate state (IN_WAKEUP).
  * - UNDO values are stored in an array (one per process and per
  *   semaphore array, lazily allocated). For backwards compatibility, multiple
  *   modes for the UNDO variables are supported (per process, per thread)
@@ -471,40 +468,6 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
 	ipc_rmid(&sem_ids(ns), &s->sem_perm);
 }
 
-/*
- * Lockless wakeup algorithm:
- * Without the check/retry algorithm a lockless wakeup is possible:
- * - queue.status is initialized to -EINTR before blocking.
- * - wakeup is performed by
- *	* unlinking the queue entry from the pending list
- *	* setting queue.status to IN_WAKEUP
- *	  This is the notification for the blocked thread that a
- *	  result value is imminent.
- *	* call wake_up_process
- *	* set queue.status to the final value.
- * - the previously blocked thread checks queue.status:
- *	* if it's IN_WAKEUP, then it must wait until the value changes
- *	* if it's not -EINTR, then the operation was completed by
- *	  update_queue. semtimedop can return queue.status without
- *	  performing any operation on the sem array.
- *	* otherwise it must acquire the spinlock and check what's up.
- *
- * The two-stage algorithm is necessary to protect against the following
- * races:
- * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and try (and fail) to acquire sma->lock
- *   before update_queue had a chance to set queue.status
- * - if queue.status is written before wake_up_process and if the
- *   blocked process is woken up by a signal between writing
- *   queue.status and the wake_up_process, then the woken up
- *   process could return from semtimedop and die by calling
- *   sys_exit before wake_up_process is called. Then wake_up_process
- *   will oops, because the task structure is already invalid.
- *   (yes, this happened on s390 with sysv msg).
- *
- */
-#define IN_WAKEUP	1
-
 /**
  * newary - Create a new semaphore set
  * @ns: namespace
@@ -703,51 +666,18 @@ undo:
 	return result;
 }
 
-/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
- * @q: queue entry that must be signaled
- * @error: Error value for the signal
- *
- * Prepare the wake-up of the queue entry q.
- */
-static void wake_up_sem_queue_prepare(struct list_head *pt,
-				struct sem_queue *q, int error)
-{
-	if (list_empty(pt)) {
-		/*
-		 * Hold preempt off so that we don't get preempted and have the
-		 * wakee busy-wait until we're scheduled back on.
-		 */
-		preempt_disable();
-	}
-	q->status = IN_WAKEUP;
-	q->pid = error;
-
-	list_add_tail(&q->list, pt);
-}
-
-/**
- * wake_up_sem_queue_do - do the actual wake-up
- * @pt: list of tasks to be woken up
- *
- * Do the actual wake-up.
- * The function is called without any locks held, thus the semaphore array
- * could be destroyed already and the tasks can disappear as soon as the
- * status is set to the actual return code.
- */
-static void wake_up_sem_queue_do(struct list_head *pt)
+static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
+					     struct wake_q_head *wake_q)
 {
-	struct sem_queue *q, *t;
-	int did_something;
-
-	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, list) {
-		wake_up_process(q->sleeper);
-		/* q can disappear immediately after writing q->status. */
-		smp_wmb();
-		q->status = q->pid;
-	}
-	if (did_something)
-		preempt_enable();
+	wake_q_add(wake_q, q->sleeper);
+	/*
+	 * Rely on the above implicit barrier, such that we can
+	 * ensure that we hold a reference to the task before setting
+	 * q->status. Otherwise we could race with do_exit if the
+	 * task is awoken by an external event before calling
+	 * wake_up_process().
+	 */
+	WRITE_ONCE(q->status, error);
 }
 
 static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -795,18 +725,18 @@ static int check_restart(struct sem_array *sma, struct sem_queue *q)
  * wake_const_ops - wake up non-alter tasks
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * wake_const_ops must be called after a semaphore in a semaphore array
  * was set to 0. If complex const operations are pending, wake_const_ops must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int wake_const_ops(struct sem_array *sma, int semnum,
-				struct list_head *pt)
+			  struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -832,7 +762,7 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 
 			unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(pt, q, error);
+			wake_up_sem_queue_prepare(q, error, wake_q);
 			if (error == 0)
 				semop_completed = 1;
 		}
@@ -845,14 +775,14 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
  * @sma: semaphore array
  * @sops: operations that were performed
  * @nsops: number of operations
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * Checks all required queue for wait-for-zero operations, based
  * on the actual changes that were performed on the semaphore array.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
-					int nsops, struct list_head *pt)
+				int nsops, struct wake_q_head *wake_q)
 {
 	int i;
 	int semop_completed = 0;
@@ -865,7 +795,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 
 			if (sma->sem_base[num].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, num, pt);
+				semop_completed |= wake_const_ops(sma, num, wake_q);
 			}
 		}
 	} else {
@@ -876,7 +806,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 		for (i = 0; i < sma->sem_nsems; i++) {
 			if (sma->sem_base[i].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, i, pt);
+				semop_completed |= wake_const_ops(sma, i, wake_q);
 			}
 		}
 	}
@@ -885,7 +815,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 	 * then check the global queue, too.
 	 */
 	if (got_zero)
-		semop_completed |= wake_const_ops(sma, -1, pt);
+		semop_completed |= wake_const_ops(sma, -1, wake_q);
 
 	return semop_completed;
 }
@@ -895,19 +825,19 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  * update_queue - look for tasks that can be completed.
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * update_queue must be called after a semaphore in a semaphore array
  * was modified. If multiple semaphores were modified, update_queue must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function internally checks if const operations can now succeed.
  *
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
+static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -949,11 +879,11 @@ again:
 			restart = 0;
 		} else {
 			semop_completed = 1;
-			do_smart_wakeup_zero(sma, q->sops, q->nsops, pt);
+			do_smart_wakeup_zero(sma, q->sops, q->nsops, wake_q);
 			restart = check_restart(sma, q);
 		}
 
-		wake_up_sem_queue_prepare(pt, q, error);
+		wake_up_sem_queue_prepare(q, error, wake_q);
 		if (restart)
 			goto again;
 	}
@@ -984,24 +914,24 @@ static void set_semotime(struct sem_array *sma, struct sembuf *sops)
  * @sops: operations that were performed
  * @nsops: number of operations
  * @otime: force setting otime
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * do_smart_update() does the required calls to update_queue and wakeup_zero,
  * based on the actual changes that were performed on the semaphore array.
  * Note that the function does not do the actual wake-up: the caller is
- * responsible for calling wake_up_sem_queue_do(@pt).
+ * responsible for calling wake_up_q().
  * It is safe to perform this call after dropping all locks.
  */
 static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
-			int otime, struct list_head *pt)
+			    int otime, struct wake_q_head *wake_q)
 {
 	int i;
 
-	otime |= do_smart_wakeup_zero(sma, sops, nsops, pt);
+	otime |= do_smart_wakeup_zero(sma, sops, nsops, wake_q);
 
 	if (!list_empty(&sma->pending_alter)) {
 		/* semaphore array uses the global queue - just process it. */
-		otime |= update_queue(sma, -1, pt);
+		otime |= update_queue(sma, -1, wake_q);
 	} else {
 		if (!sops) {
 			/*
@@ -1009,7 +939,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			 * known. Check all.
 			 */
 			for (i = 0; i < sma->sem_nsems; i++)
-				otime |= update_queue(sma, i, pt);
+				otime |= update_queue(sma, i, wake_q);
 		} else {
 			/*
 			 * Check the semaphores that were increased:
@@ -1023,7 +953,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			for (i = 0; i < nsops; i++) {
 				if (sops[i].sem_op > 0) {
 					otime |= update_queue(sma,
-							sops[i].sem_num, pt);
+							      sops[i].sem_num, wake_q);
 				}
 			}
 		}
@@ -1111,8 +1041,8 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
-	struct list_head tasks;
 	int i;
+	WAKE_Q(wake_q);
 
 	/* Free the existing undo structures for this semaphore set.  */
 	ipc_assert_locked_object(&sma->sem_perm);
@@ -1126,25 +1056,24 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	}
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
-	INIT_LIST_HEAD(&tasks);
 	list_for_each_entry_safe(q, tq, &sma->pending_const, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 
 	list_for_each_entry_safe(q, tq, &sma->pending_alter, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 	for (i = 0; i < sma->sem_nsems; i++) {
 		struct sem *sem = sma->sem_base + i;
 		list_for_each_entry_safe(q, tq, &sem->pending_const, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 		list_for_each_entry_safe(q, tq, &sem->pending_alter, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 	}
 
@@ -1153,7 +1082,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	ns->used_sems -= sma->sem_nsems;
 	ipc_rcu_putref(sma, sem_rcu_free);
 }
@@ -1292,9 +1221,9 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	struct sem_undo *un;
 	struct sem_array *sma;
 	struct sem *curr;
-	int err;
-	struct list_head tasks;
-	int val;
+	int err, val;
+	WAKE_Q(wake_q);
+
 #if defined(CONFIG_64BIT) && defined(__BIG_ENDIAN)
 	/* big-endian 64bit */
 	val = arg >> 32;
@@ -1306,8 +1235,6 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	if (val > SEMVMX || val < 0)
 		return -ERANGE;
 
-	INIT_LIST_HEAD(&tasks);
-
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
 	if (IS_ERR(sma)) {
@@ -1350,10 +1277,10 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	curr->sempid = task_tgid_vnr(current);
 	sma->sem_ctime = get_seconds();
 	/* maybe some queued-up processes were waiting for this */
-	do_smart_update(sma, NULL, 0, 0, &tasks);
+	do_smart_update(sma, NULL, 0, 0, &wake_q);
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	return 0;
 }
 
@@ -1365,9 +1292,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	int err, nsems;
 	ushort fast_sem_io[SEMMSL_FAST];
 	ushort *sem_io = fast_sem_io;
-	struct list_head tasks;
-
-	INIT_LIST_HEAD(&tasks);
+	WAKE_Q(wake_q);
 
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
@@ -1478,7 +1403,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_q);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1514,7 +1439,7 @@ out_unlock:
 	sem_unlock(sma, -1);
 out_rcu_wakeup:
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 out_free:
 	if (sem_io != fast_sem_io)
 		ipc_free(sem_io);
@@ -1787,32 +1712,6 @@ out:
 	return un;
 }
 
-
-/**
- * get_queue_result - retrieve the result code from sem_queue
- * @q: Pointer to queue structure
- *
- * Retrieve the return code from the pending queue. If IN_WAKEUP is found in
- * q->status, then we must loop until the value is replaced with the final
- * value: This may happen if a task is woken up by an unrelated event (e.g.
- * signal) and in parallel the task is woken up by another task because it got
- * the requested semaphores.
- *
- * The function can be called with or without holding the semaphore spinlock.
- */
-static int get_queue_result(struct sem_queue *q)
-{
-	int error;
-
-	error = q->status;
-	while (unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = q->status;
-	}
-
-	return error;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1825,7 +1724,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
-	struct list_head tasks;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1865,7 +1763,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			alter = 1;
 	}
 
-	INIT_LIST_HEAD(&tasks);
 
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
@@ -1933,22 +1830,31 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.alter = alter;
 
 	error = perform_atomic_semop(sma, &queue);
-	if (error == 0) {
-		/* If the operation was successful, then do
+	if (error == 0) { /* non-blocking successful path */
+		WAKE_Q(wake_q);
+
+		/*
+		 * If the operation was successful, then do
 		 * the required updates.
 		 */
 		if (alter)
-			do_smart_update(sma, sops, nsops, 1, &tasks);
+			do_smart_update(sma, sops, nsops, 1, &wake_q);
 		else
 			set_semotime(sma, sops);
+
+		sem_unlock(sma, locknum);
+		rcu_read_unlock();
+		wake_up_q(&wake_q);
+
+		goto out_free;
 	}
-	if (error <= 0)
+	if (error < 0) /* non-blocking error path */
 		goto out_unlock_free;
 
-	/* We need to sleep on this operation, so we put the current
+	/*
+	 * We need to sleep on this operation, so we put the current
 	 * task into the pending queue and go to sleep.
 	 */
-
 	if (nsops == 1) {
 		struct sem *curr;
 		curr = &sma->sem_base[sops->sem_num];
@@ -1977,10 +1883,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		sma->complex_count++;
 	}
 
+sleep_again:
 	queue.status = -EINTR;
 	queue.sleeper = current;
 
-sleep_again:
 	__set_current_state(TASK_INTERRUPTIBLE);
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
@@ -1990,28 +1896,30 @@ sleep_again:
 	else
 		schedule();
 
-	error = get_queue_result(&queue);
-
+	/*
+	 * fastpath: the semop has completed, either successfully or not; from
+	 * the syscall pov that is quite irrelevant to us at this point; we're done.
+	 *
+	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
+	 * The queue.status is checked again in the slowpath (aka after taking
+	 * sem_lock), such that we can detect scenarios where we were awakened
+	 * externally, during the window between wake_q_add() and wake_up_q().
+	 */
+	error = READ_ONCE(queue.status);
 	if (error != -EINTR) {
-		/* fast path: update_queue already obtained all requested
-		 * resources.
-		 * Perform a smp_mb(): User space could assume that semop()
-		 * is a memory barrier: Without the mb(), the cpu could
-		 * speculatively read in user space stale data that was
-		 * overwritten by the previous owner of the semaphore.
+		/*
+		 * User space could assume that semop() is a memory barrier:
+		 * Without the mb(), the cpu could speculatively read in user
+		 * space stale data that was overwritten by the previous owner
+		 * of the semaphore.
 		 */
 		smp_mb();
-
 		goto out_free;
 	}
 
 	rcu_read_lock();
 	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
-
-	/*
-	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
-	 */
-	error = get_queue_result(&queue);
+	error = READ_ONCE(queue.status);
 
 	/*
 	 * Array removed? If yes, leave without sem_unlock().
@@ -2021,7 +1929,6 @@ sleep_again:
 		goto out_free;
 	}
 
-
 	/*
 	 * If queue.status != -EINTR we are woken up by another process.
 	 * Leave without unlink_queue(), but with sem_unlock().
@@ -2030,13 +1937,13 @@ sleep_again:
 		goto out_unlock_free;
 
 	/*
-	 * If an interrupt occurred we have to clean up the queue
+	 * If an interrupt occurred we have to clean up the queue.
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
 
 	/*
-	 * If the wakeup was spurious, just retry
+	 * If the wakeup was spurious, just retry.
 	 */
 	if (error == -EINTR && !signal_pending(current))
 		goto sleep_again;
@@ -2046,7 +1953,6 @@ sleep_again:
 out_unlock_free:
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
 out_free:
 	if (sops != fast_sops)
 		kfree(sops);
@@ -2107,8 +2013,8 @@ void exit_sem(struct task_struct *tsk)
 	for (;;) {
 		struct sem_array *sma;
 		struct sem_undo *un;
-		struct list_head tasks;
 		int semid, i;
+		WAKE_Q(wake_q);
 
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
@@ -2194,11 +2100,10 @@ void exit_sem(struct task_struct *tsk)
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
-		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks);
+		do_smart_update(sma, NULL, 0, 1, &wake_q);
 		sem_unlock(sma, -1);
 		rcu_read_unlock();
-		wake_up_sem_queue_do(&tasks);
+		wake_up_q(&wake_q);
 
 		kfree_rcu(un, rcu);
 	}
-- 
2.6.6

^ permalink raw reply related	[flat|nested] 18+ messages in thread

end of thread, other threads:[~2016-09-19 18:26 UTC | newest]

Thread overview: 18+ messages (download: mbox.gz / follow: Atom feed)
2016-09-12 11:53 [PATCH -next 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
2016-09-12 11:53 ` [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely Davidlohr Bueso
2016-09-13  4:17   ` Manfred Spraul
2016-09-13  8:14     ` Davidlohr Bueso
2016-09-12 11:53 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
2016-09-13 18:04   ` Manfred Spraul
2016-09-14 15:45     ` Davidlohr Bueso
2016-09-18 14:37   ` Manfred Spraul
2016-09-18 18:26     ` Davidlohr Bueso
2016-09-12 11:53 ` [PATCH 3/5] ipc/sem: optimize perform_atomic_semop() Davidlohr Bueso
2016-09-12 17:56   ` Manfred Spraul
2016-09-13  8:33     ` Davidlohr Bueso
2016-09-19  4:41       ` Manfred Spraul
2016-09-12 11:53 ` [PATCH 4/5] ipc/sem: explicitly inline check_restart Davidlohr Bueso
2016-09-12 11:53 ` [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups Davidlohr Bueso
2016-09-18 17:51   ` Manfred Spraul
2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
2016-09-18 19:11 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
2016-09-19 18:26   ` Manfred Spraul
