* [Patch] ipc/mqueue: add rbtree node caching support
From: Doug Ledford @ 2012-05-16 19:28 UTC (permalink / raw)
  To: akpm, manfred, levinsasha928, npiggin, linux-kernel; +Cc: Doug Ledford

When I wrote the first patch that added the rbtree support for message
queue insertion, it sped up the case where the queue was very full
drastically from the original code.  It, however, slowed down the
case where the queue was empty (not drastically though).

This patch caches the last freed rbtree node struct so we can quickly
reuse it when we get a new message.  This is the common path for any
queue that very frequently goes from 0 to 1 then back to 0 messages
in queue.

Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation
in msg_insert, so this patch attempts to speculatively allocate a
new node struct outside of the spin lock when we know we need it, but
will still fall back to a GFP_ATOMIC allocation if it has to.
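
In short, the flow looks roughly like this (condensed from the diff below;
new_leaf is the speculative allocation, node_cache hangs off the
mqueue_inode_info):

	if (!info->node_cache)
		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);

	spin_lock(&info->lock);

	if (!info->node_cache && new_leaf) {
		/* nobody refilled the cache while we were allocating */
		rb_init_node(&new_leaf->rb_node);
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
		new_leaf = NULL;
	}

	/* msg_insert() then prefers info->node_cache, falls back to
	 * *new_leaf, and only does a GFP_ATOMIC kmalloc if both are empty */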

Once I added the caching, the various necessary 'ret = ...; spin_unlock()'
gyrations in mq_timedsend were getting pretty ugly, so this also slightly
refactors that function to streamline the flow of the code and the
function exit.

Finally, while working on getting performance back I made sure that all
of the node structs were always fully initialized when they were first
used, rendering the use of kzalloc unnecessary and a waste of CPU cycles.

The net result of all of this is:

1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on
it when necessary.
2) We will speculatively allocate a node struct using GFP_KERNEL if our
cache is empty (and save the struct to our cache if it's still empty after
we have obtained the spin lock).
3) The performance of the common queue empty case has significantly
improved and is now much more in line with the older performance for
this case.

The performance changes are:

            Old mqueue      new mqueue      new mqueue + caching
queue empty
send/recv   305/288ns       349/318ns       310/322ns

I don't think we'll ever be able to get the recv performance back, but
that's because the old recv performance was a direct result and consequence
of the old method's abysmal send performance.  The recv path simply must
do more so that the send path does not incur such a penalty under higher
queue depths.

As it turns out, the new caching code also sped up the various queue full
cases relative to my last patch.  That could be because of the difference
between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the
change in code flow in the mq_timedsend routine.  Regardless, I'll take
it.  It wasn't huge, and I *would* say it was within the margin of error,
but after many repeated runs what I'm seeing is that the old numbers
trend slightly higher (about 10 to 20ns depending on which test is the
one running).

Signed-off-by: Doug Ledford <dledford@redhat.com>
---
 ipc/mqueue.c |  107 ++++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 81 insertions(+), 26 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 994c5e1..76d43fb 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -68,6 +68,7 @@ struct mqueue_inode_info {
 	wait_queue_head_t wait_q;
 
 	struct rb_root msg_tree;
+	struct posix_msg_tree_node *node_cache;
 	struct mq_attr attr;
 
 	struct sigevent notify;
@@ -115,7 +116,8 @@ static struct ipc_namespace *get_ns_from_inode(struct inode *inode)
 }
 
 /* Auxiliary functions to manipulate messages' list */
-static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
+static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info,
+		      struct posix_msg_tree_node **new_leaf)
 {
 	struct rb_node **p, *parent = NULL;
 	struct posix_msg_tree_node *leaf;
@@ -132,15 +134,24 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
 		else
 			p = &(*p)->rb_right;
 	}
-	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
+	if (info->node_cache) {
+		leaf = info->node_cache;
+	} else if (new_leaf) {
+		leaf = *new_leaf;
+		*new_leaf = NULL;
+	} else
+		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
 	if (!leaf)
 		return -ENOMEM;
-	rb_init_node(&leaf->rb_node);
-	INIT_LIST_HEAD(&leaf->msg_list);
+	if (!info->node_cache) {
+		rb_init_node(&leaf->rb_node);
+		INIT_LIST_HEAD(&leaf->msg_list);
+		info->qsize += sizeof(*leaf);
+	} else
+		info->node_cache = NULL;
 	leaf->priority = msg->m_type;
 	rb_link_node(&leaf->rb_node, parent, p);
 	rb_insert_color(&leaf->rb_node, &info->msg_tree);
-	info->qsize += sizeof(struct posix_msg_tree_node);
 insert_msg:
 	info->attr.mq_curmsgs++;
 	info->qsize += msg->m_ts;
@@ -180,8 +191,11 @@ try_again:
 			     "empty leaf node but we haven't implemented "
 			     "lazy leaf delete!\n");
 		rb_erase(&leaf->rb_node, &info->msg_tree);
-		info->qsize -= sizeof(struct posix_msg_tree_node);
-		kfree(leaf);
+		if (info->node_cache) {
+			info->qsize -= sizeof(*leaf);
+			kfree(leaf);
+		} else
+			info->node_cache = leaf;
 		goto try_again;
 	} else {
 		msg = list_first_entry(&leaf->msg_list,
@@ -189,8 +203,11 @@ try_again:
 		list_del(&msg->m_list);
 		if (list_empty(&leaf->msg_list)) {
 			rb_erase(&leaf->rb_node, &info->msg_tree);
-			info->qsize -= sizeof(struct posix_msg_tree_node);
-			kfree(leaf);
+			if (info->node_cache) {
+				info->qsize -= sizeof(*leaf);
+				kfree(leaf);
+			} else
+				info->node_cache = leaf;
 		}
 	}
 	info->attr.mq_curmsgs--;
@@ -232,6 +249,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
 		info->qsize = 0;
 		info->user = NULL;	/* set when all is ok */
 		info->msg_tree = RB_ROOT;
+		info->node_cache = NULL;
 		memset(&info->attr, 0, sizeof(info->attr));
 		info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
 					   ipc_ns->mq_msg_default);
@@ -364,6 +382,7 @@ static void mqueue_evict_inode(struct inode *inode)
 	spin_lock(&info->lock);
 	while ((msg = msg_get(info)) != NULL)
 		free_msg(msg);
+	kfree(info->node_cache);
 	spin_unlock(&info->lock);
 
 	/* Total amount of bytes accounted for the mqueue */
@@ -928,7 +947,8 @@ static inline void pipelined_send(struct mqueue_inode_info *info,
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct mqueue_inode_info *info,
+				     struct posix_msg_tree_node **new_leaf)
 {
 	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
@@ -937,7 +957,7 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
 		wake_up_interruptible(&info->wait_q);
 		return;
 	}
-	if (msg_insert(sender->msg, info))
+	if (msg_insert(sender->msg, info, new_leaf))
 		return;
 	list_del(&sender->list);
 	sender->state = STATE_PENDING;
@@ -958,7 +978,8 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	struct mqueue_inode_info *info;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
-	int ret;
+	struct posix_msg_tree_node *new_leaf = NULL;
+	int ret = 0;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1006,39 +1027,55 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	msg_ptr->m_ts = msg_len;
 	msg_ptr->m_type = msg_prio;
 
+	/* msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary. */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
 
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+		new_leaf = NULL;
+	}
+
 	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
-		if (filp->f_flags & O_NONBLOCK) {
-			spin_unlock(&info->lock);
+		if (filp->f_flags & O_NONBLOCK)
 			ret = -EAGAIN;
-		} else {
+		else {
 			wait.task = current;
 			wait.msg = (void *) msg_ptr;
 			wait.state = STATE_NONE;
 			ret = wq_sleep(info, SEND, timeout, &wait);
+			/* wq_sleep must be called with info->lock held, and
+			 * returns with the lock released */
+			goto out_free;
 		}
-		if (ret < 0)
-			free_msg(msg_ptr);
 	} else {
 		receiver = wq_get_first_waiter(info, RECV);
 		if (receiver) {
 			pipelined_send(info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
-			if (msg_insert(msg_ptr, info)) {
-				free_msg(msg_ptr);
-				ret = -ENOMEM;
-				spin_unlock(&info->lock);
-				goto out_fput;
-			}
+			ret = msg_insert(msg_ptr, info, &new_leaf);
+			if (ret)
+				goto out_unlock;
 			__do_notify(info);
 		}
 		inode->i_atime = inode->i_mtime = inode->i_ctime =
 				CURRENT_TIME;
-		spin_unlock(&info->lock);
-		ret = 0;
 	}
+out_unlock:
+	spin_unlock(&info->lock);
+out_free:
+	kfree(new_leaf);
+	if (ret)
+		free_msg(msg_ptr);
 out_fput:
 	fput(filp);
 out:
@@ -1057,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 	struct ext_wait_queue wait;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
+	struct posix_msg_tree_node *new_leaf = NULL;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1092,7 +1130,23 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 		goto out_fput;
 	}
 
+	/* msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary. */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
+
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+		new_leaf = NULL;
+	}
+
 	if (info->attr.mq_curmsgs == 0) {
 		if (filp->f_flags & O_NONBLOCK) {
 			spin_unlock(&info->lock);
@@ -1110,7 +1164,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 				CURRENT_TIME;
 
 		/* There is now free space in queue. */
-		pipelined_receive(info);
+		pipelined_receive(info, &new_leaf);
 		spin_unlock(&info->lock);
 		ret = 0;
 	}
@@ -1123,6 +1177,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 		}
 		free_msg(msg_ptr);
 	}
+	kfree(new_leaf);
 out_fput:
 	fput(filp);
 out:
-- 
1.7.7.6



* Re: [Patch] ipc/mqueue: add rbtree node caching support
From: Andrew Morton @ 2012-05-16 22:12 UTC (permalink / raw)
  To: Doug Ledford; +Cc: manfred, levinsasha928, npiggin, linux-kernel

On Wed, 16 May 2012 15:28:56 -0400
Doug Ledford <dledford@redhat.com> wrote:

> When I wrote the first patch that added the rbtree support for message
> queue insertion, it sped up the case where the queue was very full
> drastically from the original code.  It, however, slowed down the
> case where the queue was empty (not drastically though).
> 
> This patch caches the last freed rbtree node struct so we can quickly
> reuse it when we get a new message.  This is the common path for any
> queue that very frequently goes from 0 to 1 then back to 0 messages
> in queue.
> 
> Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation
> in msg_insert, so this patch attempts to speculatively allocate a
> new node struct outside of the spin lock when we know we need it, but
> will still fall back to a GFP_ATOMIC allocation if it has to.
> 
> Once I added the caching, the various necessary 'ret = ...; spin_unlock()'
> gyrations in mq_timedsend were getting pretty ugly, so this also slightly
> refactors that function to streamline the flow of the code and the
> function exit.
> 
> Finally, while working on getting performance back I made sure that all
> of the node structs were always fully initialized when they were first
> used, rendering the use of kzalloc unnecessary and a waste of CPU cycles.
> 
> The net result of all of this is:
> 
> 1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on
> it when necessary.
> 2) We will speculatively allocate a node struct using GFP_KERNEL if our
> cache is empty (and save the struct to our cache if it's still empty after
> we have obtained the spin lock).
> 3) The performance of the common queue empty case has significantly
> improved and is now much more in line with the older performance for
> this case.
> 
> The performance changes are:
> 
>             Old mqueue      new mqueue      new mqueue + caching
> queue empty
> send/recv   305/288ns       349/318ns       310/322ns
> 
> I don't think we'll ever be able to get the recv performance back, but
> that's because the old recv performance was a direct result and consequence
> of the old method's abysmal send performance.  The recv path simply must
> do more so that the send path does not incur such a penalty under higher
> queue depths.
> 
> As it turns out, the new caching code also sped up the various queue full
> cases relative to my last patch.  That could be because of the difference
> between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the
> change in code flow in the mq_timedsend routine.  Regardless, I'll take
> it.  It wasn't huge, and I *would* say it was within the margin of error,
> but after many repeated runs what I'm seeing is that the old numbers
> trend slightly higher (about 10 to 20ns depending on which test is the
> one running).
> 

LGTM.  Here be some coding-style fixes:

--- a/ipc/mqueue.c~ipc-mqueue-add-rbtree-node-caching-support-fix
+++ a/ipc/mqueue.c
@@ -141,16 +141,18 @@ static int msg_insert(struct msg_msg *ms
 	} else if (new_leaf) {
 		leaf = *new_leaf;
 		*new_leaf = NULL;
-	} else
+	} else {
 		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
+	}
 	if (!leaf)
 		return -ENOMEM;
 	if (!info->node_cache) {
 		rb_init_node(&leaf->rb_node);
 		INIT_LIST_HEAD(&leaf->msg_list);
 		info->qsize += sizeof(*leaf);
-	} else
+	} else {
 		info->node_cache = NULL;
+	}
 	leaf->priority = msg->m_type;
 	rb_link_node(&leaf->rb_node, parent, p);
 	rb_insert_color(&leaf->rb_node, &info->msg_tree);
@@ -196,8 +198,9 @@ try_again:
 		if (info->node_cache) {
 			info->qsize -= sizeof(*leaf);
 			kfree(leaf);
-		} else
+		} else {
 			info->node_cache = leaf;
+		}
 		goto try_again;
 	} else {
 		msg = list_first_entry(&leaf->msg_list,
@@ -208,8 +211,9 @@ try_again:
 			if (info->node_cache) {
 				info->qsize -= sizeof(*leaf);
 				kfree(leaf);
-			} else
+			} else {
 				info->node_cache = leaf;
+			}
 		}
 	}
 	info->attr.mq_curmsgs--;
@@ -1033,9 +1037,11 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqd
 	msg_ptr->m_ts = msg_len;
 	msg_ptr->m_type = msg_prio;
 
-	/* msg_insert really wants us to have a valid, spare node struct so
-	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
-	 * fall back to that if necessary. */
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so it
+	 * doesn't have to kmalloc a GFP_ATOMIC allocation, but it will fall
+	 * back to that if necessary.
+	 */
 	if (!info->node_cache)
 		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
 
@@ -1058,8 +1064,10 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqd
 			wait.msg = (void *) msg_ptr;
 			wait.state = STATE_NONE;
 			ret = wq_sleep(info, SEND, timeout, &wait);
-			/* wq_sleep must be called with info->lock held, and
-			 * returns with the lock released */
+			/*
+			 * wq_sleep must be called with info->lock held, and
+			 * returns with the lock released
+			 */
 			goto out_free;
 		}
 	} else {
@@ -1136,9 +1144,11 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, 
 		goto out_fput;
 	}
 
-	/* msg_insert really wants us to have a valid, spare node struct so
-	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
-	 * fall back to that if necessary. */
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so it
+	 * doesn't have to kmalloc a GFP_ATOMIC allocation, but it will fall
+	 * back to that if necessary.
+	 */
 	if (!info->node_cache)
 		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
 


> @@ -132,15 +134,24 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
>  		else
>  			p = &(*p)->rb_right;
>  	}
> -	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
> +	if (info->node_cache) {
> +		leaf = info->node_cache;
> +	} else if (new_leaf) {
> +		leaf = *new_leaf;
> +		*new_leaf = NULL;
> +	} else
> +		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
>  	if (!leaf)
>  		return -ENOMEM;
> -	rb_init_node(&leaf->rb_node);
> -	INIT_LIST_HEAD(&leaf->msg_list);
> +	if (!info->node_cache) {
> +		rb_init_node(&leaf->rb_node);
> +		INIT_LIST_HEAD(&leaf->msg_list);
> +		info->qsize += sizeof(*leaf);
> +	} else
> +		info->node_cache = NULL;

So you have

: 	if (info->node_cache) {
: 		leaf = info->node_cache;
: 	} else if (new_leaf) {
: 		leaf = *new_leaf;
: 		*new_leaf = NULL;
: 	} else {
: 		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
: 	}
: 	if (!leaf)
: 		return -ENOMEM;
: 	if (!info->node_cache) {
: 		rb_init_node(&leaf->rb_node);
: 		INIT_LIST_HEAD(&leaf->msg_list);
: 		info->qsize += sizeof(*leaf);
: 	} else {
: 		info->node_cache = NULL;
:	}
 
But I think it's a bit simpler to do

: 	if (info->node_cache) {
: 		leaf = info->node_cache;
: 		info->node_cache = NULL;
: 	} else {
: 		if (new_leaf) {
: 			leaf = *new_leaf;
: 			*new_leaf = NULL;
: 		} else {
: 			leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
: 			if (!leaf)
: 				return -ENOMEM;
: 		}
: 		rb_init_node(&leaf->rb_node);
: 		INIT_LIST_HEAD(&leaf->msg_list);
: 		info->qsize += sizeof(*leaf);
: 	}

and my version generates 25 bytes less text.

But why did we need to initialise *leaf's fields if the caller passed
us something in *new_leaf?  sys_mq_timedsend() already initialised the
fields for us?




* Re: [Patch] ipc/mqueue: add rbtree node caching support
From: Doug Ledford @ 2012-05-17  2:07 UTC (permalink / raw)
  To: Andrew Morton; +Cc: manfred, levinsasha928, npiggin, linux-kernel


On 5/16/2012 6:12 PM, Andrew Morton wrote:
> On Wed, 16 May 2012 15:28:56 -0400
> Doug Ledford <dledford@redhat.com> wrote:
> 
>> When I wrote the first patch that added the rbtree support for message
>> queue insertion, it sped up the case where the queue was very full
>> drastically from the original code.  It, however, slowed down the
>> case where the queue was empty (not drastically though).
>>
>> This patch caches the last freed rbtree node struct so we can quickly
>> reuse it when we get a new message.  This is the common path for any
>> queue that very frequently goes from 0 to 1 then back to 0 messages
>> in queue.
>>
>> Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation
>> in msg_insert, so this patch attempts to speculatively allocate a
>> new node struct outside of the spin lock when we know we need it, but
>> will still fall back to a GFP_ATOMIC allocation if it has to.
>>
>> Once I added the caching, the various necessary 'ret = ...; spin_unlock()'
>> gyrations in mq_timedsend were getting pretty ugly, so this also slightly
>> refactors that function to streamline the flow of the code and the
>> function exit.
>>
>> Finally, while working on getting performance back I made sure that all
>> of the node structs were always fully initialized when they were first
>> used, rendering the use of kzalloc unnecessary and a waste of CPU cycles.
>>
>> The net result of all of this is:
>>
>> 1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on
>> it when necessary.
>> 2) We will speculatively allocate a node struct using GFP_KERNEL if our
>> cache is empty (and save the struct to our cache if it's still empty after
>> we have obtained the spin lock).
>> 3) The performance of the common queue empty case has significantly
>> improved and is now much more in line with the older performance for
>> this case.
>>
>> The performance changes are:
>>
>>             Old mqueue      new mqueue      new mqueue + caching
>> queue empty
>> send/recv   305/288ns       349/318ns       310/322ns
>>
>> I don't think we'll ever be able to get the recv performance back, but
>> that's because the old recv performance was a direct result and consequence
>> of the old method's abysmal send performance.  The recv path simply must
>> do more so that the send path does not incur such a penalty under higher
>> queue depths.
>>
>> As it turns out, the new caching code also sped up the various queue full
>> cases relative to my last patch.  That could be because of the difference
>> between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the
>> change in code flow in the mq_timedsend routine.  Regardless, I'll take
>> it.  It wasn't huge, and I *would* say it was within the margin of error,
>> but after many repeated runs what I'm seeing is that the old numbers
>> trend slightly higher (about 10 to 20ns depending on which test is the
>> one running).
>>
> 
> LGTM.  Here be some coding-style fixes:
> 
> --- a/ipc/mqueue.c~ipc-mqueue-add-rbtree-node-caching-support-fix
> +++ a/ipc/mqueue.c
> @@ -141,16 +141,18 @@ static int msg_insert(struct msg_msg *ms
>  	} else if (new_leaf) {
>  		leaf = *new_leaf;
>  		*new_leaf = NULL;
> -	} else
> +	} else {
>  		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
> +	}
>  	if (!leaf)
>  		return -ENOMEM;
>  	if (!info->node_cache) {
>  		rb_init_node(&leaf->rb_node);
>  		INIT_LIST_HEAD(&leaf->msg_list);
>  		info->qsize += sizeof(*leaf);
> -	} else
> +	} else {
>  		info->node_cache = NULL;
> +	}
>  	leaf->priority = msg->m_type;
>  	rb_link_node(&leaf->rb_node, parent, p);
>  	rb_insert_color(&leaf->rb_node, &info->msg_tree);
> @@ -196,8 +198,9 @@ try_again:
>  		if (info->node_cache) {
>  			info->qsize -= sizeof(*leaf);
>  			kfree(leaf);
> -		} else
> +		} else {
>  			info->node_cache = leaf;
> +		}
>  		goto try_again;
>  	} else {
>  		msg = list_first_entry(&leaf->msg_list,
> @@ -208,8 +211,9 @@ try_again:
>  			if (info->node_cache) {
>  				info->qsize -= sizeof(*leaf);
>  				kfree(leaf);
> -			} else
> +			} else {
>  				info->node_cache = leaf;
> +			}
>  		}
>  	}
>  	info->attr.mq_curmsgs--;
> @@ -1033,9 +1037,11 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqd
>  	msg_ptr->m_ts = msg_len;
>  	msg_ptr->m_type = msg_prio;
>  
> -	/* msg_insert really wants us to have a valid, spare node struct so
> -	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
> -	 * fall back to that if necessary. */
> +	/*
> +	 * msg_insert really wants us to have a valid, spare node struct so it
> +	 * doesn't have to kmalloc a GFP_ATOMIC allocation, but it will fall
> +	 * back to that if necessary.
> +	 */
>  	if (!info->node_cache)
>  		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
>  
> @@ -1058,8 +1064,10 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqd
>  			wait.msg = (void *) msg_ptr;
>  			wait.state = STATE_NONE;
>  			ret = wq_sleep(info, SEND, timeout, &wait);
> -			/* wq_sleep must be called with info->lock held, and
> -			 * returns with the lock released */
> +			/*
> +			 * wq_sleep must be called with info->lock held, and
> +			 * returns with the lock released
> +			 */
>  			goto out_free;
>  		}
>  	} else {
> @@ -1136,9 +1144,11 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, 
>  		goto out_fput;
>  	}
>  
> -	/* msg_insert really wants us to have a valid, spare node struct so
> -	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
> -	 * fall back to that if necessary. */
> +	/*
> +	 * msg_insert really wants us to have a valid, spare node struct so it
> +	 * doesn't have to kmalloc a GFP_ATOMIC allocation, but it will fall
> +	 * back to that if necessary.
> +	 */
>  	if (!info->node_cache)
>  		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
>  
> 
> 
>> @@ -132,15 +134,24 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
>>  		else
>>  			p = &(*p)->rb_right;
>>  	}
>> -	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
>> +	if (info->node_cache) {
>> +		leaf = info->node_cache;
>> +	} else if (new_leaf) {
>> +		leaf = *new_leaf;
>> +		*new_leaf = NULL;
>> +	} else
>> +		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
>>  	if (!leaf)
>>  		return -ENOMEM;
>> -	rb_init_node(&leaf->rb_node);
>> -	INIT_LIST_HEAD(&leaf->msg_list);
>> +	if (!info->node_cache) {
>> +		rb_init_node(&leaf->rb_node);
>> +		INIT_LIST_HEAD(&leaf->msg_list);
>> +		info->qsize += sizeof(*leaf);
>> +	} else
>> +		info->node_cache = NULL;
> 
> So you have
> 
> : 	if (info->node_cache) {
> : 		leaf = info->node_cache;
> : 	} else if (new_leaf) {
> : 		leaf = *new_leaf;
> : 		*new_leaf = NULL;
> : 	} else {
> : 		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
> : 	}
> : 	if (!leaf)
> : 		return -ENOMEM;
> : 	if (!info->node_cache) {
> : 		rb_init_node(&leaf->rb_node);
> : 		INIT_LIST_HEAD(&leaf->msg_list);
> : 		info->qsize += sizeof(*leaf);
> : 	} else {
> : 		info->node_cache = NULL;
> :	}
>  
> But I think it's a bit simpler to do
> 
> : 	if (info->node_cache) {
> : 		leaf = info->node_cache;
> : 		info->node_cache = NULL;
> : 	} else {
> : 		if (new_leaf) {

Hmm, there be a bug there.  That should be if (*new_leaf) {

> : 			leaf = *new_leaf;
> : 			*new_leaf = NULL;
> : 		} else {
> : 			leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
> : 			if (!leaf)
> : 				return -ENOMEM;
> : 		}
> : 		rb_init_node(&leaf->rb_node);
> : 		INIT_LIST_HEAD(&leaf->msg_list);
> : 		info->qsize += sizeof(*leaf);
> : 	}
> 
> and my version generates 25 bytes less text.
> 
> But why did we need to initialise *leaf's fields if the caller passed
> us something in *new_leaf?  sys_mq_timedsend() already initialised the
> fields for us?

Because in mq_timedsend, if info->node_cache was NULL, then we would
speculatively allocate new_leaf.  Then we would call
spin_lock(&info->lock).  We could end up waiting on a different thread
that is currently getting a message, and even though info->node_cache
was NULL when we checked it before taking the spin lock, it might not be
afterwards.  In that case, though, we are guaranteed to have an entry in
node_cache, so I guess in mq_timedsend we could check whether node_cache
is still NULL once we hold the lock: if so, move new_leaf into node_cache
and initialize it there; if not, free new_leaf, since we are guaranteed
to keep node_cache through msg_insert because we don't release the spin
lock in between.
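
Roughly, a hypothetical sketch of what I mean (not the patch as posted;
same field names as in it):

	spin_lock(&info->lock);
	if (!info->node_cache && new_leaf) {
		/* cache is still empty: initialize our speculative node
		 * and park it in the cache */
		rb_init_node(&new_leaf->rb_node);
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
		info->qsize += sizeof(*new_leaf);
	} else {
		/* someone refilled the cache while we waited for the lock
		 * (or we never allocated), so drop the spare */
		kfree(new_leaf);
	}
	new_leaf = NULL;
	/* msg_insert() can then just consume node_cache in the common case
	 * and never has to initialize a node itself */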

Let me send a new version of the patch that I like more.

-- 
Doug Ledford <dledford@redhat.com>
              GPG KeyID: 0E572FDD
	      http://people.redhat.com/dledford

Infiniband specific RPMs available at
	      http://people.redhat.com/dledford/Infiniband


