* [PATCH -next v2 0/5] ipc/sem: semop(2) improvements
@ 2016-09-18 19:11 Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely Davidlohr Bueso
                   ` (5 more replies)
  0 siblings, 6 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel

Changes from v1 (https://lkml.org/lkml/2016/9/12/266)
- Got rid of the signal_pending check in wakeup fastpath. (patch 2)
- Added READ_ONCE/WRITE_ONCE accessors for queue.status (we're obviously
  concerned about lockless access upon unrelated events, even if on the stack).
- Got rid of initializing wake_q and wake_up_q call upon perform_atomic_semop
  error return path. (patch 2)
- Documented ordering between wake_q_add and setting ->status.
- What I did not do was refactor the checks in perform_atomic_semop[_slow],
  as I could not get a decent/clean way of doing it without adding more
  unnecessary code. If we wanted to do smart scans of the semops we received
  from userspace, this would still need to be done under sem_lock for the
  semval values, obviously. So I've left it as is, where we mainly duplicate
  the function, but I still believe this is the most straightforward way of
  dealing with this situation (patch 3).
- Replaced using SEMOPM_FAST with BITS_PER_LONG, as this is really the limit we
  want for the duplicate scanning.
- More testing.
- Added Manfred's ack (patch 5).

Hi,

Here are a few updates around the semop syscall handling that I noticed while
reviewing Manfred's simple vs complex ops fixes. Changes are on top of -next,
which means that Manfred's pending patches to ipc/sem.c that remove the redundant
barrier(s) would probably have to be rebased.

The patchset has survived the following testcases:
- ltp
- ipcsemtest (https://github.com/manfred-colorfu/ipcsemtest)
- ipcscale (https://github.com/manfred-colorfu/ipcscale)

Details are in each individual patch. Please consider for v4.9.

Thanks!

Davidlohr Bueso (5):
  ipc/sem: do not call wake_sem_queue_do() prematurely
  ipc/sem: rework task wakeups
  ipc/sem: optimize perform_atomic_semop()
  ipc/sem: explicitly inline check_restart
  ipc/sem: use proper list api for pending_list wakeups

 ipc/sem.c | 415 ++++++++++++++++++++++++++++++--------------------------------
 1 file changed, 199 insertions(+), 216 deletions(-)

-- 
2.6.6


* [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

... as this call should obviously be paired with its _prepare()
counterpart, at least whenever possible; there is no harm in calling
it spuriously, as we do now in a few places. Immediate-error semop(2)
paths, which are far from ever having the task block, can be simplified
and avoid a few unnecessary loads on their way out of the call, as the
exit is not deeply nested.
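
In a nutshell, instead of funneling immediate errors through the shared
exit label (which ends up calling wake_up_sem_queue_do() on an always-empty
list), such paths now drop the rcu read lock and bail out directly; for
example, taken from the hunk below:

	error = -EFBIG;
	if (max >= sma->sem_nsems) {
		rcu_read_unlock();
		goto out_free;
	}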

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 5e318c5f749d..a4e8bb2fae38 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -1887,16 +1887,22 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	}
 
 	error = -EFBIG;
-	if (max >= sma->sem_nsems)
-		goto out_rcu_wakeup;
+	if (max >= sma->sem_nsems) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = -EACCES;
-	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO))
-		goto out_rcu_wakeup;
+	if (ipcperms(ns, &sma->sem_perm, alter ? S_IWUGO : S_IRUGO)) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = security_sem_semop(sma, sops, nsops, alter);
-	if (error)
-		goto out_rcu_wakeup;
+	if (error) {
+		rcu_read_unlock();
+		goto out_free;
+	}
 
 	error = -EIDRM;
 	locknum = sem_lock(sma, sops, nsops);
@@ -2039,7 +2045,6 @@ sleep_again:
 
 out_unlock_free:
 	sem_unlock(sma, locknum);
-out_rcu_wakeup:
 	rcu_read_unlock();
 	wake_up_sem_queue_do(&tasks);
 out_free:
-- 
2.6.6


* [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-19 18:26   ` Manfred Spraul
  2016-09-18 19:11 ` [PATCH 3/5] ipc/sem: optimize perform_atomic_semop() Davidlohr Bueso
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

Our sysv sems have been using the notion of lockless wakeups for a while,
ever since 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the spinlock
section), in order to reduce the sem_lock hold times. This in-house pending
queue can be replaced by wake_q (just like all the rest of ipc now), which
provides the following advantages:

o Simplifies and gets rid of unnecessary code.

o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs a
reference to the task, if the task is awoken due to an unrelated event within
the wake_q_add() and wake_up_q() window, we cannot race with sys_exit and the
imminent call to wake_up_process().

o By not spinning IN_WAKEUP, we no longer need to disable preemption.

In consequence, the wakeup paths (after schedule(), that is) must acknowledge
an external signal/event, as well as spurious wakeups occurring during the
pending wakeup window. Obviously there are no changes in semantics that could
be visible to the user. The fastpath is _only_ for when we know for sure that
we were awoken due to the waker's successful semop call (queue.status is not
-EINTR).
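
For reference, a condensed sketch of the waker/sleeper pattern this converts
to; names are taken from the hunks below, with locking and error handling
trimmed down:

	/* waker side, with sem_lock held */
	WAKE_Q(wake_q);				/* on-stack wake queue */

	wake_q_add(&wake_q, q->sleeper);	/* grabs a task reference */
	WRITE_ONCE(q->status, error);		/* ordered after the reference is taken */

	sem_unlock(sma, locknum);
	rcu_read_unlock();
	wake_up_q(&wake_q);			/* actual wakeups, no locks held */

	/* sleeper side, after returning from schedule() */
	error = READ_ONCE(queue.status);
	if (error != -EINTR)
		goto out_free;			/* fastpath: the semop completed */
	/* otherwise retake sem_lock and recheck (signal or spurious wakeup) */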

On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
is seen with increasing thread counts:

                               v4.8-rc5                v4.8-rc5
                                                        semopv2
Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 265 ++++++++++++++++++++------------------------------------------
 1 file changed, 85 insertions(+), 180 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index a4e8bb2fae38..4d836035f9bb 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -11,6 +11,7 @@
  * (c) 2001 Red Hat Inc
  * Lockless wakeup
  * (c) 2003 Manfred Spraul <manfred@colorfullife.com>
+ * (c) 2016 Davidlohr Bueso <dave@stgolabs.net>
  * Further wakeup optimizations, documentation
  * (c) 2010 Manfred Spraul <manfred@colorfullife.com>
  *
@@ -53,15 +54,11 @@
  *   Semaphores are actively given to waiting tasks (necessary for FIFO).
  *   (see update_queue())
  * - To improve the scalability, the actual wake-up calls are performed after
- *   dropping all locks. (see wake_up_sem_queue_prepare(),
- *   wake_up_sem_queue_do())
+ *   dropping all locks. (see wake_up_sem_queue_prepare())
  * - All work is done by the waker, the woken up task does not have to do
  *   anything - not even acquiring a lock or dropping a refcount.
  * - A woken up task may not even touch the semaphore array anymore, it may
  *   have been destroyed already by a semctl(RMID).
- * - The synchronizations between wake-ups due to a timeout/signal and a
- *   wake-up due to a completed semaphore operation is achieved by using an
- *   intermediate state (IN_WAKEUP).
  * - UNDO values are stored in an array (one per process and per
  *   semaphore array, lazily allocated). For backwards compatibility, multiple
  *   modes for the UNDO variables are supported (per process, per thread)
@@ -471,40 +468,6 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
 	ipc_rmid(&sem_ids(ns), &s->sem_perm);
 }
 
-/*
- * Lockless wakeup algorithm:
- * Without the check/retry algorithm a lockless wakeup is possible:
- * - queue.status is initialized to -EINTR before blocking.
- * - wakeup is performed by
- *	* unlinking the queue entry from the pending list
- *	* setting queue.status to IN_WAKEUP
- *	  This is the notification for the blocked thread that a
- *	  result value is imminent.
- *	* call wake_up_process
- *	* set queue.status to the final value.
- * - the previously blocked thread checks queue.status:
- *	* if it's IN_WAKEUP, then it must wait until the value changes
- *	* if it's not -EINTR, then the operation was completed by
- *	  update_queue. semtimedop can return queue.status without
- *	  performing any operation on the sem array.
- *	* otherwise it must acquire the spinlock and check what's up.
- *
- * The two-stage algorithm is necessary to protect against the following
- * races:
- * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and try (and fail) to acquire sma->lock
- *   before update_queue had a chance to set queue.status
- * - if queue.status is written before wake_up_process and if the
- *   blocked process is woken up by a signal between writing
- *   queue.status and the wake_up_process, then the woken up
- *   process could return from semtimedop and die by calling
- *   sys_exit before wake_up_process is called. Then wake_up_process
- *   will oops, because the task structure is already invalid.
- *   (yes, this happened on s390 with sysv msg).
- *
- */
-#define IN_WAKEUP	1
-
 /**
  * newary - Create a new semaphore set
  * @ns: namespace
@@ -703,51 +666,18 @@ undo:
 	return result;
 }
 
-/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
- * @q: queue entry that must be signaled
- * @error: Error value for the signal
- *
- * Prepare the wake-up of the queue entry q.
- */
-static void wake_up_sem_queue_prepare(struct list_head *pt,
-				struct sem_queue *q, int error)
-{
-	if (list_empty(pt)) {
-		/*
-		 * Hold preempt off so that we don't get preempted and have the
-		 * wakee busy-wait until we're scheduled back on.
-		 */
-		preempt_disable();
-	}
-	q->status = IN_WAKEUP;
-	q->pid = error;
-
-	list_add_tail(&q->list, pt);
-}
-
-/**
- * wake_up_sem_queue_do - do the actual wake-up
- * @pt: list of tasks to be woken up
- *
- * Do the actual wake-up.
- * The function is called without any locks held, thus the semaphore array
- * could be destroyed already and the tasks can disappear as soon as the
- * status is set to the actual return code.
- */
-static void wake_up_sem_queue_do(struct list_head *pt)
+static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
+					     struct wake_q_head *wake_q)
 {
-	struct sem_queue *q, *t;
-	int did_something;
-
-	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, list) {
-		wake_up_process(q->sleeper);
-		/* q can disappear immediately after writing q->status. */
-		smp_wmb();
-		q->status = q->pid;
-	}
-	if (did_something)
-		preempt_enable();
+	wake_q_add(wake_q, q->sleeper);
+	/*
+	 * Rely on the above implicit barrier, such that we can
+	 * ensure that we hold reference to the task before setting
+	 * q->status. Otherwise we could race with do_exit if the
+	 * task is awoken by an external event before calling
+	 * wake_up_process().
+	 */
+	WRITE_ONCE(q->status, error);
 }
 
 static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -795,18 +725,18 @@ static int check_restart(struct sem_array *sma, struct sem_queue *q)
  * wake_const_ops - wake up non-alter tasks
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * wake_const_ops must be called after a semaphore in a semaphore array
  * was set to 0. If complex const operations are pending, wake_const_ops must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int wake_const_ops(struct sem_array *sma, int semnum,
-				struct list_head *pt)
+			  struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -832,7 +762,7 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 
 			unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(pt, q, error);
+			wake_up_sem_queue_prepare(q, error, wake_q);
 			if (error == 0)
 				semop_completed = 1;
 		}
@@ -845,14 +775,14 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
  * @sma: semaphore array
  * @sops: operations that were performed
  * @nsops: number of operations
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * Checks all required queue for wait-for-zero operations, based
  * on the actual changes that were performed on the semaphore array.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
-					int nsops, struct list_head *pt)
+				int nsops, struct wake_q_head *wake_q)
 {
 	int i;
 	int semop_completed = 0;
@@ -865,7 +795,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 
 			if (sma->sem_base[num].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, num, pt);
+				semop_completed |= wake_const_ops(sma, num, wake_q);
 			}
 		}
 	} else {
@@ -876,7 +806,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 		for (i = 0; i < sma->sem_nsems; i++) {
 			if (sma->sem_base[i].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, i, pt);
+				semop_completed |= wake_const_ops(sma, i, wake_q);
 			}
 		}
 	}
@@ -885,7 +815,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 	 * then check the global queue, too.
 	 */
 	if (got_zero)
-		semop_completed |= wake_const_ops(sma, -1, pt);
+		semop_completed |= wake_const_ops(sma, -1, wake_q);
 
 	return semop_completed;
 }
@@ -895,19 +825,19 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  * update_queue - look for tasks that can be completed.
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * update_queue must be called after a semaphore in a semaphore array
  * was modified. If multiple semaphores were modified, update_queue must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function internally checks if const operations can now succeed.
  *
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
+static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -949,11 +879,11 @@ again:
 			restart = 0;
 		} else {
 			semop_completed = 1;
-			do_smart_wakeup_zero(sma, q->sops, q->nsops, pt);
+			do_smart_wakeup_zero(sma, q->sops, q->nsops, wake_q);
 			restart = check_restart(sma, q);
 		}
 
-		wake_up_sem_queue_prepare(pt, q, error);
+		wake_up_sem_queue_prepare(q, error, wake_q);
 		if (restart)
 			goto again;
 	}
@@ -984,24 +914,24 @@ static void set_semotime(struct sem_array *sma, struct sembuf *sops)
  * @sops: operations that were performed
  * @nsops: number of operations
  * @otime: force setting otime
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * do_smart_update() does the required calls to update_queue and wakeup_zero,
  * based on the actual changes that were performed on the semaphore array.
  * Note that the function does not do the actual wake-up: the caller is
- * responsible for calling wake_up_sem_queue_do(@pt).
+ * responsible for calling wake_up_q().
  * It is safe to perform this call after dropping all locks.
  */
 static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
-			int otime, struct list_head *pt)
+			    int otime, struct wake_q_head *wake_q)
 {
 	int i;
 
-	otime |= do_smart_wakeup_zero(sma, sops, nsops, pt);
+	otime |= do_smart_wakeup_zero(sma, sops, nsops, wake_q);
 
 	if (!list_empty(&sma->pending_alter)) {
 		/* semaphore array uses the global queue - just process it. */
-		otime |= update_queue(sma, -1, pt);
+		otime |= update_queue(sma, -1, wake_q);
 	} else {
 		if (!sops) {
 			/*
@@ -1009,7 +939,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			 * known. Check all.
 			 */
 			for (i = 0; i < sma->sem_nsems; i++)
-				otime |= update_queue(sma, i, pt);
+				otime |= update_queue(sma, i, wake_q);
 		} else {
 			/*
 			 * Check the semaphores that were increased:
@@ -1023,7 +953,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			for (i = 0; i < nsops; i++) {
 				if (sops[i].sem_op > 0) {
 					otime |= update_queue(sma,
-							sops[i].sem_num, pt);
+							      sops[i].sem_num, wake_q);
 				}
 			}
 		}
@@ -1111,8 +1041,8 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
-	struct list_head tasks;
 	int i;
+	WAKE_Q(wake_q);
 
 	/* Free the existing undo structures for this semaphore set.  */
 	ipc_assert_locked_object(&sma->sem_perm);
@@ -1126,25 +1056,24 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	}
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
-	INIT_LIST_HEAD(&tasks);
 	list_for_each_entry_safe(q, tq, &sma->pending_const, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 
 	list_for_each_entry_safe(q, tq, &sma->pending_alter, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 	for (i = 0; i < sma->sem_nsems; i++) {
 		struct sem *sem = sma->sem_base + i;
 		list_for_each_entry_safe(q, tq, &sem->pending_const, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 		list_for_each_entry_safe(q, tq, &sem->pending_alter, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 	}
 
@@ -1153,7 +1082,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	ns->used_sems -= sma->sem_nsems;
 	ipc_rcu_putref(sma, sem_rcu_free);
 }
@@ -1292,9 +1221,9 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	struct sem_undo *un;
 	struct sem_array *sma;
 	struct sem *curr;
-	int err;
-	struct list_head tasks;
-	int val;
+	int err, val;
+	WAKE_Q(wake_q);
+
 #if defined(CONFIG_64BIT) && defined(__BIG_ENDIAN)
 	/* big-endian 64bit */
 	val = arg >> 32;
@@ -1306,8 +1235,6 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	if (val > SEMVMX || val < 0)
 		return -ERANGE;
 
-	INIT_LIST_HEAD(&tasks);
-
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
 	if (IS_ERR(sma)) {
@@ -1350,10 +1277,10 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	curr->sempid = task_tgid_vnr(current);
 	sma->sem_ctime = get_seconds();
 	/* maybe some queued-up processes were waiting for this */
-	do_smart_update(sma, NULL, 0, 0, &tasks);
+	do_smart_update(sma, NULL, 0, 0, &wake_q);
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	return 0;
 }
 
@@ -1365,9 +1292,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	int err, nsems;
 	ushort fast_sem_io[SEMMSL_FAST];
 	ushort *sem_io = fast_sem_io;
-	struct list_head tasks;
-
-	INIT_LIST_HEAD(&tasks);
+	WAKE_Q(wake_q);
 
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
@@ -1478,7 +1403,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_q);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1514,7 +1439,7 @@ out_unlock:
 	sem_unlock(sma, -1);
 out_rcu_wakeup:
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 out_free:
 	if (sem_io != fast_sem_io)
 		ipc_free(sem_io);
@@ -1787,32 +1712,6 @@ out:
 	return un;
 }
 
-
-/**
- * get_queue_result - retrieve the result code from sem_queue
- * @q: Pointer to queue structure
- *
- * Retrieve the return code from the pending queue. If IN_WAKEUP is found in
- * q->status, then we must loop until the value is replaced with the final
- * value: This may happen if a task is woken up by an unrelated event (e.g.
- * signal) and in parallel the task is woken up by another task because it got
- * the requested semaphores.
- *
- * The function can be called with or without holding the semaphore spinlock.
- */
-static int get_queue_result(struct sem_queue *q)
-{
-	int error;
-
-	error = q->status;
-	while (unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = q->status;
-	}
-
-	return error;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1825,7 +1724,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
-	struct list_head tasks;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1865,7 +1763,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			alter = 1;
 	}
 
-	INIT_LIST_HEAD(&tasks);
 
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
@@ -1933,22 +1830,31 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.alter = alter;
 
 	error = perform_atomic_semop(sma, &queue);
-	if (error == 0) {
-		/* If the operation was successful, then do
+	if (error == 0) { /* non-blocking successful path */
+		WAKE_Q(wake_q);
+
+		/*
+		 * If the operation was successful, then do
 		 * the required updates.
 		 */
 		if (alter)
-			do_smart_update(sma, sops, nsops, 1, &tasks);
+			do_smart_update(sma, sops, nsops, 1, &wake_q);
 		else
 			set_semotime(sma, sops);
+
+		sem_unlock(sma, locknum);
+		rcu_read_unlock();
+		wake_up_q(&wake_q);
+
+		goto out_free;
 	}
-	if (error <= 0)
+	if (error < 0) /* non-blocking error path */
 		goto out_unlock_free;
 
-	/* We need to sleep on this operation, so we put the current
+	/*
+	 * We need to sleep on this operation, so we put the current
 	 * task into the pending queue and go to sleep.
 	 */
-
 	if (nsops == 1) {
 		struct sem *curr;
 		curr = &sma->sem_base[sops->sem_num];
@@ -1977,10 +1883,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		sma->complex_count++;
 	}
 
+sleep_again:
 	queue.status = -EINTR;
 	queue.sleeper = current;
 
-sleep_again:
 	__set_current_state(TASK_INTERRUPTIBLE);
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
@@ -1990,28 +1896,30 @@ sleep_again:
 	else
 		schedule();
 
-	error = get_queue_result(&queue);
-
+	/*
+	 * fastpath: the semop has completed, either successfully or not, from
+	 * the syscall pov, is quite irrelevant to us at this point; we're done.
+	 *
+	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
+	 * The queue.status is checked again in the slowpath (aka after taking
+	 * sem_lock), such that we can detect scenarios where we were awakened
+	 * externally, during the window between wake_q_add() and wake_up_q().
+	 */
+	error = READ_ONCE(queue.status);
 	if (error != -EINTR) {
-		/* fast path: update_queue already obtained all requested
-		 * resources.
-		 * Perform a smp_mb(): User space could assume that semop()
-		 * is a memory barrier: Without the mb(), the cpu could
-		 * speculatively read in user space stale data that was
-		 * overwritten by the previous owner of the semaphore.
+		/*
+		 * User space could assume that semop() is a memory barrier:
+		 * Without the mb(), the cpu could speculatively read in user
+		 * space stale data that was overwritten by the previous owner
+		 * of the semaphore.
 		 */
 		smp_mb();
-
 		goto out_free;
 	}
 
 	rcu_read_lock();
 	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
-
-	/*
-	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
-	 */
-	error = get_queue_result(&queue);
+	error = READ_ONCE(queue.status);
 
 	/*
 	 * Array removed? If yes, leave without sem_unlock().
@@ -2021,7 +1929,6 @@ sleep_again:
 		goto out_free;
 	}
 
-
 	/*
 	 * If queue.status != -EINTR we are woken up by another process.
 	 * Leave without unlink_queue(), but with sem_unlock().
@@ -2030,13 +1937,13 @@ sleep_again:
 		goto out_unlock_free;
 
 	/*
-	 * If an interrupt occurred we have to clean up the queue
+	 * If an interrupt occurred we have to clean up the queue.
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
 
 	/*
-	 * If the wakeup was spurious, just retry
+	 * If the wakeup was spurious, just retry.
 	 */
 	if (error == -EINTR && !signal_pending(current))
 		goto sleep_again;
@@ -2046,7 +1953,6 @@ sleep_again:
 out_unlock_free:
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
 out_free:
 	if (sops != fast_sops)
 		kfree(sops);
@@ -2107,8 +2013,8 @@ void exit_sem(struct task_struct *tsk)
 	for (;;) {
 		struct sem_array *sma;
 		struct sem_undo *un;
-		struct list_head tasks;
 		int semid, i;
+		WAKE_Q(wake_q);
 
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
@@ -2194,11 +2100,10 @@ void exit_sem(struct task_struct *tsk)
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
-		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks);
+		do_smart_update(sma, NULL, 0, 1, &wake_q);
 		sem_unlock(sma, -1);
 		rcu_read_unlock();
-		wake_up_sem_queue_do(&tasks);
+		wake_up_q(&wake_q);
 
 		kfree_rcu(un, rcu);
 	}
-- 
2.6.6


* [PATCH 3/5] ipc/sem: optimize perform_atomic_semop()
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 1/5] ipc/sem: do not call wake_sem_queue_do() prematurely Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-21 19:46   ` [PATCH v3] " Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 4/5] ipc/sem: explicitly inline check_restart Davidlohr Bueso
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

This is the main workhorse that deals with semop user calls,
deciding whether the wait-for-zero or semval update operations on
the set can complete or not as the sma currently stands. Currently,
the set is iterated twice (setting semval, then backwards for
the sempid value). Slowpaths, and particularly SEM_UNDO calls,
must undo any altered sem when it is detected that the caller
must block or has errored out.

With larger sets, situations can occur where this involves a lot
of cycles and is obviously a suboptimal use of cached resources in
shared memory; i.e., we discard the caches of CPUs that are also
calling semop and have the sembuf cached (and could complete),
while the current lock holder doing the semop will block, error
out, or do a wait-for-zero operation.

This patch proposes still iterating the set twice, but the first
scan is read-only, and we perform the actual updates afterward,
once we know that the call will succeed. In order to not suffer
from the overhead of dealing with sops that act on the same sem_num,
such (rare) cases use perform_atomic_semop_slow(), which is exactly
what we have now. Duplicates are detected before grabbing sem_lock,
using a simple 64-bit variable in which the sem_num-th bit is set.
Of course, this means that semop calls with a sem_num larger than
BITS_PER_LONG will take the _slow() alternative; but many real-world
workloads only work on a handful of semaphores in a given set, so this
is good enough for the common case.
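
As a purely illustrative userspace example (semid is assumed to refer to an
already existing set), the following semop(2) call acts twice on semaphore 0
and would therefore be flagged as dupsop and routed through
perform_atomic_semop_slow():

	#include <sys/types.h>
	#include <sys/ipc.h>
	#include <sys/sem.h>

	struct sembuf ops[2] = {
		{ .sem_num = 0, .sem_op = -1, .sem_flg = 0 },	/* decrement sem 0 */
		{ .sem_num = 0, .sem_op =  1, .sem_flg = 0 },	/* and bump it back up */
	};

	/* both entries target sem_num 0, so the dup bitmask catches it */
	if (semop(semid, ops, 2) < 0)
		perror("semop");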

In addition, add some comments about when we expect the caller
to block.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 94 insertions(+), 9 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 4d836035f9bb..e868b5933ff8 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -115,7 +115,8 @@ struct sem_queue {
 	struct sembuf		*sops;	 /* array of pending operations */
 	struct sembuf		*blocking; /* the operation that blocked */
 	int			nsops;	 /* number of operations */
-	int			alter;	 /* does *sops alter the array? */
+	bool			alter;	 /* does *sops alter the array? */
+	bool                    dupsop;	 /* sops on more than one sem_num */
 };
 
 /* Each task has a list of undo requests. They are executed automatically
@@ -587,15 +588,23 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
 }
 
 /**
- * perform_atomic_semop - Perform (if possible) a semaphore operation
+ * perform_atomic_semop[_slow] - Attempt to perform semaphore
+ *                               operations on a given array.
  * @sma: semaphore array
  * @q: struct sem_queue that describes the operation
  *
+ * Caller blocking are as follows, based the value
+ * indicated by the semaphore operation (sem_op):
+ *
+ *  (1) >0 never blocks.
+ *  (2)  0 (wait-for-zero operation): semval is non-zero.
+ *  (3) <0 attempting to decrement semval to a value smaller than zero.
+ *
  * Returns 0 if the operation was possible.
  * Returns 1 if the operation is impossible, the caller must sleep.
- * Negative values are error codes.
+ * Returns <0 for error codes.
  */
-static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+static int perform_atomic_semop_slow(struct sem_array *sma, struct sem_queue *q)
 {
 	int result, sem_op, nsops, pid;
 	struct sembuf *sop;
@@ -666,6 +675,71 @@ undo:
 	return result;
 }
 
+static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+{
+	int result, sem_op, nsops;
+	struct sembuf *sop;
+	struct sem *curr;
+	struct sembuf *sops;
+	struct sem_undo *un;
+
+	sops = q->sops;
+	nsops = q->nsops;
+	un = q->undo;
+
+	if (unlikely(q->dupsop))
+		return perform_atomic_semop_slow(sma, q);
+
+	/*
+	 * We scan the semaphore set twice, first to ensure that the entire
+	 * operation can succeed, therefore avoiding any pointless writes
+	 * to shared memory and having to undo such changes in order to block
+	 * until the operations can go through.
+	 */
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (!sem_op && result)
+			goto would_block; /* wait-for-zero */
+
+		result += sem_op;
+		if (result < 0)
+			goto would_block;
+
+		if (result > SEMVMX)
+			return -ERANGE;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+
+			/* Exceeding the undo range is an error. */
+			if (undo < (-SEMAEM - 1) || undo > SEMAEM)
+				return -ERANGE;
+		}
+	}
+
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+			un->semadj[sop->sem_num] = undo;
+		}
+		curr->semval += sem_op;
+		curr->sempid = q->pid;
+	}
+
+	return 0;
+
+would_block:
+	q->blocking = sop;
+	return sop->sem_flg & IPC_NOWAIT? -EAGAIN : 1;
+}
+
 static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
 					     struct wake_q_head *wake_q)
 {
@@ -1720,9 +1794,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sembuf fast_sops[SEMOPM_FAST];
 	struct sembuf *sops = fast_sops, *sop;
 	struct sem_undo *un;
-	int undos = 0, alter = 0, max, locknum;
+	int max, locknum;
+	bool undos = false, alter = false, dupsop = false;
 	struct sem_queue queue;
-	unsigned long jiffies_left = 0;
+	unsigned long dup = 0, jiffies_left = 0;
 	struct ipc_namespace *ns;
 
 	ns = current->nsproxy->ipc_ns;
@@ -1736,10 +1811,12 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		if (sops == NULL)
 			return -ENOMEM;
 	}
+
 	if (copy_from_user(sops, tsops, nsops * sizeof(*tsops))) {
 		error =  -EFAULT;
 		goto out_free;
 	}
+
 	if (timeout) {
 		struct timespec _timeout;
 		if (copy_from_user(&_timeout, timeout, sizeof(*timeout))) {
@@ -1753,17 +1830,24 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		}
 		jiffies_left = timespec_to_jiffies(&_timeout);
 	}
+
 	max = 0;
 	for (sop = sops; sop < sops + nsops; sop++) {
 		if (sop->sem_num >= max)
 			max = sop->sem_num;
 		if (sop->sem_flg & SEM_UNDO)
-			undos = 1;
+			undos = true;
 		if (sop->sem_op != 0)
-			alter = 1;
+			alter = true;
+		/* detect duplicate sops */
+		if (sop->sem_num < BITS_PER_LONG && !dupsop) {
+			if (dup & (1 << sop->sem_num))
+				dupsop = true;
+			else
+				dup |= 1 << sop->sem_num;
+		}
 	}
 
-
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
 		un = find_alloc_undo(ns, semid);
@@ -1828,6 +1912,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.undo = un;
 	queue.pid = task_tgid_vnr(current);
 	queue.alter = alter;
+	queue.dupsop = dupsop;
 
 	error = perform_atomic_semop(sma, &queue);
 	if (error == 0) { /* non-blocking successful path */
-- 
2.6.6


* [PATCH 4/5] ipc/sem: explicitly inline check_restart
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
                   ` (2 preceding siblings ...)
  2016-09-18 19:11 ` [PATCH 3/5] ipc/sem: optimize perform_atomic_semop() Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-18 19:11 ` [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups Davidlohr Bueso
  2016-09-19 18:40 ` [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Manfred Spraul
  5 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

The compiler already does this, but make it explicit. This helper
is really small and also used in update_queue's main loop, which does
O(N^2) scanning. Inline it and avoid the function overhead.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index e868b5933ff8..89adba51e85f 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -771,7 +771,7 @@ static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
  * modified the array.
  * Note that wait-for-zero operations are handled without restart.
  */
-static int check_restart(struct sem_array *sma, struct sem_queue *q)
+static inline int check_restart(struct sem_array *sma, struct sem_queue *q)
 {
 	/* pending complex alter operations are too difficult to analyse */
 	if (!list_empty(&sma->pending_alter))
-- 
2.6.6


* [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
                   ` (3 preceding siblings ...)
  2016-09-18 19:11 ` [PATCH 4/5] ipc/sem: explicitly inline check_restart Davidlohr Bueso
@ 2016-09-18 19:11 ` Davidlohr Bueso
  2016-09-19 18:40 ` [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Manfred Spraul
  5 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 19:11 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

... saves some LoC and looks cleaner than re-implementing the
calls.
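
The conversion boils down to the following pattern (condensed from the
wake_const_ops() hunk below):

	/* before: open-coded walk via container_of() */
	walk = pending_list->next;
	while (walk != pending_list) {
		q = container_of(walk, struct sem_queue, list);
		walk = walk->next;
		...
	}

	/* after: the list api does the same, safe against removal of q */
	list_for_each_entry_safe(q, tmp, pending_list, list) {
		...
	}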

Acked-by: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 38 +++++++++++++-------------------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 89adba51e85f..e7da9821bf46 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -812,8 +812,7 @@ static inline int check_restart(struct sem_array *sma, struct sem_queue *q)
 static int wake_const_ops(struct sem_array *sma, int semnum,
 			  struct wake_q_head *wake_q)
 {
-	struct sem_queue *q;
-	struct list_head *walk;
+	struct sem_queue *q, *tmp;
 	struct list_head *pending_list;
 	int semop_completed = 0;
 
@@ -822,25 +821,19 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 	else
 		pending_list = &sma->sem_base[semnum].pending_const;
 
-	walk = pending_list->next;
-	while (walk != pending_list) {
-		int error;
-
-		q = container_of(walk, struct sem_queue, list);
-		walk = walk->next;
-
-		error = perform_atomic_semop(sma, q);
-
-		if (error <= 0) {
-			/* operation completed, remove from queue & wakeup */
+	list_for_each_entry_safe(q, tmp, pending_list, list) {
+		int error = perform_atomic_semop(sma, q);
 
-			unlink_queue(sma, q);
+		if (error > 0)
+			continue;
+		/* operation completed, remove from queue & wakeup */
+		unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(q, error, wake_q);
-			if (error == 0)
-				semop_completed = 1;
-		}
+		wake_up_sem_queue_prepare(q, error, wake_q);
+		if (error == 0)
+			semop_completed = 1;
 	}
+
 	return semop_completed;
 }
 
@@ -913,8 +906,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  */
 static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
-	struct sem_queue *q;
-	struct list_head *walk;
+	struct sem_queue *q, *tmp;
 	struct list_head *pending_list;
 	int semop_completed = 0;
 
@@ -924,13 +916,9 @@ static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *w
 		pending_list = &sma->sem_base[semnum].pending_alter;
 
 again:
-	walk = pending_list->next;
-	while (walk != pending_list) {
+	list_for_each_entry_safe(q, tmp, pending_list, list) {
 		int error, restart;
 
-		q = container_of(walk, struct sem_queue, list);
-		walk = walk->next;
-
 		/* If we are scanning the single sop, per-semaphore list of
 		 * one semaphore and that semaphore is 0, then it is not
 		 * necessary to scan further: simple increments
-- 
2.6.6


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-18 19:11 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
@ 2016-09-19 18:26   ` Manfred Spraul
  0 siblings, 0 replies; 15+ messages in thread
From: Manfred Spraul @ 2016-09-19 18:26 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

On 09/18/2016 09:11 PM, Davidlohr Bueso wrote:
> Our sysv sems have been using the notion of lockless wakeups for a while,
> ever since 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the spinlock
> section), in order to reduce the sem_lock hold times. This in-house pending
> queue can be replaced by wake_q (just like all the rest of ipc now), in that
> it provides the following advantages:
>
> o Simplifies and gets rid of unnecessary code.
>
> o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs
> reference to the task, if awoken due to an unrelated event, between the
> wake_q_add() and wake_up_q() window, we cannot race with sys_exit and the
> imminent call to wake_up_process().
>
> o By not spinning IN_WAKEUP, we no longer need to disable preemption.
>
> In consequence, the wakeup paths (after schedule(), that is) must acknowledge
> an external signal/event, as well spurious wakeup occurring during the pending
> wakeup window. Obviously no changes in semantics that could be visible to the
> user. The fastpath is _only_ for when we know for sure that we were awoken due
> to a the waker's successful semop call (queue.status is not -EINTR).
>
> On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
> is seen with increasing thread counts:
>
>                                 v4.8-rc5                v4.8-rc5
>                                                          semopv2
> Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
> Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
> Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
> Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
> Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
> Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
> Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
> Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
> Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
> Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
> Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
> Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
> Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
> Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
> Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)
>
> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Acked-by: Manfred Spraul <manfred@colorfullife.com>

--
     Manfred


* Re: [PATCH -next v2 0/5] ipc/sem: semop(2) improvements
  2016-09-18 19:11 [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Davidlohr Bueso
                   ` (4 preceding siblings ...)
  2016-09-18 19:11 ` [PATCH 5/5] ipc/sem: use proper list api for pending_list wakeups Davidlohr Bueso
@ 2016-09-19 18:40 ` Manfred Spraul
  2016-09-20 15:03   ` Davidlohr Bueso
  5 siblings, 1 reply; 15+ messages in thread
From: Manfred Spraul @ 2016-09-19 18:40 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel

On 09/18/2016 09:11 PM, Davidlohr Bueso wrote:
> Changes from v1 (https://lkml.org/lkml/2016/9/12/266)
> - Got rid of the signal_pending check in wakeup fastpath. (patch 2)
> - Added read/access once to queue.status (we're obviously concerned about
>   lockless access upon unrelated events, even if on the stack).
> - Got rid of initializing wake_q and wake_up_q call upon perform_atomic_semop
>    error return path. (patch 2)
> - Documented ordering between wake_q_add and setting ->status.
> - What I did not do was refactor the checks in perfor_atomic_semop[_slow]
>    as I could not get a decent/clean way of doing it without adding more
>    unnecessary code. If we wanted to do smart semop scans that we received from
>    userspace, this would still need to be done under sem_lock for semval values
>    obviously. So I've left it as is, where we mainly duplicate the function, but
>    I still believe this is the most straightforward way of dealing with this
>    situation  (patch 3).
> - Replaced using SEMOP_FAST with BITS_PER_LONG, as this is really what we want
>    to limit the duplicate scanning.
> - More testing.
> - Added Manfred's ack (patch 5).
>
> Hi,
>
> Here are a few updates around the semop syscall handling that I noticed while
> reviewing Manfred's simple vs complex ops fixes. Changes are on top of -next,
> which means that Manfred's pending patches to ipc/sem.c that remove the redundant
> barrier(s) would probably have to be rebased.
>
> The patchset has survived the following testscases:
> - ltp
> - ipcsemtest (https://github.com/manfred-colorfu/ipcsemtest)
> - ipcscale (https://github.com/manfred-colorfu/ipcscale)
>
> Details are in each individual patch. Please consider for v4.9.
>
> Thanks!
>
> Davidlohr Bueso (5):
>    ipc/sem: do not call wake_sem_queue_do() prematurely
The only patch that I don't like.
Especially: patch 2 of the series removes the wake_up_q from the 
function epilogue.
So only the code duplication (additional instances of rcu_read_unlock()) 
remains; I don't see any advantages.

>    ipc/sem: rework task wakeups
Acked
>    ipc/sem: optimize perform_atomic_semop()
I'm still thinking about it.
Code duplication is evil, but perhaps it is the best solution.

What I don't like is the hardcoded "< BITS_PER_LONG".
At least:
- (1 << sop->sem_num)
+ (1 << (sop->sem_num%BITS_PER_LONG))
>    ipc/sem: explicitly inline check_restart
Do we really need that? Isn't that the compiler's task?
Especially since the compiler is already doing it correctly.
>    ipc/sem: use proper list api for pending_list wakeups
Acked
>   ipc/sem.c | 415 ++++++++++++++++++++++++++++++--------------------------------
>   1 file changed, 199 insertions(+), 216 deletions(-)
>
--

     Manfred


* Re: [PATCH -next v2 0/5] ipc/sem: semop(2) improvements
  2016-09-19 18:40 ` [PATCH -next v2 0/5] ipc/sem: semop(2) improvements Manfred Spraul
@ 2016-09-20 15:03   ` Davidlohr Bueso
  0 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-20 15:03 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel

On Mon, 19 Sep 2016, Manfred Spraul wrote:
>On 09/18/2016 09:11 PM, Davidlohr Bueso wrote:

>>Davidlohr Bueso (5):
>>   ipc/sem: do not call wake_sem_queue_do() prematurely
>The only patch that I don't like.
>Especially: patch 2 of the series removes the wake_up_q from the 
>function epilogue.
>So only the code duplication (additional instances of 
>rcu_read_unlock()) remains, I don't see any advantages.
>
>>   ipc/sem: rework task wakeups
>Acked

Thanks.

>>   ipc/sem: optimize perform_atomic_semop()
>I'm still thinking about it.
>Code duplication is evil, but perhaps it is the best solution.
>
>What I don't like is the hardcoded "< BITS_PER_LONG".
>At least:
>- (1 << sop->sem_num)
>+ (1 << (sop->sem_num%BITS_PER_LONG))

Yeah, I'll send v3 for that.

>>   ipc/sem: explicitly inline check_restart
>Do we really need that? Isn't that the compiler's task?
>Especially since the compiler is already doing it correctly.

Yes, I mentioned in the changelog that the compiler does it and this is
merely explicit. That said, I see no harm in it; I guess it's whatever akpm says.

>>   ipc/sem: use proper list api for pending_list wakeups
>Acked

Thanks,
Davidlohr


* [PATCH v3] ipc/sem: optimize perform_atomic_semop()
  2016-09-18 19:11 ` [PATCH 3/5] ipc/sem: optimize perform_atomic_semop() Davidlohr Bueso
@ 2016-09-21 19:46   ` Davidlohr Bueso
  0 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-21 19:46 UTC (permalink / raw)
  To: akpm; +Cc: manfred, linux-kernel, Davidlohr Bueso

This is the main workhorse that deals with semop user calls,
deciding whether the wait-for-zero or semval update operations on
the set can complete or not as the sma currently stands. Currently,
the set is iterated twice (setting semval, then backwards for
the sempid value). Slowpaths, and particularly SEM_UNDO calls,
must undo any altered sem when it is detected that the caller
must block or has errored out.

With larger sets, situations can occur where this involves a lot
of cycles and is obviously a suboptimal use of cached resources in
shared memory; i.e., we discard the caches of CPUs that are also
calling semop and have the sembuf cached (and could complete),
while the current lock holder doing the semop will block, error
out, or do a wait-for-zero operation.

This patch proposes still iterating the set twice, but the first
scan is read-only, and we perform the actual updates afterward,
once we know that the call will succeed. In order to not suffer
from the overhead of dealing with sops that act on the same sem_num,
such (rare) cases use perform_atomic_semop_slow(), which is exactly
what we have now. Duplicates are detected before grabbing sem_lock,
using a simple 32/64-bit bitmask hashed on the sem_num we are
working on.

In addition, add some comments about when we expect the caller
to block.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---

Changes from v2: improved dup detection for working on larger sets,
as well as support for test-for-zero-and-increase.

 ipc/sem.c | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 101 insertions(+), 10 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index 4d836035f9bb..008c5cfcccb7 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -115,7 +115,8 @@ struct sem_queue {
 	struct sembuf		*sops;	 /* array of pending operations */
 	struct sembuf		*blocking; /* the operation that blocked */
 	int			nsops;	 /* number of operations */
-	int			alter;	 /* does *sops alter the array? */
+	bool			alter;	 /* does *sops alter the array? */
+	bool                    dupsop;	 /* sops on more than one sem_num */
 };
 
 /* Each task has a list of undo requests. They are executed automatically
@@ -587,15 +588,23 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
 }
 
 /**
- * perform_atomic_semop - Perform (if possible) a semaphore operation
+ * perform_atomic_semop[_slow] - Attempt to perform semaphore
+ *                               operations on a given array.
  * @sma: semaphore array
  * @q: struct sem_queue that describes the operation
  *
+ * Caller blocking are as follows, based the value
+ * indicated by the semaphore operation (sem_op):
+ *
+ *  (1) >0 never blocks.
+ *  (2)  0 (wait-for-zero operation): semval is non-zero.
+ *  (3) <0 attempting to decrement semval to a value smaller than zero.
+ *
  * Returns 0 if the operation was possible.
  * Returns 1 if the operation is impossible, the caller must sleep.
- * Negative values are error codes.
+ * Returns <0 for error codes.
  */
-static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+static int perform_atomic_semop_slow(struct sem_array *sma, struct sem_queue *q)
 {
 	int result, sem_op, nsops, pid;
 	struct sembuf *sop;
@@ -666,6 +675,71 @@ undo:
 	return result;
 }
 
+static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
+{
+	int result, sem_op, nsops;
+	struct sembuf *sop;
+	struct sem *curr;
+	struct sembuf *sops;
+	struct sem_undo *un;
+
+	sops = q->sops;
+	nsops = q->nsops;
+	un = q->undo;
+
+	if (unlikely(q->dupsop))
+		return perform_atomic_semop_slow(sma, q);
+
+	/*
+	 * We scan the semaphore set twice, first to ensure that the entire
+	 * operation can succeed, therefore avoiding any pointless writes
+	 * to shared memory and having to undo such changes in order to block
+	 * until the operations can go through.
+	 */
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (!sem_op && result)
+			goto would_block; /* wait-for-zero */
+
+		result += sem_op;
+		if (result < 0)
+			goto would_block;
+
+		if (result > SEMVMX)
+			return -ERANGE;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+
+			/* Exceeding the undo range is an error. */
+			if (undo < (-SEMAEM - 1) || undo > SEMAEM)
+				return -ERANGE;
+		}
+	}
+
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		sem_op = sop->sem_op;
+		result = curr->semval;
+
+		if (sop->sem_flg & SEM_UNDO) {
+			int undo = un->semadj[sop->sem_num] - sem_op;
+			un->semadj[sop->sem_num] = undo;
+		}
+		curr->semval += sem_op;
+		curr->sempid = q->pid;
+	}
+
+	return 0;
+
+would_block:
+	q->blocking = sop;
+	return sop->sem_flg & IPC_NOWAIT? -EAGAIN : 1;
+}
+
 static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
 					     struct wake_q_head *wake_q)
 {
@@ -1720,9 +1794,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sembuf fast_sops[SEMOPM_FAST];
 	struct sembuf *sops = fast_sops, *sop;
 	struct sem_undo *un;
-	int undos = 0, alter = 0, max, locknum;
+	int max, locknum;
+	bool undos = false, alter = false, dupsop = false;
 	struct sem_queue queue;
-	unsigned long jiffies_left = 0;
+	unsigned long dup = 0, jiffies_left = 0;
 	struct ipc_namespace *ns;
 
 	ns = current->nsproxy->ipc_ns;
@@ -1736,10 +1811,12 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		if (sops == NULL)
 			return -ENOMEM;
 	}
+
 	if (copy_from_user(sops, tsops, nsops * sizeof(*tsops))) {
 		error =  -EFAULT;
 		goto out_free;
 	}
+
 	if (timeout) {
 		struct timespec _timeout;
 		if (copy_from_user(&_timeout, timeout, sizeof(*timeout))) {
@@ -1753,17 +1830,30 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		}
 		jiffies_left = timespec_to_jiffies(&_timeout);
 	}
+
 	max = 0;
 	for (sop = sops; sop < sops + nsops; sop++) {
+		unsigned long mask = 1 << ((sop->sem_num) % BITS_PER_LONG);
+
 		if (sop->sem_num >= max)
 			max = sop->sem_num;
 		if (sop->sem_flg & SEM_UNDO)
-			undos = 1;
-		if (sop->sem_op != 0)
-			alter = 1;
+			undos = true;
+		if (dup & mask) {
+			/*
+			 * There was a previous alter access that appears
+			 * to have accessed the same semaphore, thus use
+			 * the dupsop logic. "appears", because the detection
+			 * can only check % BITS_PER_LONG.
+			 */
+			dupsop = true;
+		}
+		if (sop->sem_op != 0) {
+			alter = true;
+			dup |= mask;
+		}
 	}
 
-
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
 		un = find_alloc_undo(ns, semid);
@@ -1828,6 +1918,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.undo = un;
 	queue.pid = task_tgid_vnr(current);
 	queue.alter = alter;
+	queue.dupsop = dupsop;
 
 	error = perform_atomic_semop(sma, &queue);
 	if (error == 0) { /* non-blocking successful path */
-- 
2.6.6


* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-18 14:37   ` Manfred Spraul
@ 2016-09-18 18:26     ` Davidlohr Bueso
  0 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-18 18:26 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Sun, 18 Sep 2016, Manfred Spraul wrote:
>>+ <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Why this empty line?

That's my fat fingers, will remove it.

>>+		}
>>+
>>+		sem_unlock(sma, locknum);
>>+		rcu_read_unlock();
>>+		wake_up_q(&wake_q);
>>+
>>+		goto out_free;
>>  	}
>>-	if (error <= 0)
>>-		goto out_unlock_free;
>I don't see the strategy:
>I've used the approach that cleanup is at the end, to reduce 
>duplicated code, even if it means that error codepaths unnecessarily 
>call wakeup for an empty list and that the list is always initialized.
>
>With patch 1 of the series, you start to optimize for that.
>Now this patch reintroduces some wake_up_q calls for error paths.

Well yes, but this is much more self-contained than what we currently have,
in that at least perform_atomic_semop() was called. Yes, an error path will
still call wake_up_q unnecessarily, but it's pretty obvious what's going on
within that error <= 0 condition. I really don't think this is a big deal.
In addition, the general exit path of the function is also slightly cleaned
up as a consequence.
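
For reference, the shape of that block after this patch boils down to the
following (condensed from the diff, sleep path omitted):

	error = perform_atomic_semop(sma, &queue);
	if (error <= 0) { /* non-blocking path */
		WAKE_Q(wake_q);

		if (error == 0) { /* success: do the required updates */
			if (alter)
				do_smart_update(sma, sops, nsops, 1, &wake_q);
			else
				set_semotime(sma, sops);
		}

		sem_unlock(sma, locknum);
		rcu_read_unlock();
		wake_up_q(&wake_q); /* harmless no-op if wake_q is empty */

		goto out_free;
	}
	/* error > 0: we need to sleep on this operation */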

>So: What is the aim?
>I would propose to skip patch 1 and leave the wake_up_q at the end.
>
>Or, if we really want to avoid the wakeup calls, then do it entirely.
>Perhaps:
>> if (error == 0) { /* non-blocking codepath 1, with wakeups */
>> [...]
>> }
>> if (error < 0) goto out_unlock_free;
>>
>This would have an advantage, because the WAKE_Q would be initialized
>only when needed.

Sure. Note that we can even get picky with this in semctl calls, but I'm
ok with some unnecessary initialization and wake_up_q paths. Please shout
if you really want me to change them and I can add followup patches, although
I suspect you'll agree.

Thanks,
Davidlohr

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-12 11:53 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
  2016-09-13 18:04   ` Manfred Spraul
@ 2016-09-18 14:37   ` Manfred Spraul
  2016-09-18 18:26     ` Davidlohr Bueso
  1 sibling, 1 reply; 15+ messages in thread
From: Manfred Spraul @ 2016-09-18 14:37 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> @@ -1933,22 +1823,32 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
>   	queue.alter = alter;
>   
>   	error = perform_atomic_semop(sma, &queue);
> -	if (error == 0) {
> -		/* If the operation was successful, then do
> +	if (error <= 0) { /* non-blocking path */
> +		WAKE_Q(wake_q);
> +
> +		/*
> +		 * If the operation was successful, then do
>   		 * the required updates.
>   		 */
> -		if (alter)
> -			do_smart_update(sma, sops, nsops, 1, &tasks);
> -		else
> -			set_semotime(sma, sops);
> +		if (error == 0) {
> +			if (alter)
> +				do_smart_update(sma, sops, nsops, 1, &wake_q);
> +			else
> +				set_semotime(sma, sops);
> + <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Why this empty line?
> +		}
> +
> +		sem_unlock(sma, locknum);
> +		rcu_read_unlock();
> +		wake_up_q(&wake_q);
> +
> +		goto out_free;
>   	}
> -	if (error <= 0)
> -		goto out_unlock_free;
I don't see the strategy:
I've used the approach that cleanup is at the end, to reduce duplicated 
code, even if it means that error codepaths unnecessarily call wakeup 
for an empty list and that the list is always initialized.

With patch 1 of the series, you start to optimize for that.
Now this patch reintroduces some wake_up_q calls for error paths.

So: What is the aim?
I would propose to skip patch 1 and leave the wake_up_q at the end.

Or, if we really want to avoid the wakeup calls, then do it entirely.
Perhaps:
 > if (error == 0) { /* non-blocking codepath 1, with wakeups */
 > [...]
 > }
 > if (error < 0) goto out_unlock_free;
 >
This would have an advantage, because the WAKE_Q would be initialized
only when needed.
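
Spelled out a bit more, I mean something along these lines (untested sketch,
using the names from your patch):

	error = perform_atomic_semop(sma, &queue);
	if (error == 0) { /* non-blocking success: wakeups needed */
		WAKE_Q(wake_q); /* only initialized on this path */

		if (alter)
			do_smart_update(sma, sops, nsops, 1, &wake_q);
		else
			set_semotime(sma, sops);

		sem_unlock(sma, locknum);
		rcu_read_unlock();
		wake_up_q(&wake_q);
		goto out_free;
	}
	if (error < 0) /* non-blocking failure: no wakeups at all */
		goto out_unlock_free;
	/* error > 0: we must block */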

--
     Manfred

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-13 18:04   ` Manfred Spraul
@ 2016-09-14 15:45     ` Davidlohr Bueso
  0 siblings, 0 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-14 15:45 UTC (permalink / raw)
  To: Manfred Spraul; +Cc: akpm, linux-kernel, Davidlohr Bueso

On Tue, 13 Sep 2016, Manfred Spraul wrote:

>>+	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
>>+		/*
>>+		 * User space could assume that semop() is a memory barrier:
>>+		 * Without the mb(), the cpu could speculatively read in user
>>+		 * space stale data that was overwritten by the previous owner
>>+		 * of the semaphore.
>>  		 */
>>  		smp_mb();
>>-
>>  		goto out_free;
>>  	}
>>  	rcu_read_lock();
>>  	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);

>What is the purpose of the !signal_pending()?
>Even if there is a signal: If someone has set queue.status, then our 
>semaphore operations completed and we must return that result code.

It was a way of detecting that we were awoken by an unrelated event while the
task is already marked for wakeup but wake_up_process() has not been called
yet, and of forcing the slowpath in that case. It is the same window that
get_queue_result() deals with by busy-waiting while the status is IN_WAKEUP.

>Obviously: At syscall return, the syscall return code will notice the 
>pending signal and immediately the signal handler is called, but I 
>don't see that this prevents us from using the fast path.

Right, and we cannot race with sys_exit. Will drop the signal check.

>And, at least my opinion: I would avoid placing the error= into the if().

Sure, agreed.
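
So the wakeup-side check would then end up looking something like this
(sketch; assuming we also use READ_ONCE() for the lockless load):

	error = READ_ONCE(queue.status);
	if (error != -EINTR) {
		/*
		 * semop completed; user space may rely on semop() being a
		 * memory barrier, hence the smp_mb().
		 */
		smp_mb();
		goto out_free;
	}
	/* otherwise grab sem_lock and recheck in the slowpath */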

Thanks,
Davidlohr

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-12 11:53 ` [PATCH 2/5] ipc/sem: rework task wakeups Davidlohr Bueso
@ 2016-09-13 18:04   ` Manfred Spraul
  2016-09-14 15:45     ` Davidlohr Bueso
  2016-09-18 14:37   ` Manfred Spraul
  1 sibling, 1 reply; 15+ messages in thread
From: Manfred Spraul @ 2016-09-13 18:04 UTC (permalink / raw)
  To: Davidlohr Bueso, akpm; +Cc: linux-kernel, Davidlohr Bueso

Hi Davidlohr,

On 09/12/2016 01:53 PM, Davidlohr Bueso wrote:
> Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)

[...]

> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
> ---
>   ipc/sem.c | 268 +++++++++++++++++++-------------------------------------------
>   1 file changed, 83 insertions(+), 185 deletions(-)
Nice speedup, nice amount of removed duplicated code.
> diff --git a/ipc/sem.c b/ipc/sem.c
> index a4e8bb2fae38..86467b5b78ad 100644
[...]
>   
> -	error = get_queue_result(&queue);
> -
> -	if (error != -EINTR) {
> -		/* fast path: update_queue already obtained all requested
> -		 * resources.
> -		 * Perform a smp_mb(): User space could assume that semop()
> -		 * is a memory barrier: Without the mb(), the cpu could
> -		 * speculatively read in user space stale data that was
> -		 * overwritten by the previous owner of the semaphore.
> +	/*
> +	 * fastpath: the semop has completed, either successfully or not; from
> +	 * the syscall's pov that is irrelevant to us at this point; we're done.
> +	 *
> +	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
> +	 * The queue.status is checked again in the slowpath (aka after taking
> +	 * sem_lock), such that we can detect scenarios where we were awakened
> +	 * externally, during the window between wake_q_add() and wake_up_q().
> +	 */
> +	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
> +		/*
> +		 * User space could assume that semop() is a memory barrier:
> +		 * Without the mb(), the cpu could speculatively read in user
> +		 * space stale data that was overwritten by the previous owner
> +		 * of the semaphore.
>   		 */
>   		smp_mb();
> -
>   		goto out_free;
>   	}
>   
>   	rcu_read_lock();
>   	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
What is the purpose of the !signal_pending()?
Even if there is a signal: If someone has set queue.status, then our 
semaphore operations completed and we must return that result code.
Obviously: At syscall return, the syscall return code will notice the 
pending signal and immediately the signal handler is called, but I don't 
see that this prevents us from using the fast path.

And, at least my opinion: I would avoid placing the error= into the if().

--
     Manfred

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [PATCH 2/5] ipc/sem: rework task wakeups
  2016-09-12 11:53 [PATCH -next " Davidlohr Bueso
@ 2016-09-12 11:53 ` Davidlohr Bueso
  2016-09-13 18:04   ` Manfred Spraul
  2016-09-18 14:37   ` Manfred Spraul
  0 siblings, 2 replies; 15+ messages in thread
From: Davidlohr Bueso @ 2016-09-12 11:53 UTC (permalink / raw)
  To: akpm; +Cc: manfred, dave, linux-kernel, Davidlohr Bueso

Our sysv sems have been using the notion of lockless wakeups for a while,
ever since 0a2b9d4c796 (ipc/sem.c: move wake_up_process out of the spinlock
section), in order to reduce the sem_lock hold times. This in-house pending
queue can be replaced by wake_q (just like all the rest of ipc now), which
provides the following advantages:

o Simplifies and gets rid of unnecessary code.

o We get rid of the IN_WAKEUP complexities. Given that wake_q_add() grabs a
reference to the task, we cannot race with sys_exit and the imminent call to
wake_up_process() if the task is awoken by an unrelated event in the window
between wake_q_add() and wake_up_q().

o By not spinning on IN_WAKEUP, we no longer need to disable preemption.

In consequence, the wakeup paths (after schedule(), that is) must acknowledge
an external signal/event, as well as spurious wakeups occurring during the
pending wakeup window. Obviously there are no changes in semantics that could
be visible to the user. The fastpath is _only_ for when we know for sure that
we were awoken due to the waker's successful semop call (queue.status is not
-EINTR).
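
Conceptually, a waker now does the following (sketch; see
wake_up_sem_queue_prepare() and the semtimedop() changes below):

	WAKE_Q(wake_q);				/* on-stack lockless wake queue */

	/* with sem_lock held: */
	unlink_queue(sma, q);
	wake_q_add(&wake_q, q->sleeper);	/* grabs a task reference */
	q->status = error;			/* final status, no IN_WAKEUP step */

	/* once sem_lock and the rcu read lock are dropped: */
	wake_up_q(&wake_q);			/* the actual wake_up_process() calls */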

On a 48-core Haswell, running the ipcscale 'waitforzero' test, the following
is seen with increasing thread counts:

                               v4.8-rc5                v4.8-rc5
                                                        semopv1
Hmean    sembench-sem-2      574733.00 (  0.00%)   578322.00 (  0.62%)
Hmean    sembench-sem-8      811708.00 (  0.00%)   824689.00 (  1.59%)
Hmean    sembench-sem-12     842448.00 (  0.00%)   845409.00 (  0.35%)
Hmean    sembench-sem-21     933003.00 (  0.00%)   977748.00 (  4.80%)
Hmean    sembench-sem-48     935910.00 (  0.00%)  1004759.00 (  7.36%)
Hmean    sembench-sem-79     937186.00 (  0.00%)   983976.00 (  4.99%)
Hmean    sembench-sem-234    974256.00 (  0.00%)  1060294.00 (  8.83%)
Hmean    sembench-sem-265    975468.00 (  0.00%)  1016243.00 (  4.18%)
Hmean    sembench-sem-296    991280.00 (  0.00%)  1042659.00 (  5.18%)
Hmean    sembench-sem-327    975415.00 (  0.00%)  1029977.00 (  5.59%)
Hmean    sembench-sem-358   1014286.00 (  0.00%)  1049624.00 (  3.48%)
Hmean    sembench-sem-389    972939.00 (  0.00%)  1043127.00 (  7.21%)
Hmean    sembench-sem-420    981909.00 (  0.00%)  1056747.00 (  7.62%)
Hmean    sembench-sem-451    990139.00 (  0.00%)  1051609.00 (  6.21%)
Hmean    sembench-sem-482    965735.00 (  0.00%)  1040313.00 (  7.72%)
  
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/sem.c | 268 +++++++++++++++++++-------------------------------------------
 1 file changed, 83 insertions(+), 185 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index a4e8bb2fae38..86467b5b78ad 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -13,6 +13,7 @@
  * (c) 2003 Manfred Spraul <manfred@colorfullife.com>
  * Further wakeup optimizations, documentation
  * (c) 2010 Manfred Spraul <manfred@colorfullife.com>
+ * (c) 2016 Davidlohr Bueso <dave@stgolabs.net>
  *
  * support for audit of ipc object properties and permission changes
  * Dustin Kirkland <dustin.kirkland@us.ibm.com>
@@ -53,15 +54,11 @@
  *   Semaphores are actively given to waiting tasks (necessary for FIFO).
  *   (see update_queue())
  * - To improve the scalability, the actual wake-up calls are performed after
- *   dropping all locks. (see wake_up_sem_queue_prepare(),
- *   wake_up_sem_queue_do())
+ *   dropping all locks. (see wake_up_sem_queue_prepare())
  * - All work is done by the waker, the woken up task does not have to do
  *   anything - not even acquiring a lock or dropping a refcount.
  * - A woken up task may not even touch the semaphore array anymore, it may
  *   have been destroyed already by a semctl(RMID).
- * - The synchronizations between wake-ups due to a timeout/signal and a
- *   wake-up due to a completed semaphore operation is achieved by using an
- *   intermediate state (IN_WAKEUP).
  * - UNDO values are stored in an array (one per process and per
  *   semaphore array, lazily allocated). For backwards compatibility, multiple
  *   modes for the UNDO variables are supported (per process, per thread)
@@ -471,40 +468,6 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
 	ipc_rmid(&sem_ids(ns), &s->sem_perm);
 }
 
-/*
- * Lockless wakeup algorithm:
- * Without the check/retry algorithm a lockless wakeup is possible:
- * - queue.status is initialized to -EINTR before blocking.
- * - wakeup is performed by
- *	* unlinking the queue entry from the pending list
- *	* setting queue.status to IN_WAKEUP
- *	  This is the notification for the blocked thread that a
- *	  result value is imminent.
- *	* call wake_up_process
- *	* set queue.status to the final value.
- * - the previously blocked thread checks queue.status:
- *	* if it's IN_WAKEUP, then it must wait until the value changes
- *	* if it's not -EINTR, then the operation was completed by
- *	  update_queue. semtimedop can return queue.status without
- *	  performing any operation on the sem array.
- *	* otherwise it must acquire the spinlock and check what's up.
- *
- * The two-stage algorithm is necessary to protect against the following
- * races:
- * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and try (and fail) to acquire sma->lock
- *   before update_queue had a chance to set queue.status
- * - if queue.status is written before wake_up_process and if the
- *   blocked process is woken up by a signal between writing
- *   queue.status and the wake_up_process, then the woken up
- *   process could return from semtimedop and die by calling
- *   sys_exit before wake_up_process is called. Then wake_up_process
- *   will oops, because the task structure is already invalid.
- *   (yes, this happened on s390 with sysv msg).
- *
- */
-#define IN_WAKEUP	1
-
 /**
  * newary - Create a new semaphore set
  * @ns: namespace
@@ -703,51 +666,11 @@ undo:
 	return result;
 }
 
-/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
- * @q: queue entry that must be signaled
- * @error: Error value for the signal
- *
- * Prepare the wake-up of the queue entry q.
- */
-static void wake_up_sem_queue_prepare(struct list_head *pt,
-				struct sem_queue *q, int error)
-{
-	if (list_empty(pt)) {
-		/*
-		 * Hold preempt off so that we don't get preempted and have the
-		 * wakee busy-wait until we're scheduled back on.
-		 */
-		preempt_disable();
-	}
-	q->status = IN_WAKEUP;
-	q->pid = error;
-
-	list_add_tail(&q->list, pt);
-}
-
-/**
- * wake_up_sem_queue_do - do the actual wake-up
- * @pt: list of tasks to be woken up
- *
- * Do the actual wake-up.
- * The function is called without any locks held, thus the semaphore array
- * could be destroyed already and the tasks can disappear as soon as the
- * status is set to the actual return code.
- */
-static void wake_up_sem_queue_do(struct list_head *pt)
+static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
+					     struct wake_q_head *wake_q)
 {
-	struct sem_queue *q, *t;
-	int did_something;
-
-	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, list) {
-		wake_up_process(q->sleeper);
-		/* q can disappear immediately after writing q->status. */
-		smp_wmb();
-		q->status = q->pid;
-	}
-	if (did_something)
-		preempt_enable();
+	wake_q_add(wake_q, q->sleeper);
+	q->status = error;
 }
 
 static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -795,18 +718,18 @@ static int check_restart(struct sem_array *sma, struct sem_queue *q)
  * wake_const_ops - wake up non-alter tasks
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * wake_const_ops must be called after a semaphore in a semaphore array
  * was set to 0. If complex const operations are pending, wake_const_ops must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int wake_const_ops(struct sem_array *sma, int semnum,
-				struct list_head *pt)
+			  struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -832,7 +755,7 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
 
 			unlink_queue(sma, q);
 
-			wake_up_sem_queue_prepare(pt, q, error);
+			wake_up_sem_queue_prepare(q, error, wake_q);
 			if (error == 0)
 				semop_completed = 1;
 		}
@@ -845,14 +768,14 @@ static int wake_const_ops(struct sem_array *sma, int semnum,
  * @sma: semaphore array
  * @sops: operations that were performed
  * @nsops: number of operations
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * Checks all required queue for wait-for-zero operations, based
  * on the actual changes that were performed on the semaphore array.
  * The function returns 1 if at least one operation was completed successfully.
  */
 static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
-					int nsops, struct list_head *pt)
+				int nsops, struct wake_q_head *wake_q)
 {
 	int i;
 	int semop_completed = 0;
@@ -865,7 +788,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 
 			if (sma->sem_base[num].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, num, pt);
+				semop_completed |= wake_const_ops(sma, num, wake_q);
 			}
 		}
 	} else {
@@ -876,7 +799,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 		for (i = 0; i < sma->sem_nsems; i++) {
 			if (sma->sem_base[i].semval == 0) {
 				got_zero = 1;
-				semop_completed |= wake_const_ops(sma, i, pt);
+				semop_completed |= wake_const_ops(sma, i, wake_q);
 			}
 		}
 	}
@@ -885,7 +808,7 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
 	 * then check the global queue, too.
 	 */
 	if (got_zero)
-		semop_completed |= wake_const_ops(sma, -1, pt);
+		semop_completed |= wake_const_ops(sma, -1, wake_q);
 
 	return semop_completed;
 }
@@ -895,19 +818,19 @@ static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
  * update_queue - look for tasks that can be completed.
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head.
  *
  * update_queue must be called after a semaphore in a semaphore array
  * was modified. If multiple semaphores were modified, update_queue must
  * be called with semnum = -1, as well as with the number of each modified
  * semaphore.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_q. The return code
  * is stored in q->pid.
  * The function internally checks if const operations can now succeed.
  *
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
+static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -949,11 +872,11 @@ again:
 			restart = 0;
 		} else {
 			semop_completed = 1;
-			do_smart_wakeup_zero(sma, q->sops, q->nsops, pt);
+			do_smart_wakeup_zero(sma, q->sops, q->nsops, wake_q);
 			restart = check_restart(sma, q);
 		}
 
-		wake_up_sem_queue_prepare(pt, q, error);
+		wake_up_sem_queue_prepare(q, error, wake_q);
 		if (restart)
 			goto again;
 	}
@@ -984,24 +907,24 @@ static void set_semotime(struct sem_array *sma, struct sembuf *sops)
  * @sops: operations that were performed
  * @nsops: number of operations
  * @otime: force setting otime
- * @pt: list head of the tasks that must be woken up.
+ * @wake_q: lockless wake-queue head
  *
  * do_smart_update() does the required calls to update_queue and wakeup_zero,
  * based on the actual changes that were performed on the semaphore array.
  * Note that the function does not do the actual wake-up: the caller is
- * responsible for calling wake_up_sem_queue_do(@pt).
+ * responsible for calling wake_up_q().
  * It is safe to perform this call after dropping all locks.
  */
 static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
-			int otime, struct list_head *pt)
+			    int otime, struct wake_q_head *wake_q)
 {
 	int i;
 
-	otime |= do_smart_wakeup_zero(sma, sops, nsops, pt);
+	otime |= do_smart_wakeup_zero(sma, sops, nsops, wake_q);
 
 	if (!list_empty(&sma->pending_alter)) {
 		/* semaphore array uses the global queue - just process it. */
-		otime |= update_queue(sma, -1, pt);
+		otime |= update_queue(sma, -1, wake_q);
 	} else {
 		if (!sops) {
 			/*
@@ -1009,7 +932,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			 * known. Check all.
 			 */
 			for (i = 0; i < sma->sem_nsems; i++)
-				otime |= update_queue(sma, i, pt);
+				otime |= update_queue(sma, i, wake_q);
 		} else {
 			/*
 			 * Check the semaphores that were increased:
@@ -1023,7 +946,7 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
 			for (i = 0; i < nsops; i++) {
 				if (sops[i].sem_op > 0) {
 					otime |= update_queue(sma,
-							sops[i].sem_num, pt);
+							      sops[i].sem_num, wake_q);
 				}
 			}
 		}
@@ -1111,8 +1034,8 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
-	struct list_head tasks;
 	int i;
+	WAKE_Q(wake_q);
 
 	/* Free the existing undo structures for this semaphore set.  */
 	ipc_assert_locked_object(&sma->sem_perm);
@@ -1126,25 +1049,24 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	}
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
-	INIT_LIST_HEAD(&tasks);
 	list_for_each_entry_safe(q, tq, &sma->pending_const, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 
 	list_for_each_entry_safe(q, tq, &sma->pending_alter, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 	}
 	for (i = 0; i < sma->sem_nsems; i++) {
 		struct sem *sem = sma->sem_base + i;
 		list_for_each_entry_safe(q, tq, &sem->pending_const, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 		list_for_each_entry_safe(q, tq, &sem->pending_alter, list) {
 			unlink_queue(sma, q);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+			wake_up_sem_queue_prepare(q, -EIDRM, &wake_q);
 		}
 	}
 
@@ -1153,7 +1075,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	ns->used_sems -= sma->sem_nsems;
 	ipc_rcu_putref(sma, sem_rcu_free);
 }
@@ -1292,9 +1214,9 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	struct sem_undo *un;
 	struct sem_array *sma;
 	struct sem *curr;
-	int err;
-	struct list_head tasks;
-	int val;
+	int err, val;
+	WAKE_Q(wake_q);
+
 #if defined(CONFIG_64BIT) && defined(__BIG_ENDIAN)
 	/* big-endian 64bit */
 	val = arg >> 32;
@@ -1306,8 +1228,6 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	if (val > SEMVMX || val < 0)
 		return -ERANGE;
 
-	INIT_LIST_HEAD(&tasks);
-
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
 	if (IS_ERR(sma)) {
@@ -1350,10 +1270,10 @@ static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
 	curr->sempid = task_tgid_vnr(current);
 	sma->sem_ctime = get_seconds();
 	/* maybe some queued-up processes were waiting for this */
-	do_smart_update(sma, NULL, 0, 0, &tasks);
+	do_smart_update(sma, NULL, 0, 0, &wake_q);
 	sem_unlock(sma, -1);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 	return 0;
 }
 
@@ -1365,9 +1285,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	int err, nsems;
 	ushort fast_sem_io[SEMMSL_FAST];
 	ushort *sem_io = fast_sem_io;
-	struct list_head tasks;
-
-	INIT_LIST_HEAD(&tasks);
+	WAKE_Q(wake_q);
 
 	rcu_read_lock();
 	sma = sem_obtain_object_check(ns, semid);
@@ -1478,7 +1396,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_q);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1514,7 +1432,7 @@ out_unlock:
 	sem_unlock(sma, -1);
 out_rcu_wakeup:
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
+	wake_up_q(&wake_q);
 out_free:
 	if (sem_io != fast_sem_io)
 		ipc_free(sem_io);
@@ -1787,32 +1705,6 @@ out:
 	return un;
 }
 
-
-/**
- * get_queue_result - retrieve the result code from sem_queue
- * @q: Pointer to queue structure
- *
- * Retrieve the return code from the pending queue. If IN_WAKEUP is found in
- * q->status, then we must loop until the value is replaced with the final
- * value: This may happen if a task is woken up by an unrelated event (e.g.
- * signal) and in parallel the task is woken up by another task because it got
- * the requested semaphores.
- *
- * The function can be called with or without holding the semaphore spinlock.
- */
-static int get_queue_result(struct sem_queue *q)
-{
-	int error;
-
-	error = q->status;
-	while (unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = q->status;
-	}
-
-	return error;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1825,7 +1717,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
-	struct list_head tasks;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1865,7 +1756,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			alter = 1;
 	}
 
-	INIT_LIST_HEAD(&tasks);
 
 	if (undos) {
 		/* On success, find_alloc_undo takes the rcu_read_lock */
@@ -1933,22 +1823,32 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.alter = alter;
 
 	error = perform_atomic_semop(sma, &queue);
-	if (error == 0) {
-		/* If the operation was successful, then do
+	if (error <= 0) { /* non-blocking path */
+		WAKE_Q(wake_q);
+
+		/*
+		 * If the operation was successful, then do
 		 * the required updates.
 		 */
-		if (alter)
-			do_smart_update(sma, sops, nsops, 1, &tasks);
-		else
-			set_semotime(sma, sops);
+		if (error == 0) {
+			if (alter)
+				do_smart_update(sma, sops, nsops, 1, &wake_q);
+			else
+				set_semotime(sma, sops);
+
+		}
+
+		sem_unlock(sma, locknum);
+		rcu_read_unlock();
+		wake_up_q(&wake_q);
+
+		goto out_free;
 	}
-	if (error <= 0)
-		goto out_unlock_free;
 
-	/* We need to sleep on this operation, so we put the current
+	/*
+	 * We need to sleep on this operation, so we put the current
 	 * task into the pending queue and go to sleep.
 	 */
-
 	if (nsops == 1) {
 		struct sem *curr;
 		curr = &sma->sem_base[sops->sem_num];
@@ -1977,10 +1877,10 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		sma->complex_count++;
 	}
 
+sleep_again:
 	queue.status = -EINTR;
 	queue.sleeper = current;
 
-sleep_again:
 	__set_current_state(TASK_INTERRUPTIBLE);
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
@@ -1990,28 +1890,29 @@ sleep_again:
 	else
 		schedule();
 
-	error = get_queue_result(&queue);
-
-	if (error != -EINTR) {
-		/* fast path: update_queue already obtained all requested
-		 * resources.
-		 * Perform a smp_mb(): User space could assume that semop()
-		 * is a memory barrier: Without the mb(), the cpu could
-		 * speculatively read in user space stale data that was
-		 * overwritten by the previous owner of the semaphore.
+	/*
+	 * fastpath: the semop has completed, either successfully or not; from
+	 * the syscall's pov that is irrelevant to us at this point; we're done.
+	 *
+	 * We _do_ care, nonetheless, about being awoken by a signal or spuriously.
+	 * The queue.status is checked again in the slowpath (aka after taking
+	 * sem_lock), such that we can detect scenarios where we were awakened
+	 * externally, during the window between wake_q_add() and wake_up_q().
+	 */
+	if ((error = queue.status) != -EINTR && !signal_pending(current)) {
+		/*
+		 * User space could assume that semop() is a memory barrier:
+		 * Without the mb(), the cpu could speculatively read in user
+		 * space stale data that was overwritten by the previous owner
+		 * of the semaphore.
 		 */
 		smp_mb();
-
 		goto out_free;
 	}
 
 	rcu_read_lock();
 	sma = sem_obtain_lock(ns, semid, sops, nsops, &locknum);
-
-	/*
-	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
-	 */
-	error = get_queue_result(&queue);
+	error = queue.status;
 
 	/*
 	 * Array removed? If yes, leave without sem_unlock().
@@ -2021,7 +1922,6 @@ sleep_again:
 		goto out_free;
 	}
 
-
 	/*
 	 * If queue.status != -EINTR we are woken up by another process.
 	 * Leave without unlink_queue(), but with sem_unlock().
@@ -2030,13 +1930,13 @@ sleep_again:
 		goto out_unlock_free;
 
 	/*
-	 * If an interrupt occurred we have to clean up the queue
+	 * If an interrupt occurred we have to clean up the queue.
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
 
 	/*
-	 * If the wakeup was spurious, just retry
+	 * If the wakeup was spurious, just retry.
 	 */
 	if (error == -EINTR && !signal_pending(current))
 		goto sleep_again;
@@ -2046,7 +1946,6 @@ sleep_again:
 out_unlock_free:
 	sem_unlock(sma, locknum);
 	rcu_read_unlock();
-	wake_up_sem_queue_do(&tasks);
 out_free:
 	if (sops != fast_sops)
 		kfree(sops);
@@ -2107,8 +2006,8 @@ void exit_sem(struct task_struct *tsk)
 	for (;;) {
 		struct sem_array *sma;
 		struct sem_undo *un;
-		struct list_head tasks;
 		int semid, i;
+		WAKE_Q(wake_q);
 
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
@@ -2194,11 +2093,10 @@ void exit_sem(struct task_struct *tsk)
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
-		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks);
+		do_smart_update(sma, NULL, 0, 1, &wake_q);
 		sem_unlock(sma, -1);
 		rcu_read_unlock();
-		wake_up_sem_queue_do(&tasks);
+		wake_up_q(&wake_q);
 
 		kfree_rcu(un, rcu);
 	}
--
2.6.6

^ permalink raw reply related	[flat|nested] 15+ messages in thread
