LKML Archive on lore.kernel.org
* [PATCH 0/5] ipc/msg: Sender/receiver optimizations
@ 2016-07-28 23:33 Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 1/5] ipc/msg: Implement lockless pipelined wakeups Davidlohr Bueso
                   ` (5 more replies)
  0 siblings, 6 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel

Hi,

I'm resending Sebastian's sysv msg queue use of wake_qs but updated
to the last observations I made wrt the need for explicit barriers
after removing the whole receiver busy-looping. After some irc exchange
it seems we're both on the same page, and things now look like he had
them earlier, in v2. This is all patch 1.

The rest of the patches are changes I noticed while reviewing patch 1,
which are mainly sender-side rework/optimizations. Details are in each
changelog.

The changes have survived ltp (which has some nasty corner cases for msgsnd
changes), as well as pmsg-shared benchmark.

Applies on Linus's latest - please consider for v4.9.

Thanks!

  ipc/msg: Implement lockless pipelined wakeups
  ipc/msg: Batch queue sender wakeups
  ipc/msg: Make ss_wakeup() kill arg boolean
  ipc/msg: Lockless security checks for msgsnd
  ipc/msg: Avoid waking sender upon full queue

 ipc/msg.c | 210 ++++++++++++++++++++++++++++++--------------------------------
 1 file changed, 101 insertions(+), 109 deletions(-)

-- 
2.6.6

* [PATCH 1/5] ipc/msg: Implement lockless pipelined wakeups
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
@ 2016-07-28 23:33 ` Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 2/5] ipc/msg: Batch queue sender wakeups Davidlohr Bueso
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel, Davidlohr Bueso

From: Sebastian Andrzej Siewior <bigeasy@linutronix.de>

This patch moves the wake_up_process() invocation so it is not done under
the ipc global lock by making use of a lockless wake_q. With this change,
the waiter is woken up once the message has been assigned and it does not
need to loop on SMP if the message points to NULL. In the signal case we
still need to check the pointer under the lock to verify the state.
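
For illustration only (not part of the patch): a userspace sketch of the
receiver-side check described above, using C11 atomics in place of the
kernel's READ_ONCE()/WRITE_ONCE(). All toy_* names are hypothetical, and
the acquire/release ordering is an assumption of this sketch; the patch
itself relies on the wakeup path for ordering.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical model of msg_receiver::r_msg; intptr_t stands in for the pointer. */
struct toy_receiver {
	_Atomic intptr_t r_msg;
};

#define TOY_EAGAIN ((intptr_t)-EAGAIN)

enum toy_path { TOY_LOCKLESS, TOY_LOCKED };

/* Waker side: publish the final value once, no intermediate NULL store. */
static void toy_publish(struct toy_receiver *r, intptr_t msg)
{
	atomic_store_explicit(&r->r_msg, msg, memory_order_release);
}

/*
 * Receiver side, after returning from sleep: one load instead of a
 * NULL -> !NULL spin. Only the still -EAGAIN case (for example a wakeup
 * caused by a signal) falls back to re-checking under the lock.
 */
static enum toy_path toy_after_sleep(struct toy_receiver *r, intptr_t *msg)
{
	*msg = atomic_load_explicit(&r->r_msg, memory_order_acquire);
	if (*msg != TOY_EAGAIN)
		return TOY_LOCKLESS;

	/* the real code takes the queue lock here before looking again */
	*msg = atomic_load_explicit(&r->r_msg, memory_order_relaxed);
	return TOY_LOCKED;
}
```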

This change should also avoid the introduction of preempt_disable() in
-RT, which would be needed to avoid a busy-loop that polls for the
NULL -> !NULL change if the waiter has a higher priority compared to
the waker.

By making use of wake_qs, the logic of sysv msg queues is greatly
simplified (and very well suited as we can batch lockless wakeups),
particularly around the lockless receive algorithm.
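
For illustration only (not part of the patch): a single-threaded
userspace model of the deferred wakeup pattern. Tasks are only linked
onto a queue while the lock is held, and the wakeups run after the lock
is dropped. The toy_* types and the toy_lock_held flag are hypothetical
stand-ins, not the kernel's wake_q implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_task {
	int id;
	bool woken;
	bool woken_under_lock;	/* set if the wakeup ran with the lock held */
	struct toy_task *wake_next;
};

struct toy_wake_q {
	struct toy_task *first;
	struct toy_task **lastp;
};

static bool toy_lock_held;

static void toy_wake_q_init(struct toy_wake_q *q)
{
	q->first = NULL;
	q->lastp = &q->first;
}

/* Called with the lock held: only links the task, does not wake it. */
static void toy_wake_q_add(struct toy_wake_q *q, struct toy_task *t)
{
	t->wake_next = NULL;
	*q->lastp = t;
	q->lastp = &t->wake_next;
}

/* Called after the lock is dropped: performs the batched wakeups. */
static int toy_wake_up_q(struct toy_wake_q *q)
{
	struct toy_task *t = q->first;
	int n = 0;

	while (t) {
		struct toy_task *next = t->wake_next;

		t->wake_next = NULL;
		t->woken = true;
		t->woken_under_lock = toy_lock_held;
		t = next;
		n++;
	}
	toy_wake_q_init(q);
	return n;
}

/* Models a waker path: queue under the lock, unlock, then wake. */
static int toy_send(struct toy_task **receivers, size_t nr)
{
	struct toy_wake_q q;
	size_t i;

	toy_wake_q_init(&q);
	toy_lock_held = true;		/* lock */
	for (i = 0; i < nr; i++)
		toy_wake_q_add(&q, receivers[i]);
	toy_lock_held = false;		/* unlock */
	return toy_wake_up_q(&q);
}
```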

This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
Radeon R7, 12 Compute Cores 4C+8G":

test             |   before   |   after    | diff
-----------------|------------|------------|----------
pmsg-shared 8 60 | 19,347,422 | 30,442,191 | + ~57.34 %
pmsg-shared 4 60 | 21,367,197 | 35,743,458 | + ~67.28 %
pmsg-shared 2 60 | 22,884,224 | 24,278,200 | +  ~6.09 %
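
For illustration only: the diff column reads as the relative gain of
"after" over "before". A small helper (the name pct_gain is made up for
this sketch) reproduces the figures from the before/after columns.

```c
#include <assert.h>

/* Relative gain of `after` over `before`, in percent. */
static double pct_gain(double before, double after)
{
	return (after - before) / before * 100.0;
}
```

For example, pct_gain(19347422, 30442191) lands between 57.33 and 57.35,
in line with the first row.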

Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/msg.c | 133 +++++++++++++++++++-------------------------------------------
 1 file changed, 40 insertions(+), 93 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 1471db9a7e61..45ce6a3021be 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
 	long			r_msgtype;
 	long			r_maxsize;
 
-	/*
-	 * Mark r_msg volatile so that the compiler
-	 * does not try to get smart and optimize
-	 * it. We rely on this for the lockless
-	 * receive algorithm.
-	 */
-	struct msg_msg		*volatile r_msg;
+	struct msg_msg		*r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
@@ -183,21 +177,14 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
-		msr->r_msg = ERR_PTR(res);
+		wake_q_add(wake_q, msr->r_tsk);
+		WRITE_ONCE(msr->r_msg, ERR_PTR(res));
 	}
 }
 
@@ -213,11 +200,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +331,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +379,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +392,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +557,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,27 +569,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
-				msr->r_msg = ERR_PTR(-E2BIG);
+				wake_q_add(wake_q, msr->r_tsk);
+				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
-				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
-				 */
-				smp_wmb();
-				msr->r_msg = msg;
 
+				wake_q_add(wake_q, msr->r_tsk);
+				WRITE_ONCE(msr->r_msg, msg);
 				return 1;
 			}
 		}
@@ -613,6 +592,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -686,7 +666,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 			err = -EIDRM;
 			goto out_unlock0;
 		}
-
 		ss_del(&s);
 
 		if (signal_pending(current)) {
@@ -698,7 +677,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +691,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -919,71 +899,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_unlock();
 		schedule();
 
-		/* Lockless receive, part 1:
-		 * Disable preemption.  We don't hold a reference to the queue
-		 * and getting a reference would defeat the idea of a lockless
-		 * operation, thus the code relies on rcu to guarantee the
-		 * existence of msq:
+		/*
+		 * Lockless receive, part 1:
+		 * We don't hold a reference to the queue and getting a
+		 * reference would defeat the idea of a lockless operation,
+		 * thus the code relies on rcu to guarantee the existence of
+		 * msq:
 		 * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
 		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
-		 * rcu_read_lock() prevents preemption between reading r_msg
-		 * and acquiring the q_perm.lock in ipc_lock_object().
 		 */
 		rcu_read_lock();
 
-		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
-		 *
-		 * CPU 0                             CPU 1
-		 * <loop receiver>
-		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
-		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
+		/*
+		 * Lockless receive, part 2:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
-		 * Where (A) orders the message value read and where (B) orders
-		 * the write to the r_msg -- done in both pipelined_send and
-		 * expunge_all.
-		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
-
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue ...
 		 */
+		msg = READ_ONCE(msr_d.r_msg);
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * ... or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;
 
-- 
2.6.6

* [PATCH 2/5] ipc/msg: Batch queue sender wakeups
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 1/5] ipc/msg: Implement lockless pipelined wakeups Davidlohr Bueso
@ 2016-07-28 23:33 ` Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 3/5] ipc/msg: Make ss_wakeup() kill arg boolean Davidlohr Bueso
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel, Davidlohr Bueso

Currently the use of wake_qs in sysv msg queues is only
for the receiver tasks that are blocked on the queue. But
blocked sender tasks (due to queue size constraints) are
still awoken with the ipc object lock held, which can be a
problem particularly for small sized queues and far from
graceful for -rt (just like it was for the receiver side).

The paths that actually wake up a sender are obviously
related to when we are either getting rid of the queue
or after (some) space is freed-up after a receiver takes
the msg (msgrcv). Furthermore, with the exception of msgrcv,
we can always piggy-back on expunge_all that has its own
tasks lined-up for waking. Finally, upon unlinking the
message, it should be no problem delaying the wakeups a
bit until after we've released the lock.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/msg.c | 30 ++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 45ce6a3021be..395013d58fda 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -166,14 +166,15 @@ static inline void ss_del(struct msg_sender *mss)
 		list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h, int kill)
+static void ss_wakeup(struct list_head *h,
+		      struct wake_q_head *wake_q, int kill)
 {
 	struct msg_sender *mss, *t;
 
 	list_for_each_entry_safe(mss, t, h, list) {
 		if (kill)
 			mss->list.next = NULL;
-		wake_up_process(mss->tsk);
+		wake_q_add(wake_q, mss->tsk);
 	}
 }
 
@@ -203,7 +204,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, 1);
+	ss_wakeup(&msq->q_senders, &wake_q, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -331,7 +332,6 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
-	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -362,6 +362,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		freeque(ns, ipcp);
 		goto out_up;
 	case IPC_SET:
+	{
+		WAKE_Q(wake_q);
+
 		if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
 		    !capable(CAP_SYS_RESOURCE)) {
 			err = -EPERM;
@@ -376,15 +379,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		msq->q_qbytes = msqid64.msg_qbytes;
 
 		msq->q_ctime = get_seconds();
-		/* sleeping receivers might be excluded by
+		/*
+		 * Sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
 		expunge_all(msq, -EAGAIN, &wake_q);
-		/* sleeping senders might be able to send
+		/*
+		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, 0);
-		break;
+		ss_wakeup(&msq->q_senders, &wake_q, 0);
+		ipc_unlock_object(&msq->q_perm);
+		wake_up_q(&wake_q);
+
+		goto out_unlock1;
+	}
 	default:
 		err = -EINVAL;
 		goto out_unlock1;
@@ -392,7 +401,6 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
-	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -809,6 +817,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 	struct msg_queue *msq;
 	struct ipc_namespace *ns;
 	struct msg_msg *msg, *copy = NULL;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -873,7 +882,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 			msq->q_cbytes -= msg->m_ts;
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
-			ss_wakeup(&msq->q_senders, 0);
+			ss_wakeup(&msq->q_senders, &wake_q, 0);
 
 			goto out_unlock0;
 		}
@@ -945,6 +954,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (IS_ERR(msg)) {
-- 
2.6.6

* [PATCH 3/5] ipc/msg: Make ss_wakeup() kill arg boolean
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 1/5] ipc/msg: Implement lockless pipelined wakeups Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 2/5] ipc/msg: Batch queue sender wakeups Davidlohr Bueso
@ 2016-07-28 23:33 ` Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 4/5] ipc/msg: Lockless security checks for msgsnd Davidlohr Bueso
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel

... 'tis annoying.

Signed-off-by: Davidlohr Bueso <dave@stgolabs.net>
---
 ipc/msg.c | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 395013d58fda..5181259e2ff0 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -167,7 +167,7 @@ static inline void ss_del(struct msg_sender *mss)
 }
 
 static void ss_wakeup(struct list_head *h,
-		      struct wake_q_head *wake_q, int kill)
+		      struct wake_q_head *wake_q, bool kill)
 {
 	struct msg_sender *mss, *t;
 
@@ -204,7 +204,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, &wake_q, 1);
+	ss_wakeup(&msq->q_senders, &wake_q, true);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -388,7 +388,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, &wake_q, 0);
+		ss_wakeup(&msq->q_senders, &wake_q, false);
 		ipc_unlock_object(&msq->q_perm);
 		wake_up_q(&wake_q);
 
@@ -882,7 +882,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 			msq->q_cbytes -= msg->m_ts;
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
-			ss_wakeup(&msq->q_senders, &wake_q, 0);
+			ss_wakeup(&msq->q_senders, &wake_q, false);
 
 			goto out_unlock0;
 		}
-- 
2.6.6

* [PATCH 4/5] ipc/msg: Lockless security checks for msgsnd
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
                   ` (2 preceding siblings ...)
  2016-07-28 23:33 ` [PATCH 3/5] ipc/msg: Make ss_wakeup() kill arg boolean Davidlohr Bueso
@ 2016-07-28 23:33 ` Davidlohr Bueso
  2016-07-28 23:33 ` [PATCH 5/5] ipc/msg: Avoid waking sender upon full queue Davidlohr Bueso
  2016-08-04 16:44 ` [PATCH 0/5] ipc/msg: Sender/receiver optimizations Peter Zijlstra
  5 siblings, 0 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel, Davidlohr Bueso

Just as with msgrcv (along with the rest of sysvipc since a few
years ago), perform the security checks without holding the ipc
object lock. This also reduces the hogging of the lock for the
entire duration of a sender, as we drop the lock upon every
iteration -- and this is exactly why we also check for racing
with RMID in the first place.
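
For illustration only (not part of the patch): a userspace model of one
iteration of the sender loop after this change. The permission check
runs before the lock is taken, and the queue is re-validated once the
lock is held because it may have been removed in between. The toy_msq
fields are hypothetical stand-ins for ipcperms() and ipc_valid_object().

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

struct toy_msq {
	bool writable;			/* stands in for the ipcperms() result */
	bool valid;			/* stands in for ipc_valid_object() */
	bool locked;
	bool perm_checked_locked;	/* did the perm check run under the lock? */
};

static int toy_msgsnd_iteration(struct toy_msq *q)
{
	int err;

	/* Permission check first, without the object lock. */
	q->perm_checked_locked = q->locked;
	if (!q->writable)
		return -EACCES;

	q->locked = true;		/* models ipc_lock_object() */

	/* The queue may have been removed while we were unlocked. */
	err = q->valid ? 0 : -EIDRM;

	q->locked = false;		/* models ipc_unlock_object() */
	return err;
}
```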

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/msg.c | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 5181259e2ff0..fe793304dddb 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -623,14 +623,14 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		goto out_unlock1;
 	}
 
-	ipc_lock_object(&msq->q_perm);
-
 	for (;;) {
 		struct msg_sender s;
 
 		err = -EACCES;
 		if (ipcperms(ns, &msq->q_perm, S_IWUGO))
-			goto out_unlock0;
+			goto out_unlock1;
+
+		ipc_lock_object(&msq->q_perm);
 
 		/* raced with RMID? */
 		if (!ipc_valid_object(&msq->q_perm)) {
@@ -681,6 +681,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 			goto out_unlock0;
 		}
 
+		ipc_unlock_object(&msq->q_perm);
 	}
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
-- 
2.6.6

* [PATCH 5/5] ipc/msg: Avoid waking sender upon full queue
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
                   ` (3 preceding siblings ...)
  2016-07-28 23:33 ` [PATCH 4/5] ipc/msg: Lockless security checks for msgsnd Davidlohr Bueso
@ 2016-07-28 23:33 ` Davidlohr Bueso
  2016-08-04 16:44 ` [PATCH 0/5] ipc/msg: Sender/receiver optimizations Peter Zijlstra
  5 siblings, 0 replies; 9+ messages in thread
From: Davidlohr Bueso @ 2016-07-28 23:33 UTC (permalink / raw)
  To: akpm; +Cc: manfred, bigeasy, peterz, tglx, dave, linux-kernel, Davidlohr Bueso

Blocked tasks queued in q_senders waiting for their message to
fit in the queue are blindly awoken every time we think there's
a remote chance this might happen. This could cause numerous
(and expensive -- thundering herd-ish) bogus wakeups if the queue
is still really full. Adding to the scheduling cost/overhead,
there's also the fact that we need to take the ipc object lock
and requeue ourselves in the q_senders list.

By keeping track of the blocked sender's message size, we can
know beforehand whether the wakeup ought to occur. If not, to
maintain the current wakeup order we just move the sender to the
tail. This is exactly what occurs right now if the sender needs
to go back to sleep.
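
For illustration only (not part of the patch): an array-based userspace
model of the non-EIDRM scan. Senders whose message fits are marked for
wakeup. The others are moved to the tail, and the scan stops when it
meets the first moved sender again. The toy_* names are hypothetical,
and the array stands in for the q_senders list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

struct toy_msgq {
	size_t q_cbytes;	/* bytes currently queued */
	size_t q_qnum;		/* messages currently queued */
	size_t q_qbytes;	/* queue capacity in bytes */
};

struct toy_sender {
	int id;			/* unique, stands in for the task pointer */
	size_t msgsz;
};

/* Same test as the msg_fits_inqueue() helper in the diff below. */
static bool toy_fits(const struct toy_msgq *q, size_t msgsz)
{
	return msgsz + q->q_cbytes <= q->q_qbytes &&
		1 + q->q_qnum <= q->q_qbytes;
}

/* Reorders s[] in place, stores woken ids in woken[], returns their count. */
static size_t toy_ss_wakeup(const struct toy_msgq *q, struct toy_sender *s,
			    size_t n, int *woken)
{
	size_t i = 0, nwoken = 0;
	bool have_stop = false;
	int stop_id = 0;

	while (i < n) {
		/* Back at the first sender we skipped: scan is complete. */
		if (have_stop && s[i].id == stop_id)
			break;

		if (!toy_fits(q, s[i].msgsz)) {
			struct toy_sender tmp = s[i];

			if (!have_stop) {
				have_stop = true;
				stop_id = tmp.id;
			}
			/* Move to the tail, keeping the others in order. */
			memmove(&s[i], &s[i + 1], (n - 1 - i) * sizeof(*s));
			s[n - 1] = tmp;
			continue;
		}

		woken[nwoken++] = s[i].id;
		i++;
	}
	return nwoken;
}
```

As in the patch, every sender is tested against the same queue state, so
more than one sender can be selected even if only one of them will
eventually fit.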

The case of EIDRM is left completely untouched, as we need to
wake up all the tasks, and shouldn't be playing games in the
first place.

This patch was seen to save on the 'msgctl10' ltp testcase
~15% in context switches (avg out of ten runs). Although these
tests are really about functionality (as opposed to performance),
it does show the direct benefits of the optimization.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/msg.c | 52 +++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 43 insertions(+), 9 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index fe793304dddb..1a77dfacc0d6 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -58,6 +58,7 @@ struct msg_receiver {
 struct msg_sender {
 	struct list_head	list;
 	struct task_struct	*tsk;
+	size_t                  msgsz;
 };
 
 #define SEARCH_ANY		1
@@ -153,27 +154,60 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	return msq->q_perm.id;
 }
 
-static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
+static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz)
+{
+	return msgsz + msq->q_cbytes <= msq->q_qbytes &&
+		1 + msq->q_qnum <= msq->q_qbytes;
+}
+
+static inline void ss_add(struct msg_queue *msq,
+			  struct msg_sender *mss, size_t msgsz)
 {
 	mss->tsk = current;
+	mss->msgsz = msgsz;
 	__set_current_state(TASK_INTERRUPTIBLE);
 	list_add_tail(&mss->list, &msq->q_senders);
 }
 
 static inline void ss_del(struct msg_sender *mss)
 {
-	if (mss->list.next != NULL)
+	if (mss->list.next)
 		list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h,
+static void ss_wakeup(struct msg_queue *msq,
 		      struct wake_q_head *wake_q, bool kill)
 {
 	struct msg_sender *mss, *t;
+	struct task_struct *stop_tsk = NULL;
+	struct list_head *h = &msq->q_senders;
 
 	list_for_each_entry_safe(mss, t, h, list) {
 		if (kill)
 			mss->list.next = NULL;
+
+		/*
+		 * Stop at the first task we don't wakeup,
+		 * we've already iterated the original
+		 * sender queue.
+		 */
+		else if (stop_tsk == mss->tsk)
+			break;
+		/*
+		 * We are not in an EIDRM scenario here, therefore
+		 * verify that we really need to wakeup the task.
+		 * To maintain current semantics and wakeup order,
+		 * move the sender to the tail on behalf of the
+		 * blocked task.
+		 */
+		else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+			if (!stop_tsk)
+				stop_tsk = mss->tsk;
+
+			list_move_tail(&mss->list, &msq->q_senders);
+			continue;
+		}
+
 		wake_q_add(wake_q, mss->tsk);
 	}
 }
@@ -204,7 +238,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, &wake_q, true);
+	ss_wakeup(msq, &wake_q, true);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -388,7 +422,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, &wake_q, false);
+		ss_wakeup(msq, &wake_q, false);
 		ipc_unlock_object(&msq->q_perm);
 		wake_up_q(&wake_q);
 
@@ -642,8 +676,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		if (err)
 			goto out_unlock0;
 
-		if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
-				1 + msq->q_qnum <= msq->q_qbytes) {
+		if (msg_fits_inqueue(msq, msgsz)) {
 			break;
 		}
 
@@ -654,7 +687,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		}
 
 		/* enqueue the sender and prepare to block */
-		ss_add(msq, &s);
+		ss_add(msq, &s, msgsz);
 
 		if (!ipc_rcu_getref(msq)) {
 			err = -EIDRM;
@@ -683,6 +716,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 		ipc_unlock_object(&msq->q_perm);
 	}
+
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
@@ -883,7 +917,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 			msq->q_cbytes -= msg->m_ts;
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
-			ss_wakeup(&msq->q_senders, &wake_q, false);
+			ss_wakeup(msq, &wake_q, false);
 
 			goto out_unlock0;
 		}
-- 
2.6.6

* Re: [PATCH 0/5] ipc/msg: Sender/receiver optimizations
  2016-07-28 23:33 [PATCH 0/5] ipc/msg: Sender/receiver optimizations Davidlohr Bueso
                   ` (4 preceding siblings ...)
  2016-07-28 23:33 ` [PATCH 5/5] ipc/msg: Avoid waking sender upon full queue Davidlohr Bueso
@ 2016-08-04 16:44 ` Peter Zijlstra
  2016-08-09 12:44   ` Peter Zijlstra
  5 siblings, 1 reply; 9+ messages in thread
From: Peter Zijlstra @ 2016-08-04 16:44 UTC (permalink / raw)
  To: Davidlohr Bueso; +Cc: akpm, manfred, bigeasy, tglx, linux-kernel

On Thu, Jul 28, 2016 at 04:33:34PM -0700, Davidlohr Bueso wrote:
> Hi,
> 
> I'm resending Sebastian's sysv msg queue use of wake_qs but updated
> to the last observations I made wrt the need for explicit barriers
> after removing the whole receiver busy-looping. After some irc exchange
> it seems we're both on the same page, and things now look like he had
> them earlier, in v2. This is all patch 1.
> 
> The rest of the patches are changes I noticed while reviewing patch 1,
> which are mainly sender-side rework/optimizations. Details are in each
> changelog.
> 
> The changes have survived ltp (which has some nasty corner cases for msgsnd
> changes), as well as pmsg-shared benchmark.

Not really my area, but overall the patches look good.

Acked-by: Peter Zijlstra (Intel) <peterz@infradead.org>

* Re: [PATCH 0/5] ipc/msg: Sender/receiver optimizations
  2016-08-04 16:44 ` [PATCH 0/5] ipc/msg: Sender/receiver optimizations Peter Zijlstra
@ 2016-08-09 12:44   ` Peter Zijlstra
  2016-08-09 18:31     ` Andrew Morton
  0 siblings, 1 reply; 9+ messages in thread
From: Peter Zijlstra @ 2016-08-09 12:44 UTC (permalink / raw)
  To: Davidlohr Bueso; +Cc: akpm, manfred, bigeasy, tglx, linux-kernel

On Thu, Aug 04, 2016 at 06:44:09PM +0200, Peter Zijlstra wrote:
> On Thu, Jul 28, 2016 at 04:33:34PM -0700, Davidlohr Bueso wrote:
> > Hi,
> > 
> > I'm resending Sebastian's sysv msg queue use of wake_qs but updated
> > to the last observations I made wrt the need for explicit barriers
> > after removing the whole receiver busy-looping. After some irc exchange
> > it seems we're both on the same page, and things now look like he had
> > them earlier, in v2. This is all patch 1.
> > 
> > The rest of the patches are changes I noticed while reviewing patch 1,
> > which are mainly sender-side rework/optimizations. Details are in each
> > changelog.
> > 
> > The changes have survived ltp (which has some nasty corner cases for msgsnd
> > changes), as well as pmsg-shared benchmark.
> 
> Not really my area, but overall the patches look good.
> 
> Acked-by: Peter Zijlstra (Intel) <peterz@infradead.org>

Andrew, will you pick these up, or should I route them through
tip/locking or something?

* Re: [PATCH 0/5] ipc/msg: Sender/receiver optimizations
  2016-08-09 12:44   ` Peter Zijlstra
@ 2016-08-09 18:31     ` Andrew Morton
  0 siblings, 0 replies; 9+ messages in thread
From: Andrew Morton @ 2016-08-09 18:31 UTC (permalink / raw)
  To: Peter Zijlstra; +Cc: Davidlohr Bueso, manfred, bigeasy, tglx, linux-kernel

On Tue, 9 Aug 2016 14:44:26 +0200 Peter Zijlstra <peterz@infradead.org> wrote:

> On Thu, Aug 04, 2016 at 06:44:09PM +0200, Peter Zijlstra wrote:
> > On Thu, Jul 28, 2016 at 04:33:34PM -0700, Davidlohr Bueso wrote:
> > > Hi,
> > > 
> > > I'm resending Sebastian's sysv msg queue use of wake_qs but updated
> > > to the last observations I made wrt the need for explicit barriers
> > > after removing the whole receiver busy-looping. After some irc exchange
> > > it seems we're both on the same page, and things now look like he had
> > > them earlier, in v2. This is all patch 1.
> > > 
> > > The rest of the patches are changes I noticed while reviewing patch 1,
> > > which are mainly sender-side rework/optimizations. Details are in each
> > > changelog.
> > > 
> > > The changes have survived ltp (which has some nasty corner cases for msgsnd
> > > changes), as well as pmsg-shared benchmark.
> > 
> > Not really my area, but overall the patches look good.
> > 
> > Acked-by: Peter Zijlstra (Intel) <peterz@infradead.org>

Thanks.

> Andrew, will you pick these up, or should I route them through
> tip/locking or something?

I'll be processing these later today/tomorrow. 
