linux-kernel.vger.kernel.org archive mirror
From: Davidlohr Bueso <dave@stgolabs.net>
To: akpm@linux-foundation.org
Cc: manfred@colorfullife.com, bigeasy@linutronix.de,
	peterz@infradead.org, tglx@linutronix.de, dave@stgolabs.net,
	linux-kernel@vger.kernel.org, Davidlohr Bueso <dbueso@suse.de>
Subject: [PATCH 5/5] ipc/msg: Avoid waking sender upon full queue
Date: Thu, 28 Jul 2016 16:33:39 -0700
Message-ID: <1469748819-19484-6-git-send-email-dave@stgolabs.net>
In-Reply-To: <1469748819-19484-1-git-send-email-dave@stgolabs.net>

Blocked tasks queued in q_senders waiting for their message to
fit in the queue are blindly awoken every time we think there's
a remote chance this might happen. This can cause numerous
(and expensive -- thundering herd-ish) bogus wakeups if the queue
is still really full. On top of the scheduling overhead, each
woken sender also needs to take the ipc object lock and requeue
itself in the q_senders list.

By keeping track of the blocked sender's message size, we can
know beforehand whether the wakeup ought to occur or not. If the
message still does not fit, we skip the wakeup and, to maintain
the current wakeup order, just move the sender to the tail. This
is exactly what occurs right now if the sender needs to go back
to sleep.

The case of EIDRM is left completely untouched, as we need to
wake up all the tasks, and shouldn't be playing games in the
first place.

On the 'msgctl10' LTP testcase, this patch was seen to save
~15% in context switches (average over ten runs). Although these
tests are really about functionality (as opposed to performance),
it does show the direct benefits of the optimization.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 ipc/msg.c | 52 +++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 43 insertions(+), 9 deletions(-)
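
For illustration only, the wakeup policy described in the changelog
can be sketched in userspace. This is not kernel code: struct
sim_queue, sim_msg_fits() and sim_ss_wakeup() are hypothetical
stand-ins, and the sender list is modelled as a plain array of
message sizes. As in the patch, each sender is checked on its own
against the current queue state (the check is not cumulative);
senders that fit are woken and stay at the head, the rest are moved
to the tail in their original relative order.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SIM_MAX_SENDERS 64	/* arbitrary bound for this sketch */

/* Hypothetical userspace stand-in for the relevant msg_queue fields. */
struct sim_queue {
	size_t cbytes;	/* bytes currently queued (cf. q_cbytes) */
	size_t qbytes;	/* queue capacity in bytes (cf. q_qbytes) */
	size_t qnum;	/* messages currently queued (cf. q_qnum) */
};

/* Same test the patch factors out into msg_fits_inqueue(). */
static bool sim_msg_fits(const struct sim_queue *q, size_t msgsz)
{
	return msgsz + q->cbytes <= q->qbytes &&
		1 + q->qnum <= q->qbytes;
}

/*
 * senders[] holds the message sizes of the blocked senders in
 * FIFO order. On return, the senders that would be woken come
 * first, followed by the ones left asleep, both in their
 * original relative order. Returns the number of wakeups.
 */
static size_t sim_ss_wakeup(const struct sim_queue *q,
			    size_t *senders, size_t n)
{
	size_t skipped[SIM_MAX_SENDERS];
	size_t woken = 0, kept = 0, i;

	assert(n <= SIM_MAX_SENDERS);

	for (i = 0; i < n; i++) {
		if (sim_msg_fits(q, senders[i]))
			senders[woken++] = senders[i];	/* would be woken */
		else
			skipped[kept++] = senders[i];	/* moved to the tail */
	}

	for (i = 0; i < kept; i++)
		senders[woken + i] = skipped[i];

	return woken;
}
```

With 90 of 100 bytes in use and blocked senders of sizes 50, 8, 20
and 10, only the 8 and 10 byte senders would be woken in this model;
the 50 and 20 byte senders end up behind them, still in order.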

diff --git a/ipc/msg.c b/ipc/msg.c
index fe793304dddb..1a77dfacc0d6 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -58,6 +58,7 @@ struct msg_receiver {
 struct msg_sender {
 	struct list_head	list;
 	struct task_struct	*tsk;
+	size_t			msgsz;
 };
 
 #define SEARCH_ANY		1
@@ -153,27 +154,60 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	return msq->q_perm.id;
 }
 
-static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
+static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz)
+{
+	return msgsz + msq->q_cbytes <= msq->q_qbytes &&
+		1 + msq->q_qnum <= msq->q_qbytes;
+}
+
+static inline void ss_add(struct msg_queue *msq,
+			  struct msg_sender *mss, size_t msgsz)
 {
 	mss->tsk = current;
+	mss->msgsz = msgsz;
 	__set_current_state(TASK_INTERRUPTIBLE);
 	list_add_tail(&mss->list, &msq->q_senders);
 }
 
 static inline void ss_del(struct msg_sender *mss)
 {
-	if (mss->list.next != NULL)
+	if (mss->list.next)
 		list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h,
+static void ss_wakeup(struct msg_queue *msq,
 		      struct wake_q_head *wake_q, bool kill)
 {
 	struct msg_sender *mss, *t;
+	struct task_struct *stop_tsk = NULL;
+	struct list_head *h = &msq->q_senders;
 
 	list_for_each_entry_safe(mss, t, h, list) {
 		if (kill)
 			mss->list.next = NULL;
+
+		/*
+		 * Stop at the first task we already skipped and
+		 * moved to the tail: by then we have iterated
+		 * over the whole original sender queue.
+		 */
+		else if (stop_tsk == mss->tsk)
+			break;
+		/*
+		 * We are not in an EIDRM scenario here, therefore
+		 * verify that we really need to wakeup the task.
+		 * To maintain current semantics and wakeup order,
+		 * move the sender to the tail on behalf of the
+		 * blocked task.
+		 */
+		else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+			if (!stop_tsk)
+				stop_tsk = mss->tsk;
+
+			list_move_tail(&mss->list, &msq->q_senders);
+			continue;
+		}
+
 		wake_q_add(wake_q, mss->tsk);
 	}
 }
@@ -204,7 +238,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	WAKE_Q(wake_q);
 
 	expunge_all(msq, -EIDRM, &wake_q);
-	ss_wakeup(&msq->q_senders, &wake_q, true);
+	ss_wakeup(msq, &wake_q, true);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
 	wake_up_q(&wake_q);
@@ -388,7 +422,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		 * Sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
-		ss_wakeup(&msq->q_senders, &wake_q, false);
+		ss_wakeup(msq, &wake_q, false);
 		ipc_unlock_object(&msq->q_perm);
 		wake_up_q(&wake_q);
 
@@ -642,8 +676,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		if (err)
 			goto out_unlock0;
 
-		if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
-				1 + msq->q_qnum <= msq->q_qbytes) {
+		if (msg_fits_inqueue(msq, msgsz)) {
 			break;
 		}
 
@@ -654,7 +687,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		}
 
 		/* enqueue the sender and prepare to block */
-		ss_add(msq, &s);
+		ss_add(msq, &s, msgsz);
 
 		if (!ipc_rcu_getref(msq)) {
 			err = -EIDRM;
@@ -683,6 +716,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 		ipc_unlock_object(&msq->q_perm);
 	}
+
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
@@ -883,7 +917,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 			msq->q_cbytes -= msg->m_ts;
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
-			ss_wakeup(&msq->q_senders, &wake_q, false);
+			ss_wakeup(msq, &wake_q, false);
 
 			goto out_unlock0;
 		}
-- 
2.6.6
