From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand
	id S933077AbbD1DZT (ORCPT ); Mon, 27 Apr 2015 23:25:19 -0400
Received: from cantor2.suse.de ([195.135.220.15]:39063 "EHLO mx2.suse.de"
	rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP
	id S932281AbbD1DZP (ORCPT ); Mon, 27 Apr 2015 23:25:15 -0400
Message-ID: <1430191493.2050.3.camel@stgolabs.net>
Subject: Re: [PATCH v2] ipc/mqueue: remove STATE_PENDING
From: Davidlohr Bueso
To: Thomas Gleixner
Cc: Sebastian Andrzej Siewior , Manfred Spraul , LKML , Peter Zijlstra ,
	Ingo Molnar , Darren Hart , Steven Rostedt ,
	fredrik.markstrom@windriver.com, "Paul E. McKenney"
Date: Mon, 27 Apr 2015 20:24:53 -0700
In-Reply-To:
References: <1428419030-20030-1-git-send-email-bigeasy@linutronix.de>
	<1428419030-20030-4-git-send-email-bigeasy@linutronix.de>
	<55241851.7060704@colorfullife.com>
	<20150410143726.GD3057@linutronix.de>
Content-Type: text/plain; charset="UTF-8"
X-Mailer: Evolution 3.12.11
Mime-Version: 1.0
Content-Transfer-Encoding: 7bit
Sender: linux-kernel-owner@vger.kernel.org
List-ID:
X-Mailing-List: linux-kernel@vger.kernel.org

On Fri, 2015-04-24 at 00:18 +0200, Thomas Gleixner wrote:
> Can you please convert that over to Peters lockless wake queues so we
> do not reimplement the same thing open coded here.

So I'd like to include this in my v2 of the wake_q stuff, along with the
futex conversion. What do you guys think of the following?

Thanks,
Davidlohr

8<-------------------------------------------------------------
Subject: [PATCH] ipc/mqueue: lockless pipelined wakeups

This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.
This change should also avoid the introduction of preempt_disable() in -RT,
whose purpose is to avoid a busy-loop that polls for the STATE_PENDING ->
STATE_READY change if the waiter has a higher priority than the waker.

Additionally, this patch micro-optimizes wq_sleep() by using the cheaper
cousin of set_current_state(TASK_INTERRUPTIBLE) as we will block no matter
what, thus getting rid of the implied barrier. Secondly, and related to the
lockless wakeups, comment the smp_wmb() and add barrier pairing on the
reader side.

Based-on-work-from: Sebastian Andrzej Siewior
Signed-off-by: Davidlohr Bueso
---
 ipc/mqueue.c | 52 +++++++++++++++++++++++++++++++++-------------------
 1 file changed, 33 insertions(+), 19 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..11c7b92 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
 #define RECV		1
 
 #define STATE_NONE	0
-#define STATE_PENDING	1
-#define STATE_READY	2
+#define STATE_READY	1
 
 struct posix_msg_tree_node {
 	struct rb_node		rb_node;
@@ -571,15 +570,13 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
 	wq_add(info, sr, ewp);
 
 	for (;;) {
-		set_current_state(TASK_INTERRUPTIBLE);
+		__set_current_state(TASK_INTERRUPTIBLE);
 
 		spin_unlock(&info->lock);
 		time = schedule_hrtimeout_range_clock(timeout, 0,
 			HRTIMER_MODE_ABS, CLOCK_REALTIME);
 
-		while (ewp->state == STATE_PENDING)
-			cpu_relax();
-
+		smp_rmb(); /* pairs with smp_wmb() in pipelined_send/receive */
 		if (ewp->state == STATE_READY) {
 			retval = 0;
 			goto out;
@@ -909,9 +906,14 @@ out_name:
  * bypasses the message array and directly hands the message over to the
  * receiver.
  * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ *   before this wakeup (due to a timeout or a signal) it will either see
+ *   STATE_READY and continue or acquire the lock to check the state again.
  *
  * The same algorithm is used for senders.
  */
@@ -919,21 +921,29 @@ out_name:
 /* pipelined_send() - send a message directly to the task waiting in
  * sys_mq_timedreceive() (without inserting message into a queue).
  */
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+				  struct mqueue_inode_info *info,
 				  struct msg_msg *message,
 				  struct ext_wait_queue *receiver)
 {
 	receiver->msg = message;
 	list_del(&receiver->list);
-	receiver->state = STATE_PENDING;
-	wake_up_process(receiver->task);
-	smp_wmb();
+	wake_q_add(wake_q, receiver->task);
+	/*
+	 * Ensure that updating receiver->state is the last
+	 * write operation: As once set, the receiver can continue,
+	 * and if we don't have the reference count from the wake_q,
+	 * yet, at that point we can later have a use-after-free
+	 * condition and bogus wakeup.
+	 */
+	smp_wmb(); /* pairs with smp_rmb() in wq_sleep */
 	receiver->state = STATE_READY;
 }
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure).
  */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+				     struct mqueue_inode_info *info)
 {
 	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
@@ -944,10 +954,10 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
 	}
 	if (msg_insert(sender->msg, info))
 		return;
+
 	list_del(&sender->list);
-	sender->state = STATE_PENDING;
-	wake_up_process(sender->task);
-	smp_wmb();
+	wake_q_add(wake_q, sender->task);
+	smp_wmb(); /* pairs with smp_rmb() in wq_sleep */
 	sender->state = STATE_READY;
 }
 
@@ -965,6 +975,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	struct timespec ts;
 	struct posix_msg_tree_node *new_leaf = NULL;
 	int ret = 0;
+	WAKE_Q(wake_q);
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1060,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	} else {
 		receiver = wq_get_first_waiter(info, RECV);
 		if (receiver) {
-			pipelined_send(info, msg_ptr, receiver);
+			pipelined_send(&wake_q, info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
 			ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1073,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	}
 out_unlock:
 	spin_unlock(&info->lock);
+	wake_up_q(&wake_q);
 out_free:
 	if (ret)
 		free_msg(msg_ptr);
@@ -1084,6 +1096,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
 	struct posix_msg_tree_node *new_leaf = NULL;
+	WAKE_Q(wake_q);
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1155,8 +1168,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 				CURRENT_TIME;
 
 		/* There is now free space in queue. */
-		pipelined_receive(info);
+		pipelined_receive(&wake_q, info);
 		spin_unlock(&info->lock);
+		wake_up_q(&wake_q);
 		ret = 0;
 	}
 	if (ret == 0) {
-- 
2.1.4
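
For anyone who wants to poke at the ordering outside the kernel, here is a rough user-space model of the steps listed in the comment above pipelined_send(): set the message, queue the wakeup, publish STATE_READY last, and wake only after the lock is dropped. It is only a sketch under assumptions, not the kernel code: pthreads and C11 atomics stand in for the scheduler and for smp_wmb()/smp_rmb(), a single pointer stands in for the wake_q, and struct waiter, send_msg(), receiver_thread() and run_demo() are made-up names. The timeout/signal path, where the waiter re-takes the lock, is not modelled.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

enum { STATE_NONE = 0, STATE_READY = 1 };

/* Hypothetical user-space stand-in for struct ext_wait_queue. */
struct waiter {
	int msg;			/* message handed over directly */
	atomic_int state;		/* STATE_NONE -> STATE_READY */
	pthread_mutex_t sleep_lock;
	pthread_cond_t wake;
	int woken;			/* protected by sleep_lock */
	int received;			/* what the receiver ended up with */
};

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; /* info->lock */
static struct waiter *pending;		/* the single queued receiver, if any */

/* Deferred wakeup, issued only after queue_lock has been dropped. */
static void wake_waiter(struct waiter *w)
{
	pthread_mutex_lock(&w->sleep_lock);
	w->woken = 1;
	pthread_cond_signal(&w->wake);
	pthread_mutex_unlock(&w->sleep_lock);
}

/* Sender side: the four steps from the comment, in order. */
static void send_msg(int msg)
{
	struct waiter *to_wake = NULL;	/* one-slot stand-in for a wake_q */

	pthread_mutex_lock(&queue_lock);
	if (pending) {
		struct waiter *w = pending;

		pending = NULL;		/* list_del() */
		w->msg = msg;		/* set pointer to message */
		to_wake = w;		/* wake_q_add() */
		/* Publish last; release pairs with the acquire load below. */
		atomic_store_explicit(&w->state, STATE_READY,
				      memory_order_release);
	}
	pthread_mutex_unlock(&queue_lock);

	if (to_wake)
		wake_waiter(to_wake);	/* wake_up_q() */
}

/* Receiver side: sleep, then check the state without taking queue_lock. */
static void *receiver_thread(void *arg)
{
	struct waiter *w = arg;

	pthread_mutex_lock(&w->sleep_lock);
	while (!w->woken)
		pthread_cond_wait(&w->wake, &w->sleep_lock);
	pthread_mutex_unlock(&w->sleep_lock);

	if (atomic_load_explicit(&w->state, memory_order_acquire) == STATE_READY)
		w->received = w->msg;
	return NULL;
}

/* Queue one receiver, send it a message, return what it received. */
int run_demo(int msg)
{
	struct waiter w = { .received = -1 };
	pthread_t tid;

	atomic_init(&w.state, STATE_NONE);
	pthread_mutex_init(&w.sleep_lock, NULL);
	pthread_cond_init(&w.wake, NULL);

	pthread_mutex_lock(&queue_lock);
	pending = &w;			/* wq_add() */
	pthread_mutex_unlock(&queue_lock);

	if (pthread_create(&tid, NULL, receiver_thread, &w) != 0)
		return -1;
	send_msg(msg);
	pthread_join(tid, NULL);

	pthread_cond_destroy(&w.wake);
	pthread_mutex_destroy(&w.sleep_lock);
	return w.received;
}
```

Calling run_demo(42) from a small main() and building with -pthread should hand 42 back from the receiver. In this model the waiter outlives the wakeup because run_demo() joins the thread; in the patch it is the reference taken by wake_q_add() that covers that window.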