* [PATCH v4] ipc/msg: Implement lockless pipelined wakeups
@ 2016-07-20 15:45 Sebastian Andrzej Siewior
  2016-07-21  0:16 ` Davidlohr Bueso
  0 siblings, 1 reply; 5+ messages in thread
From: Sebastian Andrzej Siewior @ 2016-07-20 15:45 UTC (permalink / raw)
  To: linux-kernel
  Cc: Peter Zijlstra, tglx, Sebastian Andrzej Siewior, Davidlohr Bueso,
	Manfred Spraul, Andrew Morton

This patch moves the wake_up_process() invocation so it is not done under
the perm->lock, by making use of a lockless wake_q. With this change, the
waiter is woken up once the message has been assigned, so it no longer
needs to loop on SMP while the message pointer is still NULL. In the
signal case we still need to check the pointer under the lock to verify
the state.

This change should also avoid the introduction of preempt_disable() in
-RT, which would otherwise be needed to prevent a busy loop polling for
the NULL -> !NULL change if the waiter has a higher priority than the
waker.
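
For reference, the resulting pattern looks roughly like the sketch below.
This is a minimal illustration built only from the wake_q and ipc helpers
used in the diff; the helper name is made up and it is not a verbatim
excerpt from the patch:

static void pipelined_wakeup_sketch(struct msg_queue *msq,
				    struct msg_receiver *msr,
				    struct msg_msg *msg)
{
	WAKE_Q(wake_q);

	ipc_lock_object(&msq->q_perm);
	/* publish the result while still holding the lock */
	WRITE_ONCE(msr->r_msg, msg);
	/* only queue the task here, no wakeup under the lock */
	wake_q_add(&wake_q, msr->r_tsk);
	ipc_unlock_object(&msq->q_perm);

	/* the actual wake_up_process() calls happen after the unlock */
	wake_up_q(&wake_q);
}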

This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
Radeon R7, 12 Compute Cores 4C+8G":

test             |   before   |   after    | diff
-----------------|------------|------------|----------
pmsg-shared 8 60 | 19,347,422 | 30,442,191 | + ~57.34 %
pmsg-shared 4 60 | 21,367,197 | 35,743,458 | + ~67.28 %
pmsg-shared 2 60 | 22,884,224 | 24,278,200 | +  ~6.09 %

v3…v4:  - drop smp_wmb in the error case as per Davidlohr
v2…v3:  - add smp_[rw]mb back, including a diagram of how they are used
        - use READ_ONCE / WRITE_ONCE after the removal of the volatile
	  attribute.
v1…v2:
	- msg_receiver.r_msg is no longer volatile. After all, we no
	  longer have that busy loop.
	- added a comment on why we do wake_q_add() followed by the
	  assignment of ->r_msg and not the other way around.

Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
---
 ipc/msg.c | 137 ++++++++++++++++++++++++--------------------------------------
 1 file changed, 53 insertions(+), 84 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 1471db9a7e61..d0e73749b41d 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
 	long			r_msgtype;
 	long			r_maxsize;
 
-	/*
-	 * Mark r_msg volatile so that the compiler
-	 * does not try to get smart and optimize
-	 * it. We rely on this for the lockless
-	 * receive algorithm.
-	 */
-	struct msg_msg		*volatile r_msg;
+	struct msg_msg		*r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
@@ -183,21 +177,16 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
-		msr->r_msg = ERR_PTR(res);
+
+		WRITE_ONCE(msr->r_msg, ERR_PTR(res));
+		/* rely on wake_q_add() barrier instead of explicit smp_wmb */
+		wake_q_add(wake_q, msr->r_tsk);
 	}
 }
 
@@ -213,11 +202,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +333,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +381,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +394,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +559,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,27 +571,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
-				msr->r_msg = ERR_PTR(-E2BIG);
+				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
+				/*
+				 * rely on wake_q_add() barrier instead of
+				 * explicit smp_wmb
+				 */
+				wake_q_add(wake_q, msr->r_tsk);
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
 				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
+				 * Ensure that we see the new r_msg after the
+				 * wake up or the old value forcing to take the
+				 * queue lock.
 				 */
-				smp_wmb();
-				msr->r_msg = msg;
-
+				WRITE_ONCE(msr->r_msg, msg);
+				smp_wmb(); /* barrier (B) */
+				wake_q_add(wake_q, msr->r_tsk);
 				return 1;
 			}
 		}
@@ -613,6 +603,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -686,7 +677,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 			err = -EIDRM;
 			goto out_unlock0;
 		}
-
 		ss_del(&s);
 
 		if (signal_pending(current)) {
@@ -698,7 +688,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +702,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -919,71 +910,49 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_unlock();
 		schedule();
 
-		/* Lockless receive, part 1:
-		 * Disable preemption.  We don't hold a reference to the queue
-		 * and getting a reference would defeat the idea of a lockless
-		 * operation, thus the code relies on rcu to guarantee the
-		 * existence of msq:
+		/*
+		 * Lockless receive, part 1:
+		 * We don't hold a reference to the queue and getting a
+		 * reference would defeat the idea of a lockless operation,
+		 * thus the code relies on rcu to guarantee the existence of
+		 * msq:
 		 * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
 		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
-		 * rcu_read_lock() prevents preemption between reading r_msg
-		 * and acquiring the q_perm.lock in ipc_lock_object().
 		 */
 		rcu_read_lock();
 
-		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
+		/*
+		 * Lockless receive, part 2:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
 		 * CPU 0                             CPU 1
 		 * <loop receiver>
 		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
+		 *   <load ->r_msg>           |        msr->r_msg = msg;
 		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
+		 *                                     wake_up_process();
 		 *
 		 * Where (A) orders the message value read and where (B) orders
 		 * the write to the r_msg -- done in both pipelined_send and
 		 * expunge_all.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue ...
 		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
-
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
-		 */
+		smp_rmb(); /* barrier (A) */
+		msg = READ_ONCE(msr_d.r_msg);
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * ... or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;
 
-- 
2.8.1

* Re: [PATCH v4] ipc/msg: Implement lockless pipelined wakeups
  2016-07-20 15:45 [PATCH v4] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
@ 2016-07-21  0:16 ` Davidlohr Bueso
  2016-07-21 10:07   ` [PATCH v5] " Sebastian Andrzej Siewior
  2016-07-21 10:23   ` [PATCH v4] " Sebastian Andrzej Siewior
  0 siblings, 2 replies; 5+ messages in thread
From: Davidlohr Bueso @ 2016-07-21  0:16 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, Peter Zijlstra, tglx, Manfred Spraul, Andrew Morton

On Wed, 20 Jul 2016, Sebastian Andrzej Siewior wrote:

>This patch moves the wake_up_process() invocation so it is not done under
>the perm->lock, by making use of a lockless wake_q. With this change, the
>waiter is woken up once the message has been assigned, so it no longer
>needs to loop on SMP while the message pointer is still NULL. In the
>signal case we still need to check the pointer under the lock to verify
>the state.
>
>This change should also avoid the introduction of preempt_disable() in
>-RT, which would otherwise be needed to prevent a busy loop polling for
>the NULL -> !NULL change if the waiter has a higher priority than the
>waker.
>
>This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
>Radeon R7, 12 Compute Cores 4C+8G":
>
>test             |   before   |   after    | diff
>-----------------|------------|------------|----------
>pmsg-shared 8 60 | 19,347,422 | 30,442,191 | + ~57.34 %
>pmsg-shared 4 60 | 21,367,197 | 35,743,458 | + ~67.28 %
>pmsg-shared 2 60 | 22,884,224 | 24,278,200 | +  ~6.09 %
>
>v3…v4:  - drop smp_wmb in the error case as per Davidlohr
>v2…v3:  - add smp_[rw]mb back, including a diagram of how they are used
>        - use READ_ONCE / WRITE_ONCE after the removal of the volatile
>	  attribute.
>v1…v2:
>	- msg_receiver.r_msg is no longer volatile. After all, we no
>	  longer have that busy loop.
>	- added a comment on why we do wake_q_add() followed by the
>	  assignment of ->r_msg and not the other way around.
>
>Cc: Davidlohr Bueso <dave@stgolabs.net>
>Cc: Manfred Spraul <manfred@colorfullife.com>
>Cc: Andrew Morton <akpm@linux-foundation.org>
>Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>

This looks good to me, and is now very similar to the posix flavor of
msg queues in this regard. fwiw I threw it under the ltp msgqueue-specific
tests without anything breaking. Just one small comment below, otherwise
feel free to add my:

Reviewed-by: Davidlohr Bueso <dave@stgolabs.net>

>-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
>+				 struct wake_q_head *wake_q)
> {
> 	struct msg_receiver *msr, *t;
>
>@@ -577,27 +571,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>
> 			list_del(&msr->r_list);
> 			if (msr->r_maxsize < msg->m_ts) {
>-				/* initialize pipelined send ordering */
>-				msr->r_msg = NULL;
>-				wake_up_process(msr->r_tsk);
>-				/* barrier (B) see barrier comment below */
>-				smp_wmb();
>-				msr->r_msg = ERR_PTR(-E2BIG);
>+				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
>+				/*
>+				 * rely on wake_q_add() barrier instead of
>+				 * explicit smp_wmb
>+				 */
>+				wake_q_add(wake_q, msr->r_tsk);
> 			} else {
>-				msr->r_msg = NULL;
> 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
> 				msq->q_rtime = get_seconds();
>-				wake_up_process(msr->r_tsk);
> 				/*
>-				 * Ensure that the wakeup is visible before
>-				 * setting r_msg, as the receiving can otherwise
>-				 * exit - once r_msg is set, the receiver can
>-				 * continue. See lockless receive part 1 and 2
>-				 * in do_msgrcv(). Barrier (B).
>+				 * Ensure that we see the new r_msg after the
>+				 * wake up or the old value forcing to take the
>+				 * queue lock.
> 				 */
>-				smp_wmb();
>-				msr->r_msg = msg;
>-
>+				WRITE_ONCE(msr->r_msg, msg);
>+				smp_wmb(); /* barrier (B) */
>+				wake_q_add(wake_q, msr->r_tsk);

Just as with expunge_all and the E2BIG case, could you remove that explicit
barrier (B) and just rely on wake_q_add?

Thanks,
Davidlohr

* [PATCH v5] ipc/msg: Implement lockless pipelined wakeups
  2016-07-21  0:16 ` Davidlohr Bueso
@ 2016-07-21 10:07   ` Sebastian Andrzej Siewior
  2016-07-21 10:23   ` [PATCH v4] " Sebastian Andrzej Siewior
  1 sibling, 0 replies; 5+ messages in thread
From: Sebastian Andrzej Siewior @ 2016-07-21 10:07 UTC (permalink / raw)
  To: Davidlohr Bueso
  Cc: linux-kernel, Peter Zijlstra, tglx, Manfred Spraul, Andrew Morton

This patch moves the wake_up_process() invocation so it is not done under
the perm->lock, by making use of a lockless wake_q. With this change, the
waiter is woken up once the message has been assigned, so it no longer
needs to loop on SMP while the message pointer is still NULL. In the
signal case we still need to check the pointer under the lock to verify
the state.

This change should also avoid the introduction of preempt_disable() in
-RT, which would otherwise be needed to prevent a busy loop polling for
the NULL -> !NULL change if the waiter has a higher priority than the
waker.

This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
Radeon R7, 12 Compute Cores 4C+8G":

test             |   before   |   after    | diff
-----------------|------------|------------|----------
pmsg-shared 8 60 | 19,347,422 | 30,442,191 | + ~57.34 %
pmsg-shared 4 60 | 21,367,197 | 35,743,458 | + ~67.28 %
pmsg-shared 2 60 | 22,884,224 | 24,278,200 | +  ~6.09 %

v4…v5:  - drop last smp_wmb and rely on cmpxchg barrier as per Davidlohr
v3…v4:  - drop smp_wmb in the error case as per Davidlohr
v2…v3:  - add smp_[rw]mb back, including a diagram of how they are used
        - use READ_ONCE / WRITE_ONCE after the removal of the volatile
	  attribute.
v1…v2:
	- msg_receiver.r_msg is no longer volatile. After all, we no
	  longer have that busy loop.
	- added a comment on why we do wake_q_add() followed by the
	  assignment of ->r_msg and not the other way around.

Reviewed-by: Davidlohr Bueso <dave@stgolabs.net>
Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
---
 ipc/msg.c | 140 +++++++++++++++++++++++++-------------------------------------
 1 file changed, 56 insertions(+), 84 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 1471db9a7e61..d05a880d8188 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
 	long			r_msgtype;
 	long			r_maxsize;
 
-	/*
-	 * Mark r_msg volatile so that the compiler
-	 * does not try to get smart and optimize
-	 * it. We rely on this for the lockless
-	 * receive algorithm.
-	 */
-	struct msg_msg		*volatile r_msg;
+	struct msg_msg		*r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
@@ -183,21 +177,16 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
-		msr->r_msg = ERR_PTR(res);
+
+		WRITE_ONCE(msr->r_msg, ERR_PTR(res));
+		/* rely on wake_q_add() barrier instead of explicit smp_wmb */
+		wake_q_add(wake_q, msr->r_tsk);
 	}
 }
 
@@ -213,11 +202,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +333,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +381,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +394,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +559,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,27 +571,26 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
-				msr->r_msg = ERR_PTR(-E2BIG);
+				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
+				/*
+				 * rely on wake_q_add() barrier instead of
+				 * explicit smp_wmb
+				 */
+				wake_q_add(wake_q, msr->r_tsk);
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
 				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
+				 * Ensure that we see the new r_msg after the
+				 * wake up or the old value forcing to take the
+				 * queue lock.
 				 */
-				smp_wmb();
-				msr->r_msg = msg;
-
+				WRITE_ONCE(msr->r_msg, msg);
+				/*
+				 * rely on wake_q_add() barrier instead of
+				 * explicit smp_wmb
+				 */
+				wake_q_add(wake_q, msr->r_tsk);
 				return 1;
 			}
 		}
@@ -613,6 +606,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -686,7 +680,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 			err = -EIDRM;
 			goto out_unlock0;
 		}
-
 		ss_del(&s);
 
 		if (signal_pending(current)) {
@@ -698,7 +691,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +705,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -919,71 +913,49 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_unlock();
 		schedule();
 
-		/* Lockless receive, part 1:
-		 * Disable preemption.  We don't hold a reference to the queue
-		 * and getting a reference would defeat the idea of a lockless
-		 * operation, thus the code relies on rcu to guarantee the
-		 * existence of msq:
+		/*
+		 * Lockless receive, part 1:
+		 * We don't hold a reference to the queue and getting a
+		 * reference would defeat the idea of a lockless operation,
+		 * thus the code relies on rcu to guarantee the existence of
+		 * msq:
 		 * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
 		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
-		 * rcu_read_lock() prevents preemption between reading r_msg
-		 * and acquiring the q_perm.lock in ipc_lock_object().
 		 */
 		rcu_read_lock();
 
-		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
+		/*
+		 * Lockless receive, part 2:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
 		 * CPU 0                             CPU 1
 		 * <loop receiver>
 		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
+		 *   <load ->r_msg>           |        msr->r_msg = msg;
 		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
+		 *                                     wake_up_process();
 		 *
 		 * Where (A) orders the message value read and where (B) orders
 		 * the write to the r_msg -- done in both pipelined_send and
 		 * expunge_all.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue ...
 		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
-
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
-		 */
+		smp_rmb(); /* barrier (A) */
+		msg = READ_ONCE(msr_d.r_msg);
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * ... or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;
 
-- 
2.8.1

* Re: [PATCH v4] ipc/msg: Implement lockless pipelined wakeups
  2016-07-21  0:16 ` Davidlohr Bueso
  2016-07-21 10:07   ` [PATCH v5] " Sebastian Andrzej Siewior
@ 2016-07-21 10:23   ` Sebastian Andrzej Siewior
  2016-07-21 19:47     ` Davidlohr Bueso
  1 sibling, 1 reply; 5+ messages in thread
From: Sebastian Andrzej Siewior @ 2016-07-21 10:23 UTC (permalink / raw)
  To: Davidlohr Bueso
  Cc: linux-kernel, Peter Zijlstra, tglx, Manfred Spraul, Andrew Morton

* Davidlohr Bueso | 2016-07-20 17:16:12 [-0700]:

>Just as with expunge_all and the E2BIG case, could you remove that explicit
>barrier (B) and just rely on wake_q_add?

Just did. So now we have just an smp_rmb() on the reader side while the
comment talks about smp_wmb(), and at the spot where we should have the
smp_wmb() we have a comment explaining why we don't have one :)
For my understanding: we need that smp_rmb() to ensure that everything
past that cmpxchg() is visible on all other CPUs, so that we don't get
the wakeup before the r_msg read returns != -EAGAIN, right?

>Thanks,
>Davidlohr

Sebastian

* Re: [PATCH v4] ipc/msg: Implement lockless pipelined wakeups
  2016-07-21 10:23   ` [PATCH v4] " Sebastian Andrzej Siewior
@ 2016-07-21 19:47     ` Davidlohr Bueso
  0 siblings, 0 replies; 5+ messages in thread
From: Davidlohr Bueso @ 2016-07-21 19:47 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, Peter Zijlstra, tglx, Manfred Spraul, Andrew Morton

On Thu, 21 Jul 2016, Sebastian Andrzej Siewior wrote:

>* Davidlohr Bueso | 2016-07-20 17:16:12 [-0700]:
>
>>Just as with expunge_all and the E2BIG case, could you remove that explicit
>>barrier (B) and just rely on wake_q_add?
>
>Just did. So we have just a smp_rmb() on the reader side and the
>comment talks about smb_wmb() and at the spot where we should have the
>smb_wmb we have a comment why we don't have one :)
>For my understanding: we need that smp_rmb() to ensure that everything
>past that cmpxchg() is visible on all other CPUs so we don't have the
>wakeup before we r_msg reads != -EAGAIN, right?

Hmm, I'm having second thoughts about the need for barrier (A). As you know,
originally we had it to prevent races with do_exit() from the receiver thread:
if r_msg was set before doing the wakeup, we could face a use-after-free
scenario.

Now, by delaying the wakeup, the receiver task should always see whatever
r_msg is set to by the waker, even if we get reordered with wake_q_add(), as
the actual wake_up_process() has not occurred yet and hence the receiver is
still blocked while this is going on -- iow, we avoid entirely the need to
explicitly wait until pipelined_send/expunge_all are done. Similarly,
"barrier (B)" simply serves to pair with wake_up_q() such that we don't miss
wakeups, but that is always handled by the wake_q machinery anyway.

So if this is the case (which is how you had it in some previous version of this
patch), we can get rid of the pair diagram altogether as well. Manfred, Peter, does
all this make sense to you guys?

Thanks,
Davidlohr
