linux-kernel.vger.kernel.org archive mirror
* [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
@ 2015-11-03 15:03 Sebastian Andrzej Siewior
  2015-11-03 17:30 ` Davidlohr Bueso
                   ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 15:03 UTC (permalink / raw)
  To: linux-kernel
  Cc: George Spelvin, Thomas Gleixner, Peter Zijlstra,
	Sebastian Andrzej Siewior, Davidlohr Bueso, Manfred Spraul,
	Andrew Morton

This patch moves the wake_up_process() invocation out from under
perm->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once the message has been assigned, so on SMP it no
longer needs to loop waiting for r_msg to change from NULL. In the
signal case we still need to check the pointer under the lock to verify
the state.

This change should also avoid introducing preempt_disable() in -RT,
which would otherwise be needed to prevent a busy loop that polls for
the NULL -> !NULL transition when the waiter has a higher priority than
the waker.
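
For reference, the wake_q usage pattern adopted here looks roughly like
this (a minimal sketch of the wake_q API; lock and variable names are
illustrative, not the exact hunks below):

	WAKE_Q(wake_q);			/* on-stack list of tasks to wake */

	spin_lock(&lock);
	...
	/* takes a task reference via cmpxchg; no wakeup happens yet */
	wake_q_add(&wake_q, task);
	...
	spin_unlock(&lock);

	/* the deferred wake_up_process() calls, without the lock held */
	wake_up_q(&wake_q);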

This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
Radeon R7, 12 Compute Cores 4C+8G":

test             |   before   |   after    | diff
-----------------|------------|------------|----------
pmsg-shared 8 60 | 52,386,363 | 56,455,763 | + ~7.77 %
pmsg-shared 2 60 | 69,200,531 | 73,346,420 | + ~5.99 %

v1…v2:
	- msg_receiver.r_msg is no longer volatile, now that the busy
	  loop is gone.
	- added a comment explaining why we do wake_q_add() followed by
	  the assignment of ->r_msg and not the other way around.

Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
---
 ipc/msg.c | 120 +++++++++++++++++++++-----------------------------------------
 1 file changed, 40 insertions(+), 80 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 1471db9a7e61..e85f51aa416f 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
 	long			r_msgtype;
 	long			r_maxsize;
 
-	/*
-	 * Mark r_msg volatile so that the compiler
-	 * does not try to get smart and optimize
-	 * it. We rely on this for the lockless
-	 * receive algorithm.
-	 */
-	struct msg_msg		*volatile r_msg;
+	struct msg_msg		*r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
@@ -183,20 +177,14 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
+
+		wake_q_add(wake_q, msr->r_tsk);
 		msr->r_msg = ERR_PTR(res);
 	}
 }
@@ -213,11 +201,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +332,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +380,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +393,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +558,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,26 +570,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
+				wake_q_add(wake_q, msr->r_tsk);
 				msr->r_msg = ERR_PTR(-E2BIG);
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
-				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
-				 */
-				smp_wmb();
+				wake_q_add(wake_q, msr->r_tsk);
 				msr->r_msg = msg;
+				/*
+				 * Rely on the implicit cmpxchg barrier from
+				 * wake_q_add such that we can ensure that
+				 * updating msr->r_msg is the last write
+				 * operation: As once set, the receiver can
+				 * continue, and if we don't have the reference
+				 * count from the wake_q, yet, at that point we
+				 * can later have a use-after-free condition and
+				 * bogus wakeup.
+				 */
 
 				return 1;
 			}
@@ -613,6 +603,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -698,7 +689,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +703,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -932,58 +924,26 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_lock();
 
 		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
-		 * CPU 0                             CPU 1
-		 * <loop receiver>
-		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
-		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
-		 *
-		 * Where (A) orders the message value read and where (B) orders
-		 * the write to the r_msg -- done in both pipelined_send and
-		 * expunge_all.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue ...
 		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
 
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
-		 */
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * ... or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;
 
-- 
2.6.2



* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-03 15:03 [PATCH v2] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
@ 2015-11-03 17:30 ` Davidlohr Bueso
  2015-11-03 20:12   ` Sebastian Andrzej Siewior
  2015-11-04 11:55 ` Peter Zijlstra
  2015-11-04 12:02 ` Peter Zijlstra
  2 siblings, 1 reply; 7+ messages in thread
From: Davidlohr Bueso @ 2015-11-03 17:30 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, George Spelvin, Thomas Gleixner, Peter Zijlstra,
	Manfred Spraul, Andrew Morton

On Tue, 03 Nov 2015, Sebastian Andrzej Siewior wrote:

>@@ -577,26 +570,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>
> 			list_del(&msr->r_list);
> 			if (msr->r_maxsize < msg->m_ts) {
>-				/* initialize pipelined send ordering */
>-				msr->r_msg = NULL;
>-				wake_up_process(msr->r_tsk);
>-				/* barrier (B) see barrier comment below */
>-				smp_wmb();
>+				wake_q_add(wake_q, msr->r_tsk);
> 				msr->r_msg = ERR_PTR(-E2BIG);
> 			} else {
>-				msr->r_msg = NULL;
> 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
> 				msq->q_rtime = get_seconds();
>-				wake_up_process(msr->r_tsk);
>-				/*
>-				 * Ensure that the wakeup is visible before
>-				 * setting r_msg, as the receiving can otherwise
>-				 * exit - once r_msg is set, the receiver can
>-				 * continue. See lockless receive part 1 and 2
>-				 * in do_msgrcv(). Barrier (B).
>-				 */
>-				smp_wmb();
>+				wake_q_add(wake_q, msr->r_tsk);
> 				msr->r_msg = msg;
>+				/*
>+				 * Rely on the implicit cmpxchg barrier from
>+				 * wake_q_add such that we can ensure that
>+				 * updating msr->r_msg is the last write
>+				 * operation: As once set, the receiver can
>+				 * continue, and if we don't have the reference
>+				 * count from the wake_q, yet, at that point we
>+				 * can later have a use-after-free condition and
>+				 * bogus wakeup.
>+				 */

Not sure why you placed the comment here. Why not between smp_wmb() and the r_msg
write as we have it?

You might also want to add a reference to this comment in expunge_all(), which
does the same thing.

> [...]
>
> 		/* Lockless receive, part 2:
>-		 * Wait until pipelined_send or expunge_all are outside of
>-		 * wake_up_process(). There is a race with exit(), see
>-		 * ipc/mqueue.c for the details. The correct serialization
>-		 * ensures that a receiver cannot continue without the wakeup
>-		 * being visibible _before_ setting r_msg:
>+		 * The work in pipelined_send() and expunge_all():
>+		 * - Set pointer to message
>+		 * - Queue the receiver task for later wakeup
>+		 * - Wake up the process after the lock is dropped.
> 		 *
>-		 * CPU 0                             CPU 1
>-		 * <loop receiver>
>-		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
>-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
>-		 *                            |        wake_up_process();
>-		 * <continue>                 `------> smp_wmb(); (B)
>-		 *                                     msr->r_msg = msg;
>-		 *
>-		 * Where (A) orders the message value read and where (B) orders
>-		 * the write to the r_msg -- done in both pipelined_send and
>-		 * expunge_all.
>+		 * Should the process wake up before this wakeup (due to a
>+		 * signal) it will either see the message and continue ...
> 		 */
>-		for (;;) {
>-			/*
>-			 * Pairs with writer barrier in pipelined_send
>-			 * or expunge_all.
>-			 */
>-			smp_rmb(); /* barrier (A) */
>-			msg = (struct msg_msg *)msr_d.r_msg;
>-			if (msg)
>-				break;
>
>-			/*
>-			 * The cpu_relax() call is a compiler barrier
>-			 * which forces everything in this loop to be
>-			 * re-loaded.
>-			 */
>-			cpu_relax();
>-		}
>-
>-		/* Lockless receive, part 3:
>-		 * If there is a message or an error then accept it without
>-		 * locking.
>-		 */
>+		msg = msr_d.r_msg;

But you're getting rid of the barrier pairing (smp_rmb) we have in
pipelined sends and expunge_all, which is necessary even if we don't
busy-wait on nil. Likewise, there's no need to remove the comment above
that illustrates this.

Thanks,
Davidlohr


* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-03 17:30 ` Davidlohr Bueso
@ 2015-11-03 20:12   ` Sebastian Andrzej Siewior
  2015-11-04 18:11     ` Davidlohr Bueso
  0 siblings, 1 reply; 7+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 20:12 UTC (permalink / raw)
  To: Davidlohr Bueso
  Cc: linux-kernel, George Spelvin, Thomas Gleixner, Peter Zijlstra,
	Manfred Spraul, Andrew Morton

On 11/03/2015 06:30 PM, Davidlohr Bueso wrote:
> On Tue, 03 Nov 2015, Sebastian Andrzej Siewior wrote:
> 
>> @@ -577,26 +570,23 @@ static inline int pipelined_send(struct
>> msg_queue *msq, struct msg_msg *msg)
>>
>>             list_del(&msr->r_list);
>>             if (msr->r_maxsize < msg->m_ts) {
>> -                /* initialize pipelined send ordering */
>> -                msr->r_msg = NULL;
>> -                wake_up_process(msr->r_tsk);
>> -                /* barrier (B) see barrier comment below */
>> -                smp_wmb();
>> +                wake_q_add(wake_q, msr->r_tsk);
>>                 msr->r_msg = ERR_PTR(-E2BIG);
>>             } else {
>> -                msr->r_msg = NULL;
>>                 msq->q_lrpid = task_pid_vnr(msr->r_tsk);
>>                 msq->q_rtime = get_seconds();
>> -                wake_up_process(msr->r_tsk);
>> -                /*
>> -                 * Ensure that the wakeup is visible before
>> -                 * setting r_msg, as the receiving can otherwise
>> -                 * exit - once r_msg is set, the receiver can
>> -                 * continue. See lockless receive part 1 and 2
>> -                 * in do_msgrcv(). Barrier (B).
>> -                 */
>> -                smp_wmb();
>> +                wake_q_add(wake_q, msr->r_tsk);
>>                 msr->r_msg = msg;
>> +                /*
>> +                 * Rely on the implicit cmpxchg barrier from
>> +                 * wake_q_add such that we can ensure that
>> +                 * updating msr->r_msg is the last write
>> +                 * operation: As once set, the receiver can
>> +                 * continue, and if we don't have the reference
>> +                 * count from the wake_q, yet, at that point we
>> +                 * can later have a use-after-free condition and
>> +                 * bogus wakeup.
>> +                 */
> 
> Not sure why you placed the comment here. Why not between smp_wmb() and
> the r_msg
> write as we have it?

This follows the scheme we have in pipelined_send() in ipc/mqueue.c:
first wake_q_add(), then ->state (here we have the msg instead).
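
i.e. (from ipc/mqueue.c, roughly):

	wake_q_add(wake_q, receiver->task);
	/* once STATE_READY is visible, the receiver may continue */
	receiver->state = STATE_READY;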

> 
> You might also want to add a reference to this comment in expunge_all(),
> which
> does the same thing.

okay.

>> [...]
>>
>>         /* Lockless receive, part 2:
>> -         * Wait until pipelined_send or expunge_all are outside of
>> -         * wake_up_process(). There is a race with exit(), see
>> -         * ipc/mqueue.c for the details. The correct serialization
>> -         * ensures that a receiver cannot continue without the wakeup
>> -         * being visibible _before_ setting r_msg:
>> +         * The work in pipelined_send() and expunge_all():
>> +         * - Set pointer to message
>> +         * - Queue the receiver task for later wakeup
>> +         * - Wake up the process after the lock is dropped.
>>          *
>> -         * CPU 0                             CPU 1
>> -         * <loop receiver>
>> -         *   smp_rmb(); (A) <-- pair -.      <waker thread>
>> -         *   <load ->r_msg>           |        msr->r_msg = NULL;
>> -         *                            |        wake_up_process();
>> -         * <continue>                 `------> smp_wmb(); (B)
>> -         *                                     msr->r_msg = msg;
>> -         *
>> -         * Where (A) orders the message value read and where (B) orders
>> -         * the write to the r_msg -- done in both pipelined_send and
>> -         * expunge_all.
>> +         * Should the process wake up before this wakeup (due to a
>> +         * signal) it will either see the message and continue ...
>>          */
>> -        for (;;) {
>> -            /*
>> -             * Pairs with writer barrier in pipelined_send
>> -             * or expunge_all.
>> -             */
>> -            smp_rmb(); /* barrier (A) */
>> -            msg = (struct msg_msg *)msr_d.r_msg;
>> -            if (msg)
>> -                break;
>>
>> -            /*
>> -             * The cpu_relax() call is a compiler barrier
>> -             * which forces everything in this loop to be
>> -             * re-loaded.
>> -             */
>> -            cpu_relax();
>> -        }
>> -
>> -        /* Lockless receive, part 3:
>> -         * If there is a message or an error then accept it without
>> -         * locking.
>> -         */
>> +        msg = msr_d.r_msg;
> 
> But you're getting rid of the barrier pairing (smp_rmb) we have in
> pipelined sends
> and expunge_all, which is necesary even if we don't busy wait on nil.

In pipelined_receive() (mqueue) there is the wake_q_add() with the
implicit cmpxchg barrier. The matching barrier pairing should be in
wq_sleep(), but there is none. Why is it okay to have none there while I
need one here?

> Likewise,
> there's no need to remove the comment above that illustrates this.

I did not assume we need a barrier here. If we do, I will keep the
comment / diagram, but right now I think it can go.

> Thanks,
> Davidlohr

Sebastian


* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-03 15:03 [PATCH v2] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
  2015-11-03 17:30 ` Davidlohr Bueso
@ 2015-11-04 11:55 ` Peter Zijlstra
  2015-11-04 17:32   ` Davidlohr Bueso
  2015-11-04 12:02 ` Peter Zijlstra
  2 siblings, 1 reply; 7+ messages in thread
From: Peter Zijlstra @ 2015-11-04 11:55 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, George Spelvin, Thomas Gleixner, Davidlohr Bueso,
	Manfred Spraul, Andrew Morton

On Tue, Nov 03, 2015 at 04:03:29PM +0100, Sebastian Andrzej Siewior wrote:

> -	struct msg_msg		*volatile r_msg;
> +	struct msg_msg		*r_msg;

> +				wake_q_add(wake_q, msr->r_tsk);
>  				msr->r_msg = msg;
> +				/*
> +				 * Rely on the implicit cmpxchg barrier from
> +				 * wake_q_add such that we can ensure that

Davidlohr, didn't you want to make that cmpxchg_relaxed() or
cmpxchg_release() ?

> +				 * updating msr->r_msg is the last write
> +				 * operation: As once set, the receiver can
> +				 * continue, and if we don't have the reference

Which seems to suggest you want to at least make that WRITE_ONCE()

> +				 * count from the wake_q, yet, at that point we
> +				 * can later have a use-after-free condition and
> +				 * bogus wakeup.
> +				 */

And I agree with Davidlohr that the comment is placed oddly; one would
expect it between wake_q_add() and the store in question.
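
i.e. something like this (completely untested):

	wake_q_add(wake_q, msr->r_tsk);
	/*
	 * r_msg must be the last write; pairs with the lockless read
	 * of r_msg in do_msgrcv().
	 */
	WRITE_ONCE(msr->r_msg, msg);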




* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-03 15:03 [PATCH v2] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
  2015-11-03 17:30 ` Davidlohr Bueso
  2015-11-04 11:55 ` Peter Zijlstra
@ 2015-11-04 12:02 ` Peter Zijlstra
  2 siblings, 0 replies; 7+ messages in thread
From: Peter Zijlstra @ 2015-11-04 12:02 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, George Spelvin, Thomas Gleixner, Davidlohr Bueso,
	Manfred Spraul, Andrew Morton

On Tue, Nov 03, 2015 at 04:03:29PM +0100, Sebastian Andrzej Siewior wrote:
> @@ -932,58 +924,26 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
>  		rcu_read_lock();
>  
>  		/* Lockless receive, part 2:

This is broken comment style; please fix that while you're there
anyway.

Also, the comment above ("Lockless receive, part 1:") is broken too,
not only in style: it also refers to rcu_read_unlock() as disabling
preemption and avoiding preemption, which is false.

> +		 * The work in pipelined_send() and expunge_all():
> +		 * - Set pointer to message
> +		 * - Queue the receiver task for later wakeup
> +		 * - Wake up the process after the lock is dropped.
>  		 *
> +		 * Should the process wake up before this wakeup (due to a
> +		 * signal) it will either see the message and continue ...
>  		 */
>  
> +		msg = msr_d.r_msg;

Since this is a lockless read, it should very much be READ_ONCE(), esp.
since you killed the volatile on its type.
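
i.e. (sketch):

	msg = READ_ONCE(msr_d.r_msg);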


* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-04 11:55 ` Peter Zijlstra
@ 2015-11-04 17:32   ` Davidlohr Bueso
  0 siblings, 0 replies; 7+ messages in thread
From: Davidlohr Bueso @ 2015-11-04 17:32 UTC (permalink / raw)
  To: Peter Zijlstra
  Cc: Sebastian Andrzej Siewior, linux-kernel, George Spelvin,
	Thomas Gleixner, Manfred Spraul, Andrew Morton

On Wed, 04 Nov 2015, Peter Zijlstra wrote:

>On Tue, Nov 03, 2015 at 04:03:29PM +0100, Sebastian Andrzej Siewior wrote:
>
>> -	struct msg_msg		*volatile r_msg;
>> +	struct msg_msg		*r_msg;
>
>> +				wake_q_add(wake_q, msr->r_tsk);
>>  				msr->r_msg = msg;
>> +				/*
>> +				 * Rely on the implicit cmpxchg barrier from
>> +				 * wake_q_add such that we can ensure that
>
>Davidlohr, didn't you want to make that cmpxchg_relaxed() or
>cmpxchg_release() ?

Right, I had forgotten about that. iirc we ended up deciding to fully
relax the cmpxchg and update the callers instead.


* Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
  2015-11-03 20:12   ` Sebastian Andrzej Siewior
@ 2015-11-04 18:11     ` Davidlohr Bueso
  0 siblings, 0 replies; 7+ messages in thread
From: Davidlohr Bueso @ 2015-11-04 18:11 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, George Spelvin, Thomas Gleixner, Peter Zijlstra,
	Manfred Spraul, Andrew Morton

On Tue, 03 Nov 2015, Sebastian Andrzej Siewior wrote:

>In pipelined_receive() (mqueue) there is the wake_q_add() with the
>implicit cmpxchg barrier. The matching barrier pairing should be in
>wq_sleep() but there is none. Why is it okay to have none there and I
>need one here?

The pairing in mqueue is done in wq_sleep(): as we unlock info->lock,
there is an implied barrier there.

Also, considering Peter's suggestion to use READ_ONCE/WRITE_ONCE for
the sysv queues (especially since you got rid of the volatile tag), it
seems we also need them for mqueues.
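
e.g. in wq_sleep() (a sketch against the ext_wait_queue state in
ipc/mqueue.c, untested):

	/* lockless check of the state set by pipelined_send()/_receive() */
	if (READ_ONCE(ewp->state) == STATE_READY) {
		...
	}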

Thanks,
Davidlohr

