From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand
	id S1757468AbbE3ADt (ORCPT); Fri, 29 May 2015 20:03:49 -0400
Received: from smtp2.provo.novell.com ([137.65.250.81]:48384 "EHLO
	smtp2.provo.novell.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org
	with ESMTP id S1756906AbbE3ADf (ORCPT);
	Fri, 29 May 2015 20:03:35 -0400
From: Davidlohr Bueso
To: Andrew Morton
Cc: Manfred Spraul, dave@stgolabs.net, linux-kernel@vger.kernel.org,
	Davidlohr Bueso
Subject: [PATCH 2/2] ipc,msg: provide barrier pairings for lockless receive
Date: Fri, 29 May 2015 17:03:06 -0700
Message-Id: <1432944186-7305-2-git-send-email-dave@stgolabs.net>
X-Mailer: git-send-email 2.1.4
In-Reply-To: <1432944186-7305-1-git-send-email-dave@stgolabs.net>
References: <1432944186-7305-1-git-send-email-dave@stgolabs.net>
Sender: linux-kernel-owner@vger.kernel.org
List-ID:
X-Mailing-List: linux-kernel@vger.kernel.org

We currently use a full barrier on the sender side to avoid receiver
tasks disappearing on us while still performing the sender-side wakeup.
However, we lack the proper CPU-CPU interaction pairing on the receiver
side, which busy-waits for the message. Similarly, we do not need a full
smp_mb() and can relax the semantics for the writer and reader sides of
the message. This is safe, as we are only ordering loads and stores to
r_msg, and in both smp_wmb() and smp_rmb() there are no stores after the
calls _anyway_.

This obviously applies to pipelined_send() and expunge_all(), the latter
for EIDRM when destroying a queue.

Signed-off-by: Davidlohr Bueso
---
 ipc/msg.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 2b6fdbb..ac5116e 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
 		 * or dealing with -EAGAIN cases. See lockless receive part 1
 		 * and 2 in do_msgrcv().
 		 */
-		smp_mb();
+		smp_wmb();
 		msr->r_msg = ERR_PTR(res);
 	}
 }
@@ -580,7 +580,7 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 				/* initialize pipelined send ordering */
 				msr->r_msg = NULL;
 				wake_up_process(msr->r_tsk);
-				smp_mb(); /* see barrier comment below */
+				smp_wmb(); /* see barrier comment below */
 				msr->r_msg = ERR_PTR(-E2BIG);
 			} else {
 				msr->r_msg = NULL;
@@ -589,11 +589,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 				wake_up_process(msr->r_tsk);
 				/*
 				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving end depends
-				 * on it. See lockless receive part 1 and 2 in
-				 * do_msgrcv().
+				 * setting r_msg, as the receiver can otherwise
+				 * exit - once r_msg is set, the receiver can
+				 * continue. See lockless receive part 1 and 2
+				 * in do_msgrcv().
 				 */
-				smp_mb();
+				smp_wmb();
 				msr->r_msg = msg;
 
 				return 1;
@@ -934,10 +935,22 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		 * wake_up_process(). There is a race with exit(), see
 		 * ipc/mqueue.c for the details.
 		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
-		while (msg == NULL) {
-			cpu_relax();
+		for (;;) {
+			/*
+			 * Pairs with writer barrier in pipelined_send
+			 * or expunge_all
+			 */
+			smp_rmb();
 			msg = (struct msg_msg *)msr_d.r_msg;
+			if (msg)
+				break;
+
+			/*
+			 * The cpu_relax() call is a compiler barrier
+			 * which forces everything in this loop to be
+			 * re-loaded.
+			 */
+			cpu_relax();
 		}
 
 		/* Lockless receive, part 3:
-- 
2.1.4
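
To make the pairing concrete outside the kernel tree, here is a minimal
user-space sketch of the publish/busy-wait protocol the patch relies on.
It is an illustration only, not kernel code: C11 fences stand in for
smp_wmb()/smp_rmb(), and the file name, thread bodies, and the r_msg
stand-in are hypothetical.

/*
 * barrier_pairing.c - user-space sketch of the smp_wmb()/smp_rmb()
 * pairing above. Hypothetical illustration only: C11 fences stand in
 * for the kernel primitives, and r_msg here is an atomic pointer
 * standing in for msr->r_msg.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

struct msg_msg { int payload; };

static struct msg_msg message;
static _Atomic(struct msg_msg *) r_msg;	/* stands in for msr->r_msg */

static void *sender(void *arg)
{
	(void)arg;
	message.payload = 42;	/* stores that must be visible first */
	/* ~ smp_wmb(): order the store above before the publish below */
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&r_msg, &message, memory_order_relaxed);
	return NULL;
}

static void *receiver(void *arg)
{
	struct msg_msg *m;

	(void)arg;
	/* busy-wait for the message, as in do_msgrcv() */
	for (;;) {
		m = atomic_load_explicit(&r_msg, memory_order_relaxed);
		if (m)
			break;
	}
	/*
	 * ~ smp_rmb(): pairs with the release fence in sender(). C11's
	 * fence-to-fence rule requires the acquire fence to come after
	 * the load that observed the pointer.
	 */
	atomic_thread_fence(memory_order_acquire);
	printf("received %d\n", m->payload);
	return NULL;
}

int main(void)
{
	pthread_t s, r;

	pthread_create(&r, NULL, receiver, NULL);
	pthread_create(&s, NULL, sender, NULL);
	pthread_join(s, NULL);
	pthread_join(r, NULL);
	return 0;
}

Build with: cc -std=c11 -pthread barrier_pairing.c. The design point
mirrors the patch: a full barrier is unnecessary because only the
ordering of the sender's stores against the receiver's loads of r_msg
matters.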