All of lore.kernel.org
 help / color / mirror / Atom feed
* [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
@ 2014-04-09 19:19 Steven Rostedt
  2014-04-10 14:18 ` Mike Galbraith
                   ` (3 more replies)
  0 siblings, 4 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-09 19:19 UTC (permalink / raw)
  To: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney, Paul Gortmaker
  Cc: Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

A while back ago I wrote a patch that would allow for reader/writer
locks like rwlock and rwsems to have multiple readers in PREEMPT_RT. It
was slick and fast but unfortunately it was way too complex and ridden
with nasty little critters which earned me my large collection of
frozen sharks in the fridge (which are quite tasty).

The main problem with my previous solution was that I tried to be too
clever. I worked hard on making the rw mutex still have the "fast
path". That is, the cmpxchg that could allow a non contended grabbing
of the lock be one instruction and be off with it. But to get that
working required lots of tricks and black magic that was certainly
going to fail. Thus, with the raining of sharks on my parade, the
priority inheritance mutex with multiple owners died a slow painful
death.

So we thought.

But over the years, a new darkness was on the horizon. Complaints about
running highly threaded processes (did I hear Java?) were suffering
huge performance hits on the PREEMPT_RT kernel. Whether or not the
processes were real-time tasks, they still were horrible compared to
running the same tasks on the mainline kernel. Note, this was being
done on machines with many CPUs.

The culprit mostly was a single rwsem, the notorious mmap_sem that
can be taking several times for read, and as on RT, this is just a
single mutex, and it would serialize these accesses that would not
happen on mainline.

I looked back at my poor dead rw multi pi reader patch and thought to
myself. "How complex would this be if I removed the 'fast path' from
the code". I decided to build a new tower in Mordor.

I feel that I am correct. By removing the fast path and requiring all
accesses to the rwsem to go through the slow path (must take the
wait_lock to do anything). The code really wasn't that bad. I also only
focused on the rwsem and did not worry about the rwlocks as that hasn't
been pointed out as a bottle neck yet. If it does happen to be, this
code could easily work on rwlocks too.

I'm much more confident in this code than I was with my previous
version of the rwlock multi-reader patch. I added a bunch of comments
to this code to explain how things interact. The writer unlock was
still able to use the fast path as the writers are pretty much like a
normal mutex. Too bad that the writer unlock is not a high point of
contention.

This patch is built on top of the two other patches that I posted
earlier, which should not be as controversial.

If you have any benchmark on large machines I would be very happy if
you could test this patch against the unpatched version of -rt.

Cheers,

-- Steve

Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
---
Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,15 @@
 #include "rtmutex_common.h"
 
 /*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/*
  * lock->owner state tracking:
  *
  * lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +119,44 @@ static inline void init_lists(struct rt_
 		plist_head_init(&lock->wait_list);
 }
 
+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+	struct rt_mutex *lock = &rwlock->mutex;
+
+	/*
+	 * A rwsem priority is initialized to -1 and never will
+	 * be that again.
+	 */
+	if (unlikely(rwlock->prio < 0)) {
+		rwlock->prio = MAX_PRIO;
+		init_lists(lock);
+	}
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
 /*
  * Calculate task priority from the waiter list priority
  *
  * Return task->normal_prio when the waiter list is empty or when
  * the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
  */
 int rt_mutex_getprio(struct task_struct *task)
 {
-	if (likely(!task_has_pi_waiters(task)))
-		return task->normal_prio;
+	int prio = task->normal_prio;
+
+	if (likely(!task_has_pi_waiters(task) &&
+		   !task_has_reader_locks(task)))
+		return prio;
+
+	if (task_has_reader_locks(task))
+		prio = rt_mutex_get_readers_prio(task, prio);
 
-	return min(task_top_pi_waiter(task)->pi_list_entry.prio,
-		   task->normal_prio);
+	return min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
 }
 
 /*
@@ -181,6 +215,11 @@ static void rt_mutex_wake_waiter(struct
  */
 int max_lock_depth = 1024;
 
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth);
 /*
  * Adjust the priority chain. Also used for deadlock detection.
  * Decreases task's usage by one - may thus free the task.
@@ -203,7 +242,8 @@ static int rt_mutex_adjust_prio_chain(st
 				      int deadlock_detect,
 				      struct rt_mutex *orig_lock,
 				      struct rt_mutex_waiter *orig_waiter,
-				      struct task_struct *top_task)
+				      struct task_struct *top_task,
+				      int recursion_depth)
 {
 	struct rt_mutex *lock;
 	struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +356,18 @@ static int rt_mutex_adjust_prio_chain(st
 
 	/* Grab the next task */
 	task = rt_mutex_owner(lock);
+
+	/*
+	 * Readers are special. We may need to boost more than one owner.
+	 */
+	if (unlikely(task == RT_RW_READER)) {
+		ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+					      top_task, lock,
+					      recursion_depth);
+		raw_spin_unlock(&lock->wait_lock);
+		goto out;
+	}
+
 	get_task_struct(task);
 	raw_spin_lock_irqsave(&task->pi_lock, flags);
 
@@ -349,7 +401,7 @@ static int rt_mutex_adjust_prio_chain(st
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
  out_put_task:
 	put_task_struct(task);
-
+ out:
 	return ret;
 }
 
@@ -518,6 +570,13 @@ static int task_blocks_on_rt_mutex(struc
 		return 0;
 
 	if (waiter == rt_mutex_top_waiter(lock)) {
+		/* readers are handled differently */
+		if (unlikely(owner == RT_RW_READER)) {
+			res = rt_mutex_adjust_readers(lock, waiter,
+						      current, lock, 0);
+			return res;
+		}
+
 		raw_spin_lock_irqsave(&owner->pi_lock, flags);
 		plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
 		plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +586,8 @@ static int task_blocks_on_rt_mutex(struc
 			chain_walk = 1;
 		raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
 	}
-	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+		 owner != RT_RW_READER)
 		chain_walk = 1;
 
 	if (!chain_walk)
@@ -543,7 +603,7 @@ static int task_blocks_on_rt_mutex(struc
 	raw_spin_unlock(&lock->wait_lock);
 
 	res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
-					 task);
+					 task, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 
@@ -633,7 +693,7 @@ static void remove_waiter(struct rt_mute
 
 	raw_spin_unlock(&lock->wait_lock);
 
-	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 }
@@ -660,7 +720,7 @@ void rt_mutex_adjust_pi(struct task_stru
 	/* gets dropped in rt_mutex_adjust_prio_chain()! */
 	get_task_struct(task);
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
 }
 
 #ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +799,7 @@ static void  noinline __sched rt_spin_lo
 	struct rt_mutex_waiter waiter, *top_waiter;
 	int ret;
 
-	rt_mutex_init_waiter(&waiter, true);
+	rt_mutex_init_waiter(&waiter, true, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1001,15 +1061,564 @@ __mutex_lock_check_stamp(struct rt_mutex
 
 	return 0;
 }
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+	struct rt_mutex_waiter *waiter;
+
+	waiter = rt_mutex_top_waiter(lock);
+	rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+			   struct rt_rw_mutex *rwmutex,
+			   struct rt_mutex_waiter *waiter)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	unsigned long flags;
+	bool wakeup = false;
+
+	rwmutex->nr_owners++;
+	rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+	if (waiter || rt_mutex_has_waiters(mutex)) {
+		unsigned long flags;
+		struct rt_mutex_waiter *top;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+		/* remove the queued waiter. */
+		if (waiter) {
+			plist_del(&waiter->list_entry, &mutex->wait_list);
+			task->pi_blocked_on = NULL;
+		}
+
+		/*
+		 * Initialize the rwmutex prio to the priority of
+		 * the top waiter.
+		 */
+		if (rt_mutex_has_waiters(mutex)) {
+			top = rt_mutex_top_waiter(mutex);
+			top->pi_list_entry.prio = top->list_entry.prio;
+			/*
+			 * Readers set the lock priority for faster access
+			 * to read the priorities of the locks it owns
+			 * when boosting. This helps to not have to take
+			 * the pi_lock of the task. The rwmutex->prio
+			 * is protected by the rwmutex->mutex.wait_lock,
+			 * which is held during boosting.
+			 */
+			rwmutex->prio = top->list_entry.prio;
+
+			/*
+			 * If this waiter is a reader, and the reader limit
+			 * has not been hit, then we can wake this waiter
+			 * up too.
+			 */
+			if (!top->writer && rwmutex->nr_owners < rt_rw_limit)
+				wakeup = true;
+		} else
+			rwmutex->prio = MAX_PRIO;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	/*
+	 * It is possible to have holes in the owned_read_locks array.
+	 * If we take read lock A and then B, but then release A, we
+	 * can't move the pointer of B because if something blocks on
+	 * B, it can use the B pointer to boost this task. B can only
+	 * be moved, by owning the wait_list lock of B. Remember, the
+	 * B lock has its pointers to that index of our array.
+	 */
+	rls = &task->owned_read_locks[task->reader_lock_free];
+	BUG_ON(rls->lock);
+
+	/*
+	 * Grabing the pi_lock here as other tasks can boost this
+	 * lock via other held locks, and if free lock descriptor
+	 * was not at the end of the owned_read_locks array, then
+	 * it might get confused if it sees this descriptor with
+	 * a lock and no task.
+	 */
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	rls->lock = rwmutex;
+	rls->task = task;
+	list_add(&rls->list, &rwmutex->owners);
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	if (task->reader_lock_free == task->reader_lock_count) {
+		/*
+		 * If we nest too deep, then this task can never get the lock.
+		 * This task will then block for good. Warn about this and
+		 * hopefully, people will notice the warning. If not, they will notice
+		 * the dead task. Maybe this should be a BUG_ON()
+		 */
+		if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+			return;
+
+		/*
+		 * We don't need to take the pi_lock here as we have the
+		 * wait_lock of the lock at the end of the list. If this task
+		 * was being boosted by a task blocked on this lock, it would
+		 * need to grab the wait_lock before boosting this task.
+		 */
+		task->reader_lock_count++;
+		task->reader_lock_free++;
+	} else {
+		/*
+		 * Find the next free lock in array. Again, we do not need
+		 * to grab the pi_lock because the boosting doesn't use
+		 * the reader_lock_free variable, which is the only thing
+		 * we are updating here.
+		 */
+		do {
+			rls = &task->owned_read_locks[++task->reader_lock_free];
+		} while (rls->lock &&
+			 task->reader_lock_free < task->reader_lock_count);
+	}
+
+	if (wakeup)
+		wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+			       struct rt_mutex_waiter *waiter)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct task_struct *owner;
+
+	assert_raw_spin_locked(&mutex->wait_lock);
+
+	/* The writer unlock can use the fast path */
+	mark_rt_mutex_waiters(mutex);
+
+	/* If we are at the reader limit, we can't take the lock */
+	if (rt_rw_limit && unlikely(rwmutex->nr_owners >= rt_rw_limit))
+		return 0;
+
+	/*
+	 * If there's no waiters, and there is no owner or
+	 * the owner is a reader, we get the lock.
+	 * Note, if the waiter or pending bits are set in the owner
+	 * then we need to do more checks.
+	 */
+	if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+		goto taken;
+
+	owner = rt_mutex_owner(mutex);
+
+	/* If the lock is owned by a task, then it is a writer */
+	if (owner && owner != RT_RW_READER)
+		return 0;
+
+	/*
+	 * A writer or a reader may be waiting. In either case, we
+	 * may still be able to steal the lock. The RT rwsems are
+	 * not fair if the new reader comes in and is higher priority
+	 * than all the waiters.
+	 */
+	if (likely(rt_mutex_has_waiters(mutex))) {
+		struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+		if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+			return 0;
+	}
+
+ taken:
+	rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+	return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex_waiter waiter;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+		/* Got the lock */
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	rt_mutex_init_waiter(&waiter, false, false);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+	BUG_ON(ret);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_read(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_read(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there's any readers owning the lock */
+	if (rwmutex->nr_owners)
+		return 0;
+
+	/* No readers, then we can try to take the mutex normally. */
+	return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter waiter;
+	struct task_struct *task = current;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL)) {
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	/* Writers wake up differently than readers (flag it) */
+	rt_mutex_init_waiter(&waiter, false, true);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+	BUG_ON(ret);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_write(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+	WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+	struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+	while (task->reader_lock_count &&
+	       read_locks[task->reader_lock_count - 1].lock == NULL)
+		task->reader_lock_count--;
+
+	if (task->reader_lock_free > task->reader_lock_count)
+		task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter *waiter;
+	struct reader_lock_struct *rls;
+	struct task_struct *task = current;
+	unsigned long flags;
+	int readers;
+	int i;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	rt_mutex_deadlock_account_unlock(current);
+
+	WARN_ON_ONCE(!rwmutex->nr_owners);
+
+	rwmutex->nr_owners--;
+
+	if (rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If the top waiter is a reader and we are under
+		 * the limit, or the top waiter is a writer and
+		 * there's no more readers, then we can give the
+		 * top waiter pending ownership and wake it up.
+		 *
+		 * To simplify things, we only wake up one task, even
+		 * if its a reader. If the reader wakes up and gets the
+		 * lock, it will look to see if it can wake up the next
+		 * waiter, and so on. This way we only need to worry about
+		 * one task at a time.
+		 */
+		waiter = rt_mutex_top_waiter(mutex);
+		readers = rwmutex->nr_owners;
+		if ((waiter->writer && !readers) ||
+		    (!waiter->writer && readers < rt_rw_limit))
+			wakeup_next_rw_waiter(mutex);
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (!rwmutex->nr_owners)
+		rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this tasks list */
+	for (i = task->reader_lock_count - 1; i >= 0; i--) {
+		rls = &task->owned_read_locks[i];
+
+		if (rls->lock == rwmutex) {
+			rls->lock = NULL;
+			list_del_init(&rls->list);
+			/* Shrink the array if we can. */
+			if (i == task->reader_lock_count - 1)
+				shrink_reader_lock_array(task);
+			else if (i < task->reader_lock_free)
+				task->reader_lock_free = i;
+			break;
+		}
+	}
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	WARN_ON_ONCE(i < 0);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	/* Undo pi boosting if necessary */
+	rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	/*
+	 * Writers have normal pi with other tasks blocked
+	 * on the lock. That is, the top waiter will be in the
+	 * pi_list of this task. But for readers, waiters of
+	 * the lock are not included in the pi_list, only the
+	 * locks are. We must remove the top waiter of this
+	 * lock from this task.
+	 */
+	if (rt_mutex_has_waiters(mutex)) {
+		struct rt_mutex_waiter *waiter;
+		unsigned long flags;
+
+		waiter = rt_mutex_top_waiter(mutex);
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+		/*
+		 * The rt_rw_mutex_take_as_reader() will update
+		 * the rwmutex prio.
+		 */
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+	}
+
+	WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+	rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+	raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	struct reader_lock_struct *rls;
+	struct rt_rw_mutex *rwmutex;
+	int lock_prio;
+	int i;
+
+	for (i = 0; i < task->reader_lock_count; i++) {
+		rls = &task->owned_read_locks[i];
+		rwmutex = rls->lock;
+		if (!rwmutex)
+			continue;
+		lock_prio = rwmutex->prio;
+		if (prio > lock_prio)
+			prio = lock_prio;
+	}
+	WARN_ON_ONCE(prio < 0);
+
+	return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex_waiter *waiter;
+	struct task_struct *task;
+	struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+	unsigned long flags;
+
+	/* Update the rwmutex's prio */
+	if (rt_mutex_has_waiters(lock)) {
+		waiter = rt_mutex_top_waiter(lock);
+		/*
+		 * Do we need to grab the task->pi_lock?
+		 * Really, we are only reading it. If it
+		 * changes, then that should follow this chain
+		 * too.
+		 */
+		rwmutex->prio = waiter->task->prio;
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+		WARN_ON(1);
+		return 1;
+	}
+
+	list_for_each_entry(rls, &rwmutex->owners, list) {
+		bool skip = false;
+
+		task = rls->task;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		__rt_mutex_adjust_prio(task);
+		/*
+		 * We need to grab the pi_lock to adjust the task prio
+		 * might as well use this to check if the task is blocked
+		 * as well, and save on a call to the prio chain that will
+		 * just grab the lock again and do the test.
+		 */
+		if (!rt_mutex_real_waiter(task->pi_blocked_on))
+			skip = true;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+		if (skip)
+			continue;
+
+		get_task_struct(task);
+		/*
+		 * rt_mutex_adjust_prio_chain will do
+		 * the put_task_struct
+		 */
+		rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+					   orig_waiter, top_task,
+					   recursion_depth+1);
+	}
+
+	return 0;
+}
 #else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return 0;
+}
+
 static inline int __sched
 __mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
 {
 	BUG();
 	return 0;
 }
-
-#endif
+#endif /* CONFIG_PREEMPT_RT */
 
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1763,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
 	struct rt_mutex_waiter waiter;
 	int ret = 0;
 
-	rt_mutex_init_waiter(&waiter, false);
+	rt_mutex_init_waiter(&waiter, false, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1718,4 +2327,12 @@ void __sched ww_mutex_unlock(struct ww_m
 	rt_mutex_unlock(&lock->base.lock);
 }
 EXPORT_SYMBOL(ww_mutex_unlock);
+
+static int __init rt_rw_limit_init(void)
+{
+	rt_rw_limit = num_possible_cpus();
+	return 0;
+}
+core_initcall(rt_rw_limit_init);
+
 #endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
  * @list_entry:		pi node to enqueue into the mutex waiters list
  * @pi_list_entry:	pi node to enqueue into the mutex owner waiters list
  * @task:		task reference to the blocked task
+ * @writer:		true if its a rwsem writer that is blocked
  */
 struct rt_mutex_waiter {
 	struct plist_node	list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
 	struct task_struct	*task;
 	struct rt_mutex		*lock;
 	bool			savestate;
+	bool			writer;
 #ifdef CONFIG_DEBUG_RT_MUTEXES
 	unsigned long		ip;
 	struct pid		*deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
 		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
 }
 
+/* used as reader owner of the mutex */
+#define RT_RW_READER		(struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT */
+
 /*
  * PI-futex support (proxy locking functions, etc.):
  */
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
 #endif
 
 static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
 {
 	debug_rt_mutex_init_waiter(waiter);
 	waiter->task = NULL;
 	waiter->savestate = savestate;
+	waiter->writer = writer;
 }
 
 #endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
 void  rt_up_write(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	/*
+	 * Unlocking a write is the same as unlocking the mutex.
+	 * The woken reader will do all the work if it needs to
+	 * wake up more than one reader.
+	 */
+	__rt_spin_unlock(&rwsem->rwlock.mutex);
 }
 EXPORT_SYMBOL(rt_up_write);
 
 void  rt_up_read(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	rt_rw_mutex_read_unlock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_up_read);
 
@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
  */
 void  rt_downgrade_write(struct rw_semaphore *rwsem)
 {
-	BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+	BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+	rt_rw_mutex_downgrade_write(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_downgrade_write);
 
 int  rt_down_write_trylock(struct rw_semaphore *rwsem)
 {
-	int ret = rt_mutex_trylock(&rwsem->lock);
+	int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);
 
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
 void  rt_down_write(struct rw_semaphore *rwsem)
 {
 	rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write);
 
 void  rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write_nested);
 
@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
 		struct lockdep_map *nest)
 {
 	rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 
 int  rt_down_read_trylock(struct rw_semaphore *rwsem)
 {
 	int ret;
 
-	ret = rt_mutex_trylock(&rwsem->lock);
+	ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
 
@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
 static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_read_lock(&rwsem->rwlock);
 }
 
 void  rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void  __rt_rwsem_init(struct rw_semaphor
 	debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
 	lockdep_init_map(&rwsem->dep_map, name, key, 0);
 #endif
-	rwsem->lock.save_state = 0;
+	rt_rw_mutex_init(&rwsem->rwlock);
+	rwsem->rwlock.mutex.save_state = 0;
 }
 EXPORT_SYMBOL(__rt_rwsem_init);
 
Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
 #endif
 };
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock:	pointer to rwsem that is held
+ * @task:	pointer back to task, for lock code
+ * @list_head:	link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+	struct rt_rw_mutex		*lock;
+	struct task_struct		*task;
+	struct list_head		list;
+};
+#endif
+
 struct sched_rt_entity {
 	struct list_head run_list;
 	unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
 #ifdef CONFIG_PREEMPT_RT_FULL
 	/* TODO: move me into ->restart_block ? */
 	struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count;	/* index of last element in owned_read_locks */
+	int reader_lock_free;	/* index of next free element. */
+	struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
 #endif
 
 	unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
 	if (retval)
 		goto bad_fork_cleanup_io;
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+	p->reader_lock_count = 0;
+	p->reader_lock_free = 0;
+	/* Bracket to keep 'i' local */
+	{
+		int i;
+		/*
+		 * We could put the initialization of this list in
+		 * the grabbing of the lock, but it is safer to
+		 * do it now. The list head initialization may be
+		 * removed, but we'll keep it for now, just to be safe.
+		 */
+		for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+			p->owned_read_locks[i].lock = NULL;
+			p->owned_read_locks[i].task = p;
+			INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+		}
+	}
+#endif
+
 	if (pid != &init_struct_pid) {
 		retval = -ENOMEM;
 		pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
 #include <linux/nmi.h>
 #endif
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+
 
 #if defined(CONFIG_SYSCTL)
 
@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+	{
+		.procname	= "rwsem_reader_limit",
+		.data		= &rt_rw_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.procname	= "panic",
 		.data		= &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
 #endif
 };
 
+/**
+ * The rt_rw_mutex structure
+ *
+ * @rt_mutex:	The mutex to wait on
+ * @owners:	list of read owners of the mutex
+ * @nr_owners:	number of read owners
+ * @prio:	the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+	struct rt_mutex		mutex;
+	struct list_head	owners;
+	int			nr_owners;
+	int			prio;
+};
+
 struct rt_mutex_waiter;
 struct hrtimer_sleeper;
 
@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
 		__rt_mutex_init(mutex, #mutex);			\
 	} while (0)
 
+# define rt_rw_mutex_init(rwmutex)					\
+	do {								\
+		raw_spin_lock_init(&(rwmutex)->mutex.wait_lock);	\
+		INIT_LIST_HEAD(&(rwmutex)->owners);			\
+		(rwmutex)->nr_owners = 0;				\
+		(rwmutex)->prio = -1;					\
+		__rt_mutex_init(&(rwmutex)->mutex, #rwmutex);		\
+	} while (0)
+
 #define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
 	.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
 	, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
 #define __RT_MUTEX_INITIALIZER(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }
 
+#define __RT_RW_MUTEX_INITIALIZER(mutexname)			\
+	{ .owners = LIST_HEAD_INIT(mutexname.owners)		\
+	  , .prio = -1						\
+	  , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
 #define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname)    \
 	  , .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
 #include <linux/rtmutex.h>
 
 struct rw_semaphore {
-	struct rt_mutex		lock;
+	struct rt_rw_mutex	rwlock;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map	dep_map;
 #endif
 };
 
 #define __RWSEM_INITIALIZER(name) \
-	{ .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+	{ .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
 	  RW_DEP_MAP_INIT(name) }
 
 #define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void  __rt_rwsem_init(struct rw_s
 
 #define __rt_init_rwsem(sem, name, key)			\
 	do {						\
-		rt_mutex_init(&(sem)->lock);		\
+		rt_rw_mutex_init(&(sem)->rwlock);	\
 		__rt_rwsem_init((sem), (name), (key));\
 	} while (0)
 
@@ -63,7 +63,7 @@ extern void  rt_up_write(struct rw_semap
 extern void  rt_downgrade_write(struct rw_semaphore *rwsem);
 
 #define init_rwsem(sem)		rt_init_rwsem(sem)
-#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->rwlock.mutex)
 
 static inline void down_read(struct rw_semaphore *sem)
 {
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
 	 * The waiter is allocated on our stack, manipulated by the requeue
 	 * code while we sleep on uaddr.
 	 */
-	rt_mutex_init_waiter(&rt_waiter, false);
+	rt_mutex_init_waiter(&rt_waiter, false, false);
 
 	ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
 	if (unlikely(ret != 0))

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-09 19:19 [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems Steven Rostedt
@ 2014-04-10 14:18 ` Mike Galbraith
  2014-04-10 14:28   ` Mike Galbraith
  2014-04-10 14:44 ` Clark Williams
                   ` (2 subsequent siblings)
  3 siblings, 1 reply; 28+ messages in thread
From: Mike Galbraith @ 2014-04-10 14:18 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:

> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.

Too bad I don't have (and know how to use) specjbb.

I dug up old vmark, thinking I'd be able to get some halfway useful
relative numbers from it, but that was a waste of a day.  The thing
performs so badly on 40 core box that rt _beats_ nopreempt, and after 2
nodes, you're going backward.  40 Westmere EX cores does a whopping ~2.5
* dinky old Q6600 box throughput.

-Mike


^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:18 ` Mike Galbraith
@ 2014-04-10 14:28   ` Mike Galbraith
  2014-04-10 14:32     ` Steven Rostedt
  2014-04-11  2:50     ` Mike Galbraith
  0 siblings, 2 replies; 28+ messages in thread
From: Mike Galbraith @ 2014-04-10 14:28 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Thu, 2014-04-10 at 16:18 +0200, Mike Galbraith wrote: 
> On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:
> 
> > If you have any benchmark on large machines I would be very happy if
> > you could test this patch against the unpatched version of -rt.
> 
> Too bad I don't have (and know how to use) specjbb.
> 
> I dug up old vmark, thinking I'd be able to get some halfway useful
> relative numbers from it, but that was a waste of a day.  The thing
> performs so badly on 40 core box that rt _beats_ nopreempt, and after 2
> nodes, you're going backward.  40 Westmere EX cores does a whopping ~2.5
> * dinky old Q6600 box throughput.

P.S.  What I didn't see was any sign of a delta patched/unpatched..
which may mean nothing at all :-/


^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:28   ` Mike Galbraith
@ 2014-04-10 14:32     ` Steven Rostedt
  2014-04-11  2:50     ` Mike Galbraith
  1 sibling, 0 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 14:32 UTC (permalink / raw)
  To: Mike Galbraith
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Thu, 10 Apr 2014 16:28:43 +0200
Mike Galbraith <umgwanakikbuti@gmail.com> wrote:

> On Thu, 2014-04-10 at 16:18 +0200, Mike Galbraith wrote: 
> > On Wed, 2014-04-09 at 15:19 -0400, Steven Rostedt wrote:
> > 
> > > If you have any benchmark on large machines I would be very happy if
> > > you could test this patch against the unpatched version of -rt.
> > 
> > Too bad I don't have (and know how to use) specjbb.
> > 
> > I dug up old vmark, thinking I'd be able to get some halfway useful
> > relative numbers from it, but that was a waste of a day.  The thing
> > performs so badly on 40 core box that rt _beats_ nopreempt, and after 2
> > nodes, you're going backward.  40 Westmere EX cores does a whopping ~2.5
> > * dinky old Q6600 box throughput.
> 
> P.S.  What I didn't see was any sign of a delta patched/unpatched..
> which may mean nothing at all :-/

Actually that at least means it didn't cause any regressions for you.

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-09 19:19 [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems Steven Rostedt
  2014-04-10 14:18 ` Mike Galbraith
@ 2014-04-10 14:44 ` Clark Williams
  2014-04-10 15:01   ` Steven Rostedt
                     ` (2 more replies)
  2014-04-10 17:42 ` [RFC PATCH RT V2] " Steven Rostedt
  2014-04-14  9:55 ` [RFC PATCH RT] " Ingo Molnar
  3 siblings, 3 replies; 28+ messages in thread
From: Clark Williams @ 2014-04-10 14:44 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

[-- Attachment #1: Type: text/plain, Size: 2069 bytes --]

On Wed, 9 Apr 2014 15:19:22 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

 
> This patch is built on top of the two other patches that I posted
> earlier, which should not be as controversial.
> 
> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.
> 
> Cheers,
> 
> -- Steve
>

Steven

I wrote a program named whack_mmap_sem which creates a large (4GB)
buffer, then creates 2 x ncpus threads that are affined across all the
available cpus. These threads then randomly write into the buffer,
which should cause page faults galore.

I then built the following kernel configs:

  vanilla-3.13.15  - no RT patches applied
  rt-3.12.15       - PREEMPT_RT patchset
  rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
  rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch

My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
which all partied in the 4GB address range.

I ran whack_mmap_sem with the argument -w 100000 which means each
thread does 100k writes to random locations inside the buffer and then
did five runs per each kernel. At the end of the run whack_mmap_sem
prints out the time of the run in microseconds.

The means of each group of five test runs are:

  vanilla.log:  1210117
       rt.log:  17210953 (14.2 x slower than vanilla)
 rt-fixes.log:  10062027 (8.3 x slower than vanilla)
 rt-multi.log:  3179582  (2.x x slower than vanilla)


As expected, vanilla kicked RT's butt when hammering on the
mmap_sem. But somewhat unexpectedly, your fixups helped quite a bit
and the multi+fixups got RT back into being almost respectable.

Obviously these are just preliminary results on one piece of h/w but
it looks promising.

Clark

P.S. If you want the source to whack_mmap_sem you can grab it from 
http://people.redhat.com/williams/whack_mmap_sem. If it looks like its
worth keeping around I can add it to rt-tests. 


[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 198 bytes --]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:44 ` Clark Williams
@ 2014-04-10 15:01   ` Steven Rostedt
  2014-04-10 15:03   ` Sebastian Andrzej Siewior
  2014-04-11 21:39   ` Daniel Bristot de Oliveira
  2 siblings, 0 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 15:01 UTC (permalink / raw)
  To: Clark Williams
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Thu, 10 Apr 2014 09:44:30 -0500
Clark Williams <williams@redhat.com> wrote:

> I wrote a program named whack_mmap_sem which creates a large (4GB)
> buffer, then creates 2 x ncpus threads that are affined across all the
> available cpus. These threads then randomly write into the buffer,
> which should cause page faults galore.
> 
> I then built the following kernel configs:
> 
>   vanilla-3.13.15  - no RT patches applied

 vanilla-3.*12*.15?

>   rt-3.12.15       - PREEMPT_RT patchset
>   rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
>   rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch
> 
> My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
> 0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
> which all partied in the 4GB address range.
> 
> I ran whack_mmap_sem with the argument -w 100000 which means each
> thread does 100k writes to random locations inside the buffer and then
> did five runs per each kernel. At the end of the run whack_mmap_sem
> prints out the time of the run in microseconds.
> 
> The means of each group of five test runs are:
> 
>   vanilla.log:  1210117
>        rt.log:  17210953 (14.2 x slower than vanilla)
>  rt-fixes.log:  10062027 (8.3 x slower than vanilla)
>  rt-multi.log:  3179582  (2.x x slower than vanilla)
> 
> 
> As expected, vanilla kicked RT's butt when hammering on the
> mmap_sem. But somewhat unexpectedly, your fixups helped quite a bit

That doesn't surprise me too much. As I removed the check for the
nesting, which also shrunk the size of the rwsem itself (removed the
read_depth from the struct). This itself can give a bonus boost.

Now the question is, how much will this affect real use case scenarios?

-- Steve


> and the multi+fixups got RT back into being almost respectable.
> 

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:44 ` Clark Williams
  2014-04-10 15:01   ` Steven Rostedt
@ 2014-04-10 15:03   ` Sebastian Andrzej Siewior
  2014-04-10 15:36     ` Peter Zijlstra
  2014-04-10 15:38     ` Steven Rostedt
  2014-04-11 21:39   ` Daniel Bristot de Oliveira
  2 siblings, 2 replies; 28+ messages in thread
From: Sebastian Andrzej Siewior @ 2014-04-10 15:03 UTC (permalink / raw)
  To: Clark Williams, Steven Rostedt
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Frederic Weisbecker,
	Peter Zijlstra, Ingo Molnar

On 04/10/2014 04:44 PM, Clark Williams wrote:
> The means of each group of five test runs are:
> 
> vanilla.log:  1210117 rt.log:  17210953 (14.2 x slower than
> vanilla) rt-fixes.log:  10062027 (8.3 x slower than vanilla) 
> rt-multi.log:  3179582  (2.x x slower than vanilla)
> 
> 
> As expected, vanilla kicked RT's butt when hammering on the 
> mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> bit and the multi+fixups got RT back into being almost
> respectable.
> 
> Obviously these are just preliminary results on one piece of h/w
> but it looks promising.

Is it easy to look at the latency when you have multiple readers and
and a high prio writer which has to boost all those readers away
instead just one?
Or is this something that should not happen for a high prio RT task
because it has all memory already allocated?

> 
> Clark

Sebastian

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 15:03   ` Sebastian Andrzej Siewior
@ 2014-04-10 15:36     ` Peter Zijlstra
  2014-04-10 19:17       ` Steven Rostedt
  2014-04-10 15:38     ` Steven Rostedt
  1 sibling, 1 reply; 28+ messages in thread
From: Peter Zijlstra @ 2014-04-10 15:36 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: Clark Williams, Steven Rostedt, LKML, linux-rt-users,
	Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Frederic Weisbecker, Ingo Molnar

On Thu, Apr 10, 2014 at 05:03:36PM +0200, Sebastian Andrzej Siewior wrote:
> On 04/10/2014 04:44 PM, Clark Williams wrote:
> > The means of each group of five test runs are:
> > 
> > vanilla.log:  1210117 rt.log:  17210953 (14.2 x slower than
> > vanilla) rt-fixes.log:  10062027 (8.3 x slower than vanilla) 
> > rt-multi.log:  3179582  (2.x x slower than vanilla)
> > 
> > 
> > As expected, vanilla kicked RT's butt when hammering on the 
> > mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> > bit and the multi+fixups got RT back into being almost
> > respectable.
> > 
> > Obviously these are just preliminary results on one piece of h/w
> > but it looks promising.
> 
> Is it easy to look at the latency when you have multiple readers and
> and a high prio writer which has to boost all those readers away
> instead just one?
> Or is this something that should not happen for a high prio RT task
> because it has all memory already allocated?

With care it should not happen; it should be relatively straight forward
to avoid all system calls that take mmap_sem for writing.

But yes, the total latency is a concern, that said, that is the very
reason there is a hard limit to the reader concurrency and why this
limit is a tunable.

It defaults to the total number of CPUs in the system, given the default
setup (all CPUs in a single balance domain), this should result in all
CPUs working concurrently on the boosted read sides.

So while there is always some overhead, the worst case should not be
nr_readers * read-hold-time.

Although, with more (unrelated) higher prio threads you can indeed wreck
this. Similarly by partitioning the system and not adjusting the max
reader you also get into trouble.

But then, the above nr_readers * read-hold-time is still an upper bound,
and the entire thing does stay deterministic.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 15:03   ` Sebastian Andrzej Siewior
  2014-04-10 15:36     ` Peter Zijlstra
@ 2014-04-10 15:38     ` Steven Rostedt
  1 sibling, 0 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 15:38 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: Clark Williams, LKML, linux-rt-users, Mike Galbraith,
	Paul E. McKenney, Paul Gortmaker, Thomas Gleixner,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Thu, 10 Apr 2014 17:03:36 +0200
Sebastian Andrzej Siewior <bigeasy@linutronix.de> wrote:

> On 04/10/2014 04:44 PM, Clark Williams wrote:
> > The means of each group of five test runs are:
> > 
> > vanilla.log:  1210117 rt.log:  17210953 (14.2 x slower than
> > vanilla) rt-fixes.log:  10062027 (8.3 x slower than vanilla) 
> > rt-multi.log:  3179582  (2.x x slower than vanilla)
> > 
> > 
> > As expected, vanilla kicked RT's butt when hammering on the 
> > mmap_sem. But somewhat unexpectedly, your fixups helped quite a
> > bit and the multi+fixups got RT back into being almost
> > respectable.
> > 
> > Obviously these are just preliminary results on one piece of h/w
> > but it looks promising.
> 
> Is it easy to look at the latency when you have multiple readers and
> and a high prio writer which has to boost all those readers away
> instead just one?
> Or is this something that should not happen for a high prio RT task
> because it has all memory already allocated?

Note, the patch includes a /proc/sys/kernel/rwsem_reader_limit that is
default value set at possible_cpus. The user can change it to any
number. A number of zero means unlimited, and a number of 1 makes it
work like it did without the patch.

This means that a writer will only have to wait for rwsem_reader_limit
readers, which may give a longer latency, but it is still bounded. Also
note that the rwsem is not fair respect to first come first served, but
priority driven. Same priority tasks may be first in first out, but if
there's a writer waiting and a higher priority reader comes along, it
can still get the lock, making the writer wait longer. But as the
reader is higher priority than the writer, that is expected. The same
priority reader will block if there's a writer waiting.

Clark, it would be interesting if you run the test again with my
patches but set rwsem_reader_limit to 1, and see what the impact of
that is.

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [RFC PATCH RT V2] rwsem: The return of multi-reader PI rwsems
  2014-04-09 19:19 [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems Steven Rostedt
  2014-04-10 14:18 ` Mike Galbraith
  2014-04-10 14:44 ` Clark Williams
@ 2014-04-10 17:42 ` Steven Rostedt
  2014-04-11  2:35   ` [RFC PATCH RT V3] " Steven Rostedt
  2014-04-14  9:55 ` [RFC PATCH RT] " Ingo Molnar
  3 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 17:42 UTC (permalink / raw)
  To: LKML, linux-rt-users
  Cc: Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

OK, I added rt_rw_limit == 0 being unlimited as the last change and
never tested it. Clark did, and found that it locked up the system.
That's because it zero wasn't properly checked at all locations and in
some cases it could never grab the lock for read.

I fixed that with this patch. And tested it too (that's a bonus).

Signed-off-by: Steven Rostedt <rostedt@goodmis.org>

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,21 @@
 #include "rtmutex_common.h"
 
 /*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* cnt == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+	return !cnt || cnt < rt_rw_limit;
+}
+
+/*
  * lock->owner state tracking:
  *
  * lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,44 @@ static inline void init_lists(struct rt_
 		plist_head_init(&lock->wait_list);
 }
 
+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+	struct rt_mutex *lock = &rwlock->mutex;
+
+	/*
+	 * A rwsem priority is initialized to -1 and never will
+	 * be that again.
+	 */
+	if (unlikely(rwlock->prio < 0)) {
+		rwlock->prio = MAX_PRIO;
+		init_lists(lock);
+	}
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
 /*
  * Calculate task priority from the waiter list priority
  *
  * Return task->normal_prio when the waiter list is empty or when
  * the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
  */
 int rt_mutex_getprio(struct task_struct *task)
 {
-	if (likely(!task_has_pi_waiters(task)))
-		return task->normal_prio;
+	int prio = task->normal_prio;
+
+	if (likely(!task_has_pi_waiters(task) &&
+		   !task_has_reader_locks(task)))
+		return prio;
 
-	return min(task_top_pi_waiter(task)->pi_list_entry.prio,
-		   task->normal_prio);
+	if (task_has_reader_locks(task))
+		prio = rt_mutex_get_readers_prio(task, prio);
+
+	return min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
 }
 
 /*
@@ -181,6 +221,11 @@ static void rt_mutex_wake_waiter(struct
  */
 int max_lock_depth = 1024;
 
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth);
 /*
  * Adjust the priority chain. Also used for deadlock detection.
  * Decreases task's usage by one - may thus free the task.
@@ -203,7 +248,8 @@ static int rt_mutex_adjust_prio_chain(st
 				      int deadlock_detect,
 				      struct rt_mutex *orig_lock,
 				      struct rt_mutex_waiter *orig_waiter,
-				      struct task_struct *top_task)
+				      struct task_struct *top_task,
+				      int recursion_depth)
 {
 	struct rt_mutex *lock;
 	struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +362,18 @@ static int rt_mutex_adjust_prio_chain(st
 
 	/* Grab the next task */
 	task = rt_mutex_owner(lock);
+
+	/*
+	 * Readers are special. We may need to boost more than one owner.
+	 */
+	if (unlikely(task == RT_RW_READER)) {
+		ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+					      top_task, lock,
+					      recursion_depth);
+		raw_spin_unlock(&lock->wait_lock);
+		goto out;
+	}
+
 	get_task_struct(task);
 	raw_spin_lock_irqsave(&task->pi_lock, flags);
 
@@ -349,7 +407,7 @@ static int rt_mutex_adjust_prio_chain(st
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
  out_put_task:
 	put_task_struct(task);
-
+ out:
 	return ret;
 }
 
@@ -518,6 +576,13 @@ static int task_blocks_on_rt_mutex(struc
 		return 0;
 
 	if (waiter == rt_mutex_top_waiter(lock)) {
+		/* readers are handled differently */
+		if (unlikely(owner == RT_RW_READER)) {
+			res = rt_mutex_adjust_readers(lock, waiter,
+						      current, lock, 0);
+			return res;
+		}
+
 		raw_spin_lock_irqsave(&owner->pi_lock, flags);
 		plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
 		plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +592,8 @@ static int task_blocks_on_rt_mutex(struc
 			chain_walk = 1;
 		raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
 	}
-	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+		 owner != RT_RW_READER)
 		chain_walk = 1;
 
 	if (!chain_walk)
@@ -543,7 +609,7 @@ static int task_blocks_on_rt_mutex(struc
 	raw_spin_unlock(&lock->wait_lock);
 
 	res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
-					 task);
+					 task, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 
@@ -633,7 +699,7 @@ static void remove_waiter(struct rt_mute
 
 	raw_spin_unlock(&lock->wait_lock);
 
-	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 }
@@ -660,7 +726,7 @@ void rt_mutex_adjust_pi(struct task_stru
 	/* gets dropped in rt_mutex_adjust_prio_chain()! */
 	get_task_struct(task);
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
 }
 
 #ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +805,7 @@ static void  noinline __sched rt_spin_lo
 	struct rt_mutex_waiter waiter, *top_waiter;
 	int ret;
 
-	rt_mutex_init_waiter(&waiter, true);
+	rt_mutex_init_waiter(&waiter, true, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1001,15 +1067,564 @@ __mutex_lock_check_stamp(struct rt_mutex
 
 	return 0;
 }
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+	struct rt_mutex_waiter *waiter;
+
+	waiter = rt_mutex_top_waiter(lock);
+	rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+			   struct rt_rw_mutex *rwmutex,
+			   struct rt_mutex_waiter *waiter)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	unsigned long flags;
+	bool wakeup = false;
+
+	rwmutex->nr_owners++;
+	rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+	if (waiter || rt_mutex_has_waiters(mutex)) {
+		unsigned long flags;
+		struct rt_mutex_waiter *top;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+		/* remove the queued waiter. */
+		if (waiter) {
+			plist_del(&waiter->list_entry, &mutex->wait_list);
+			task->pi_blocked_on = NULL;
+		}
+
+		/*
+		 * Initialize the rwmutex prio to the priority of
+		 * the top waiter.
+		 */
+		if (rt_mutex_has_waiters(mutex)) {
+			top = rt_mutex_top_waiter(mutex);
+			top->pi_list_entry.prio = top->list_entry.prio;
+			/*
+			 * Readers set the lock priority for faster access
+			 * to read the priorities of the locks it owns
+			 * when boosting. This helps to not have to take
+			 * the pi_lock of the task. The rwmutex->prio
+			 * is protected by the rwmutex->mutex.wait_lock,
+			 * which is held during boosting.
+			 */
+			rwmutex->prio = top->list_entry.prio;
+
+			/*
+			 * If this waiter is a reader, and the reader limit
+			 * has not been hit, then we can wake this waiter
+			 * up too.
+			 */
+			if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+				wakeup = true;
+		} else
+			rwmutex->prio = MAX_PRIO;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	/*
+	 * It is possible to have holes in the owned_read_locks array.
+	 * If we take read lock A and then B, but then release A, we
+	 * can't move the pointer of B because if something blocks on
+	 * B, it can use the B pointer to boost this task. B can only
+	 * be moved, by owning the wait_list lock of B. Remember, the
+	 * B lock has its pointers to that index of our array.
+	 */
+	rls = &task->owned_read_locks[task->reader_lock_free];
+	BUG_ON(rls->lock);
+
+	/*
+	 * Grabing the pi_lock here as other tasks can boost this
+	 * lock via other held locks, and if free lock descriptor
+	 * was not at the end of the owned_read_locks array, then
+	 * it might get confused if it sees this descriptor with
+	 * a lock and no task.
+	 */
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	rls->lock = rwmutex;
+	rls->task = task;
+	list_add(&rls->list, &rwmutex->owners);
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	if (task->reader_lock_free == task->reader_lock_count) {
+		/*
+		 * If we nest too deep, then this task can never get the lock.
+		 * This task will then block for good. Warn about this and
+		 * hopefully, people will notice the warning. If not, they will notice
+		 * the dead task. Maybe this should be a BUG_ON()
+		 */
+		if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+			return;
+
+		/*
+		 * We don't need to take the pi_lock here as we have the
+		 * wait_lock of the lock at the end of the list. If this task
+		 * was being boosted by a task blocked on this lock, it would
+		 * need to grab the wait_lock before boosting this task.
+		 */
+		task->reader_lock_count++;
+		task->reader_lock_free++;
+	} else {
+		/*
+		 * Find the next free lock in array. Again, we do not need
+		 * to grab the pi_lock because the boosting doesn't use
+		 * the reader_lock_free variable, which is the only thing
+		 * we are updating here.
+		 */
+		do {
+			rls = &task->owned_read_locks[++task->reader_lock_free];
+		} while (rls->lock &&
+			 task->reader_lock_free < task->reader_lock_count);
+	}
+
+	if (wakeup)
+		wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+			       struct rt_mutex_waiter *waiter)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct task_struct *owner;
+
+	assert_raw_spin_locked(&mutex->wait_lock);
+
+	/* The writer unlock can use the fast path */
+	mark_rt_mutex_waiters(mutex);
+
+	/* If we are at the reader limit, we can't take the lock */
+	if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+		return 0;
+
+	/*
+	 * If there's no waiters, and there is no owner or
+	 * the owner is a reader, we get the lock.
+	 * Note, if the waiter or pending bits are set in the owner
+	 * then we need to do more checks.
+	 */
+	if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+		goto taken;
+
+	owner = rt_mutex_owner(mutex);
+
+	/* If the lock is owned by a task, then it is a writer */
+	if (owner && owner != RT_RW_READER)
+		return 0;
+
+	/*
+	 * A writer or a reader may be waiting. In either case, we
+	 * may still be able to steal the lock. The RT rwsems are
+	 * not fair if the new reader comes in and is higher priority
+	 * than all the waiters.
+	 */
+	if (likely(rt_mutex_has_waiters(mutex))) {
+		struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+		if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+			return 0;
+	}
+
+ taken:
+	rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+	return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex_waiter waiter;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+		/* Got the lock */
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	rt_mutex_init_waiter(&waiter, false, false);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+	BUG_ON(ret);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_read(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_read(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there's any readers owning the lock */
+	if (rwmutex->nr_owners)
+		return 0;
+
+	/* No readers, then we can try to take the mutex normally. */
+	return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter waiter;
+	struct task_struct *task = current;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL)) {
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	/* Writers wake up differently than readers (flag it) */
+	rt_mutex_init_waiter(&waiter, false, true);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+	BUG_ON(ret);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_write(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+	WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+	struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+	while (task->reader_lock_count &&
+	       read_locks[task->reader_lock_count - 1].lock == NULL)
+		task->reader_lock_count--;
+
+	if (task->reader_lock_free > task->reader_lock_count)
+		task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter *waiter;
+	struct reader_lock_struct *rls;
+	struct task_struct *task = current;
+	unsigned long flags;
+	int readers;
+	int i;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	rt_mutex_deadlock_account_unlock(current);
+
+	WARN_ON_ONCE(!rwmutex->nr_owners);
+
+	rwmutex->nr_owners--;
+
+	if (rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If the top waiter is a reader and we are under
+		 * the limit, or the top waiter is a writer and
+		 * there's no more readers, then we can give the
+		 * top waiter pending ownership and wake it up.
+		 *
+		 * To simplify things, we only wake up one task, even
+		 * if its a reader. If the reader wakes up and gets the
+		 * lock, it will look to see if it can wake up the next
+		 * waiter, and so on. This way we only need to worry about
+		 * one task at a time.
+		 */
+		waiter = rt_mutex_top_waiter(mutex);
+		readers = rwmutex->nr_owners;
+		if ((waiter->writer && !readers) ||
+		    (!waiter->writer && under_rt_rw_limit(readers)))
+			wakeup_next_rw_waiter(mutex);
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (!rwmutex->nr_owners)
+		rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this tasks list */
+	for (i = task->reader_lock_count - 1; i >= 0; i--) {
+		rls = &task->owned_read_locks[i];
+
+		if (rls->lock == rwmutex) {
+			rls->lock = NULL;
+			list_del_init(&rls->list);
+			/* Shrink the array if we can. */
+			if (i == task->reader_lock_count - 1)
+				shrink_reader_lock_array(task);
+			else if (i < task->reader_lock_free)
+				task->reader_lock_free = i;
+			break;
+		}
+	}
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	WARN_ON_ONCE(i < 0);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	/* Undo pi boosting if necessary */
+	rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	/*
+	 * Writers have normal pi with other tasks blocked
+	 * on the lock. That is, the top waiter will be in the
+	 * pi_list of this task. But for readers, waiters of
+	 * the lock are not included in the pi_list, only the
+	 * locks are. We must remove the top waiter of this
+	 * lock from this task.
+	 */
+	if (rt_mutex_has_waiters(mutex)) {
+		struct rt_mutex_waiter *waiter;
+		unsigned long flags;
+
+		waiter = rt_mutex_top_waiter(mutex);
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+		/*
+		 * The rt_rw_mutex_take_as_reader() will update
+		 * the rwmutex prio.
+		 */
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+	}
+
+	WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+	rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+	raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	struct reader_lock_struct *rls;
+	struct rt_rw_mutex *rwmutex;
+	int lock_prio;
+	int i;
+
+	for (i = 0; i < task->reader_lock_count; i++) {
+		rls = &task->owned_read_locks[i];
+		rwmutex = rls->lock;
+		if (!rwmutex)
+			continue;
+		lock_prio = rwmutex->prio;
+		if (prio > lock_prio)
+			prio = lock_prio;
+	}
+	WARN_ON_ONCE(prio < 0);
+
+	return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex_waiter *waiter;
+	struct task_struct *task;
+	struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+	unsigned long flags;
+
+	/* Update the rwmutex's prio */
+	if (rt_mutex_has_waiters(lock)) {
+		waiter = rt_mutex_top_waiter(lock);
+		/*
+		 * Do we need to grab the task->pi_lock?
+		 * Really, we are only reading it. If it
+		 * changes, then that should follow this chain
+		 * too.
+		 */
+		rwmutex->prio = waiter->task->prio;
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+		WARN_ON(1);
+		return 1;
+	}
+
+	list_for_each_entry(rls, &rwmutex->owners, list) {
+		bool skip = false;
+
+		task = rls->task;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		__rt_mutex_adjust_prio(task);
+		/*
+		 * We need to grab the pi_lock to adjust the task prio
+		 * might as well use this to check if the task is blocked
+		 * as well, and save on a call to the prio chain that will
+		 * just grab the lock again and do the test.
+		 */
+		if (!rt_mutex_real_waiter(task->pi_blocked_on))
+			skip = true;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+		if (skip)
+			continue;
+
+		get_task_struct(task);
+		/*
+		 * rt_mutex_adjust_prio_chain will do
+		 * the put_task_struct
+		 */
+		rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+					   orig_waiter, top_task,
+					   recursion_depth+1);
+	}
+
+	return 0;
+}
 #else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return 0;
+}
+
 static inline int __sched
 __mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
 {
 	BUG();
 	return 0;
 }
-
-#endif
+#endif /* CONFIG_PREEMPT_RT */
 
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1769,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
 	struct rt_mutex_waiter waiter;
 	int ret = 0;
 
-	rt_mutex_init_waiter(&waiter, false);
+	rt_mutex_init_waiter(&waiter, false, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1718,4 +2333,12 @@ void __sched ww_mutex_unlock(struct ww_m
 	rt_mutex_unlock(&lock->base.lock);
 }
 EXPORT_SYMBOL(ww_mutex_unlock);
+
+static int __init rt_rw_limit_init(void)
+{
+	rt_rw_limit = num_possible_cpus();
+	return 0;
+}
+core_initcall(rt_rw_limit_init);
+
 #endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
  * @list_entry:		pi node to enqueue into the mutex waiters list
  * @pi_list_entry:	pi node to enqueue into the mutex owner waiters list
  * @task:		task reference to the blocked task
+ * @writer:		true if its a rwsem writer that is blocked
  */
 struct rt_mutex_waiter {
 	struct plist_node	list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
 	struct task_struct	*task;
 	struct rt_mutex		*lock;
 	bool			savestate;
+	bool			writer;
 #ifdef CONFIG_DEBUG_RT_MUTEXES
 	unsigned long		ip;
 	struct pid		*deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
 		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
 }
 
+/* used as reader owner of the mutex */
+#define RT_RW_READER		(struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT */
+
 /*
  * PI-futex support (proxy locking functions, etc.):
  */
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
 #endif
 
 static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
 {
 	debug_rt_mutex_init_waiter(waiter);
 	waiter->task = NULL;
 	waiter->savestate = savestate;
+	waiter->writer = writer;
 }
 
 #endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
 void  rt_up_write(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	/*
+	 * Unlocking a write is the same as unlocking the mutex.
+	 * The woken reader will do all the work if it needs to
+	 * wake up more than one reader.
+	 */
+	__rt_spin_unlock(&rwsem->rwlock.mutex);
 }
 EXPORT_SYMBOL(rt_up_write);
 
 void  rt_up_read(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	rt_rw_mutex_read_unlock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_up_read);
 
@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
  */
 void  rt_downgrade_write(struct rw_semaphore *rwsem)
 {
-	BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+	BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+	rt_rw_mutex_downgrade_write(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_downgrade_write);
 
 int  rt_down_write_trylock(struct rw_semaphore *rwsem)
 {
-	int ret = rt_mutex_trylock(&rwsem->lock);
+	int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);
 
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
 void  rt_down_write(struct rw_semaphore *rwsem)
 {
 	rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write);
 
 void  rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write_nested);
 
@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
 		struct lockdep_map *nest)
 {
 	rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 
 int  rt_down_read_trylock(struct rw_semaphore *rwsem)
 {
 	int ret;
 
-	ret = rt_mutex_trylock(&rwsem->lock);
+	ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
 
@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
 static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_read_lock(&rwsem->rwlock);
 }
 
 void  rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void  __rt_rwsem_init(struct rw_semaphor
 	debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
 	lockdep_init_map(&rwsem->dep_map, name, key, 0);
 #endif
-	rwsem->lock.save_state = 0;
+	rt_rw_mutex_init(&rwsem->rwlock);
+	rwsem->rwlock.mutex.save_state = 0;
 }
 EXPORT_SYMBOL(__rt_rwsem_init);
 
Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
 #endif
 };
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock:	pointer to rwsem that is held
+ * @task:	pointer back to task, for lock code
+ * @list_head:	link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+	struct rt_rw_mutex		*lock;
+	struct task_struct		*task;
+	struct list_head		list;
+};
+#endif
+
 struct sched_rt_entity {
 	struct list_head run_list;
 	unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
 #ifdef CONFIG_PREEMPT_RT_FULL
 	/* TODO: move me into ->restart_block ? */
 	struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count;	/* index of last element in owned_read_locks */
+	int reader_lock_free;	/* index of next free element. */
+	struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
 #endif
 
 	unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
 	if (retval)
 		goto bad_fork_cleanup_io;
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+	p->reader_lock_count = 0;
+	p->reader_lock_free = 0;
+	/* Bracket to keep 'i' local */
+	{
+		int i;
+		/*
+		 * We could put the initialization of this list in
+		 * the grabbing of the lock, but it is safer to
+		 * do it now. The list head initialization may be
+		 * removed, but we'll keep it for now, just to be safe.
+		 */
+		for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+			p->owned_read_locks[i].lock = NULL;
+			p->owned_read_locks[i].task = p;
+			INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+		}
+	}
+#endif
+
 	if (pid != &init_struct_pid) {
 		retval = -ENOMEM;
 		pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
 #include <linux/nmi.h>
 #endif
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+
 
 #if defined(CONFIG_SYSCTL)
 
@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+	{
+		.procname	= "rwsem_reader_limit",
+		.data		= &rt_rw_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.procname	= "panic",
 		.data		= &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
 #endif
 };
 
+/**
+ * The rt_rw_mutex structure
+ *
+ * @rt_mutex:	The mutex to wait on
+ * @owners:	list of read owners of the mutex
+ * @nr_owners:	number of read owners
+ * @prio:	the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+	struct rt_mutex		mutex;
+	struct list_head	owners;
+	int			nr_owners;
+	int			prio;
+};
+
 struct rt_mutex_waiter;
 struct hrtimer_sleeper;
 
@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
 		__rt_mutex_init(mutex, #mutex);			\
 	} while (0)
 
+# define rt_rw_mutex_init(rwmutex)					\
+	do {								\
+		raw_spin_lock_init(&(rwmutex)->mutex.wait_lock);	\
+		INIT_LIST_HEAD(&(rwmutex)->owners);			\
+		(rwmutex)->nr_owners = 0;				\
+		(rwmutex)->prio = -1;					\
+		__rt_mutex_init(&(rwmutex)->mutex, #rwmutex);		\
+	} while (0)
+
 #define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
 	.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
 	, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
 #define __RT_MUTEX_INITIALIZER(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }
 
+#define __RT_RW_MUTEX_INITIALIZER(mutexname)			\
+	{ .owners = LIST_HEAD_INIT(mutexname.owners)		\
+	  , .prio = -1						\
+	  , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
 #define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname)    \
 	  , .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
 #include <linux/rtmutex.h>
 
 struct rw_semaphore {
-	struct rt_mutex		lock;
+	struct rt_rw_mutex	rwlock;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map	dep_map;
 #endif
 };
 
 #define __RWSEM_INITIALIZER(name) \
-	{ .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+	{ .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
 	  RW_DEP_MAP_INIT(name) }
 
 #define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void  __rt_rwsem_init(struct rw_s
 
 #define __rt_init_rwsem(sem, name, key)			\
 	do {						\
-		rt_mutex_init(&(sem)->lock);		\
+		rt_rw_mutex_init(&(sem)->rwlock);	\
 		__rt_rwsem_init((sem), (name), (key));\
 	} while (0)
 
@@ -63,7 +63,7 @@ extern void  rt_up_write(struct rw_semap
 extern void  rt_downgrade_write(struct rw_semaphore *rwsem);
 
 #define init_rwsem(sem)		rt_init_rwsem(sem)
-#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->rwlock.mutex)
 
 static inline void down_read(struct rw_semaphore *sem)
 {
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
 	 * The waiter is allocated on our stack, manipulated by the requeue
 	 * code while we sleep on uaddr.
 	 */
-	rt_mutex_init_waiter(&rt_waiter, false);
+	rt_mutex_init_waiter(&rt_waiter, false, false);
 
 	ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
 	if (unlikely(ret != 0))

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 15:36     ` Peter Zijlstra
@ 2014-04-10 19:17       ` Steven Rostedt
  2014-04-10 20:48         ` Peter Zijlstra
  2014-04-10 21:30         ` Paul E. McKenney
  0 siblings, 2 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 19:17 UTC (permalink / raw)
  To: Peter Zijlstra
  Cc: Sebastian Andrzej Siewior, Clark Williams, LKML, linux-rt-users,
	Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Frederic Weisbecker, Ingo Molnar

On Thu, 10 Apr 2014 17:36:17 +0200
Peter Zijlstra <peterz@infradead.org> wrote:


> It defaults to the total number of CPUs in the system, given the default
> setup (all CPUs in a single balance domain), this should result in all
> CPUs working concurrently on the boosted read sides.

Unfortunately, it currently defaults to the number of possible CPUs in
the system. I should probably move the default assignment to after SMP
is setup. Currently it happens in early boot before all the CPUs are
running. On boot up, the limit is set to NR_CPUS which should be much
higher than what the system has, but shouldn't matter during boot. But
after all the CPUs are up and running, it can lower it to online CPUs.

I think I'll go and make v3 of this patch.

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 19:17       ` Steven Rostedt
@ 2014-04-10 20:48         ` Peter Zijlstra
  2014-04-10 21:30         ` Paul E. McKenney
  1 sibling, 0 replies; 28+ messages in thread
From: Peter Zijlstra @ 2014-04-10 20:48 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: Sebastian Andrzej Siewior, Clark Williams, LKML, linux-rt-users,
	Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Frederic Weisbecker, Ingo Molnar

On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> On Thu, 10 Apr 2014 17:36:17 +0200
> Peter Zijlstra <peterz@infradead.org> wrote:
> 
> 
> > It defaults to the total number of CPUs in the system, given the default
> > setup (all CPUs in a single balance domain), this should result in all
> > CPUs working concurrently on the boosted read sides.
> 
> Unfortunately, it currently defaults to the number of possible CPUs in
> the system. I should probably move the default assignment to after SMP
> is setup. Currently it happens in early boot before all the CPUs are
> running. On boot up, the limit is set to NR_CPUS which should be much
> higher than what the system has, but shouldn't matter during boot. But
> after all the CPUs are up and running, it can lower it to online CPUs.
> 
> I think I'll go and make v3 of this patch.

Yah, I should have read the patch in more than 5 seconds flat I suppose
:-)

modifying the number post smp bringup should be fine.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 19:17       ` Steven Rostedt
  2014-04-10 20:48         ` Peter Zijlstra
@ 2014-04-10 21:30         ` Paul E. McKenney
  2014-04-10 22:02           ` Steven Rostedt
  1 sibling, 1 reply; 28+ messages in thread
From: Paul E. McKenney @ 2014-04-10 21:30 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: Peter Zijlstra, Sebastian Andrzej Siewior, Clark Williams, LKML,
	linux-rt-users, Mike Galbraith, Paul Gortmaker, Thomas Gleixner,
	Frederic Weisbecker, Ingo Molnar

On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> On Thu, 10 Apr 2014 17:36:17 +0200
> Peter Zijlstra <peterz@infradead.org> wrote:
> 
> 
> > It defaults to the total number of CPUs in the system, given the default
> > setup (all CPUs in a single balance domain), this should result in all
> > CPUs working concurrently on the boosted read sides.
> 
> Unfortunately, it currently defaults to the number of possible CPUs in
> the system. I should probably move the default assignment to after SMP
> is setup. Currently it happens in early boot before all the CPUs are
> running. On boot up, the limit is set to NR_CPUS which should be much
> higher than what the system has, but shouldn't matter during boot. But
> after all the CPUs are up and running, it can lower it to online CPUs.

Another approach is to use nr_cpu_ids, which is the maximum number of
CPUs that the particular booting system could ever have.  I use this in
RCU to resize the data structures down from their NR_CPUS compile-time
hugeness.

							Thanx, Paul

> I think I'll go and make v3 of this patch.
> 
> -- Steve
> 


^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 21:30         ` Paul E. McKenney
@ 2014-04-10 22:02           ` Steven Rostedt
  0 siblings, 0 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-10 22:02 UTC (permalink / raw)
  To: paulmck
  Cc: Peter Zijlstra, Sebastian Andrzej Siewior, Clark Williams, LKML,
	linux-rt-users, Mike Galbraith, Paul Gortmaker, Thomas Gleixner,
	Frederic Weisbecker, Ingo Molnar

On Thu, 10 Apr 2014 14:30:03 -0700
"Paul E. McKenney" <paulmck@linux.vnet.ibm.com> wrote:

> On Thu, Apr 10, 2014 at 03:17:41PM -0400, Steven Rostedt wrote:
> > On Thu, 10 Apr 2014 17:36:17 +0200
> > Peter Zijlstra <peterz@infradead.org> wrote:
> > 
> > 
> > > It defaults to the total number of CPUs in the system, given the default
> > > setup (all CPUs in a single balance domain), this should result in all
> > > CPUs working concurrently on the boosted read sides.
> > 
> > Unfortunately, it currently defaults to the number of possible CPUs in
> > the system. I should probably move the default assignment to after SMP
> > is setup. Currently it happens in early boot before all the CPUs are
> > running. On boot up, the limit is set to NR_CPUS which should be much
> > higher than what the system has, but shouldn't matter during boot. But
> > after all the CPUs are up and running, it can lower it to online CPUs.
> 
> Another approach is to use nr_cpu_ids, which is the maximum number of
> CPUs that the particular booting system could ever have.  I use this in
> RCU to resize the data structures down from their NR_CPUS compile-time
> hugeness.
> 

OK, also, in doing our benchmarks, there's a big difference with
rt_rw_limit being num_online_cpus and 2 * num_online_cpus. It doesn't
seem to get better adding more than that. This was shown on a case with
12 cpus as well as 8 cpus. Same result.

I really like to see a real use case benefit to find the best default.
But as our mmap_sem stress test shows 2xCPUS as being the best, I'm
going to go with that until someone comes up with a better test.

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems
  2014-04-10 17:42 ` [RFC PATCH RT V2] " Steven Rostedt
@ 2014-04-11  2:35   ` Steven Rostedt
  2014-04-11 12:47     ` Carsten Emde
  0 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-11  2:35 UTC (permalink / raw)
  To: LKML, linux-rt-users
  Cc: Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar, Carsten Emde

Changes since v2 - move the setting of the rt_rw_limit to
late_initcall, and use 2 x nr_cpu_ids. Thanks to Paul McKenney for
suggesting using nr_cpu_ids instead of num_possible_cpus(). The 2x is
because out testing shows that 2x gives twice the performance as 1x
using Clark's whack_mmap_sem benchmark.

Also fixed the rwsem_reader_limit == 0. Now it gives much better
performance. The wrapper function to test rt_rw_limit tested zero
against the counter and not the rt_rw_limit. It didn't lock up because
it allowed a reader to take the lock if it was the only one. Basically
it simulated rt_rw_limit == 1.

I added Carsten to the Cc, so I'll post the entire change log of v1
here again.

-----

A while back ago I wrote a patch that would allow for reader/writer
locks like rwlock and rwsems to have multiple readers in PREEMPT_RT. It
was slick and fast but unfortunately it was way too complex and ridden
with nasty little critters which earned me my large collection of
frozen sharks in the fridge (which are quite tasty).

The main problem with my previous solution was that I tried to be too
clever. I worked hard on making the rw mutex still have the "fast
path". That is, the cmpxchg that could allow a non contended grabbing
of the lock be one instruction and be off with it. But to get that
working required lots of tricks and black magic that was certainly
going to fail. Thus, with the raining of sharks on my parade, the
priority inheritance mutex with multiple owners died a slow painful
death.

So we thought.

But over the years, a new darkness was on the horizon. Complaints about
running highly threaded processes (did I hear Java?) were suffering
huge performance hits on the PREEMPT_RT kernel. Whether or not the
processes were real-time tasks, they still were horrible compared to
running the same tasks on the mainline kernel. Note, this was being
done on machines with many CPUs.

The culprit mostly was a single rwsem, the notorious mmap_sem that
can be taking several times for read, and as on RT, this is just a
single mutex, and it would serialize these accesses that would not
happen on mainline.

I looked back at my poor dead rw multi pi reader patch and thought to
myself. "How complex would this be if I removed the 'fast path' from
the code". I decided to build a new tower in Mordor.

I feel that I am correct. By removing the fast path and requiring all
accesses to the rwsem to go through the slow path (must take the
wait_lock to do anything). The code really wasn't that bad. I also only
focused on the rwsem and did not worry about the rwlocks as that hasn't
been pointed out as a bottle neck yet. If it does happen to be, this
code could easily work on rwlocks too.

I'm much more confident in this code than I was with my previous
version of the rwlock multi-reader patch. I added a bunch of comments
to this code to explain how things interact. The writer unlock was
still able to use the fast path as the writers are pretty much like a
normal mutex. Too bad that the writer unlock is not a high point of
contention.

This patch is built on top of the two other patches that I posted
earlier, which should not be as controversial.

If you have any benchmark on large machines I would be very happy if
you could test this patch against the unpatched version of -rt.

Cheers,

-- Steve

Signed-off-by: Steven Rostedt <rostedt@goodmis.org>

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,21 @@
 #include "rtmutex_common.h"
 
 /*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* cnt == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+	return !rt_rw_limit || cnt < rt_rw_limit;
+}
+
+/*
  * lock->owner state tracking:
  *
  * lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,44 @@ static inline void init_lists(struct rt_
 		plist_head_init(&lock->wait_list);
 }
 
+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+	struct rt_mutex *lock = &rwlock->mutex;
+
+	/*
+	 * A rwsem priority is initialized to -1 and never will
+	 * be that again.
+	 */
+	if (unlikely(rwlock->prio < 0)) {
+		rwlock->prio = MAX_PRIO;
+		init_lists(lock);
+	}
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
 /*
  * Calculate task priority from the waiter list priority
  *
  * Return task->normal_prio when the waiter list is empty or when
  * the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
  */
 int rt_mutex_getprio(struct task_struct *task)
 {
-	if (likely(!task_has_pi_waiters(task)))
-		return task->normal_prio;
+	int prio = task->normal_prio;
+
+	if (likely(!task_has_pi_waiters(task) &&
+		   !task_has_reader_locks(task)))
+		return prio;
 
-	return min(task_top_pi_waiter(task)->pi_list_entry.prio,
-		   task->normal_prio);
+	if (task_has_reader_locks(task))
+		prio = rt_mutex_get_readers_prio(task, prio);
+
+	return min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
 }
 
 /*
@@ -181,6 +221,11 @@ static void rt_mutex_wake_waiter(struct
  */
 int max_lock_depth = 1024;
 
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth);
 /*
  * Adjust the priority chain. Also used for deadlock detection.
  * Decreases task's usage by one - may thus free the task.
@@ -203,7 +248,8 @@ static int rt_mutex_adjust_prio_chain(st
 				      int deadlock_detect,
 				      struct rt_mutex *orig_lock,
 				      struct rt_mutex_waiter *orig_waiter,
-				      struct task_struct *top_task)
+				      struct task_struct *top_task,
+				      int recursion_depth)
 {
 	struct rt_mutex *lock;
 	struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +362,18 @@ static int rt_mutex_adjust_prio_chain(st
 
 	/* Grab the next task */
 	task = rt_mutex_owner(lock);
+
+	/*
+	 * Readers are special. We may need to boost more than one owner.
+	 */
+	if (unlikely(task == RT_RW_READER)) {
+		ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+					      top_task, lock,
+					      recursion_depth);
+		raw_spin_unlock(&lock->wait_lock);
+		goto out;
+	}
+
 	get_task_struct(task);
 	raw_spin_lock_irqsave(&task->pi_lock, flags);
 
@@ -349,7 +407,7 @@ static int rt_mutex_adjust_prio_chain(st
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
  out_put_task:
 	put_task_struct(task);
-
+ out:
 	return ret;
 }
 
@@ -518,6 +576,13 @@ static int task_blocks_on_rt_mutex(struc
 		return 0;
 
 	if (waiter == rt_mutex_top_waiter(lock)) {
+		/* readers are handled differently */
+		if (unlikely(owner == RT_RW_READER)) {
+			res = rt_mutex_adjust_readers(lock, waiter,
+						      current, lock, 0);
+			return res;
+		}
+
 		raw_spin_lock_irqsave(&owner->pi_lock, flags);
 		plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
 		plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +592,8 @@ static int task_blocks_on_rt_mutex(struc
 			chain_walk = 1;
 		raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
 	}
-	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+		 owner != RT_RW_READER)
 		chain_walk = 1;
 
 	if (!chain_walk)
@@ -543,7 +609,7 @@ static int task_blocks_on_rt_mutex(struc
 	raw_spin_unlock(&lock->wait_lock);
 
 	res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
-					 task);
+					 task, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 
@@ -633,7 +699,7 @@ static void remove_waiter(struct rt_mute
 
 	raw_spin_unlock(&lock->wait_lock);
 
-	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 }
@@ -660,7 +726,7 @@ void rt_mutex_adjust_pi(struct task_stru
 	/* gets dropped in rt_mutex_adjust_prio_chain()! */
 	get_task_struct(task);
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
 }
 
 #ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +805,7 @@ static void  noinline __sched rt_spin_lo
 	struct rt_mutex_waiter waiter, *top_waiter;
 	int ret;
 
-	rt_mutex_init_waiter(&waiter, true);
+	rt_mutex_init_waiter(&waiter, true, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1001,15 +1067,564 @@ __mutex_lock_check_stamp(struct rt_mutex
 
 	return 0;
 }
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+	struct rt_mutex_waiter *waiter;
+
+	waiter = rt_mutex_top_waiter(lock);
+	rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+			   struct rt_rw_mutex *rwmutex,
+			   struct rt_mutex_waiter *waiter)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	unsigned long flags;
+	bool wakeup = false;
+
+	rwmutex->nr_owners++;
+	rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+	if (waiter || rt_mutex_has_waiters(mutex)) {
+		unsigned long flags;
+		struct rt_mutex_waiter *top;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+		/* remove the queued waiter. */
+		if (waiter) {
+			plist_del(&waiter->list_entry, &mutex->wait_list);
+			task->pi_blocked_on = NULL;
+		}
+
+		/*
+		 * Initialize the rwmutex prio to the priority of
+		 * the top waiter.
+		 */
+		if (rt_mutex_has_waiters(mutex)) {
+			top = rt_mutex_top_waiter(mutex);
+			top->pi_list_entry.prio = top->list_entry.prio;
+			/*
+			 * Readers set the lock priority for faster access
+			 * to read the priorities of the locks it owns
+			 * when boosting. This helps to not have to take
+			 * the pi_lock of the task. The rwmutex->prio
+			 * is protected by the rwmutex->mutex.wait_lock,
+			 * which is held during boosting.
+			 */
+			rwmutex->prio = top->list_entry.prio;
+
+			/*
+			 * If this waiter is a reader, and the reader limit
+			 * has not been hit, then we can wake this waiter
+			 * up too.
+			 */
+			if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+				wakeup = true;
+		} else
+			rwmutex->prio = MAX_PRIO;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	/*
+	 * It is possible to have holes in the owned_read_locks array.
+	 * If we take read lock A and then B, but then release A, we
+	 * can't move the pointer of B because if something blocks on
+	 * B, it can use the B pointer to boost this task. B can only
+	 * be moved, by owning the wait_list lock of B. Remember, the
+	 * B lock has its pointers to that index of our array.
+	 */
+	rls = &task->owned_read_locks[task->reader_lock_free];
+	BUG_ON(rls->lock);
+
+	/*
+	 * Grabing the pi_lock here as other tasks can boost this
+	 * lock via other held locks, and if free lock descriptor
+	 * was not at the end of the owned_read_locks array, then
+	 * it might get confused if it sees this descriptor with
+	 * a lock and no task.
+	 */
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	rls->lock = rwmutex;
+	rls->task = task;
+	list_add(&rls->list, &rwmutex->owners);
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	if (task->reader_lock_free == task->reader_lock_count) {
+		/*
+		 * If we nest too deep, then this task can never get the lock.
+		 * This task will then block for good. Warn about this and
+		 * hopefully, people will notice the warning. If not, they will notice
+		 * the dead task. Maybe this should be a BUG_ON()
+		 */
+		if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+			return;
+
+		/*
+		 * We don't need to take the pi_lock here as we have the
+		 * wait_lock of the lock at the end of the list. If this task
+		 * was being boosted by a task blocked on this lock, it would
+		 * need to grab the wait_lock before boosting this task.
+		 */
+		task->reader_lock_count++;
+		task->reader_lock_free++;
+	} else {
+		/*
+		 * Find the next free lock in array. Again, we do not need
+		 * to grab the pi_lock because the boosting doesn't use
+		 * the reader_lock_free variable, which is the only thing
+		 * we are updating here.
+		 */
+		do {
+			rls = &task->owned_read_locks[++task->reader_lock_free];
+		} while (rls->lock &&
+			 task->reader_lock_free < task->reader_lock_count);
+	}
+
+	if (wakeup)
+		wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+			       struct rt_mutex_waiter *waiter)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct task_struct *owner;
+
+	assert_raw_spin_locked(&mutex->wait_lock);
+
+	/* The writer unlock can use the fast path */
+	mark_rt_mutex_waiters(mutex);
+
+	/* If we are at the reader limit, we can't take the lock */
+	if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+		return 0;
+
+	/*
+	 * If there's no waiters, and there is no owner or
+	 * the owner is a reader, we get the lock.
+	 * Note, if the waiter or pending bits are set in the owner
+	 * then we need to do more checks.
+	 */
+	if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+		goto taken;
+
+	owner = rt_mutex_owner(mutex);
+
+	/* If the lock is owned by a task, then it is a writer */
+	if (owner && owner != RT_RW_READER)
+		return 0;
+
+	/*
+	 * A writer or a reader may be waiting. In either case, we
+	 * may still be able to steal the lock. The RT rwsems are
+	 * not fair if the new reader comes in and is higher priority
+	 * than all the waiters.
+	 */
+	if (likely(rt_mutex_has_waiters(mutex))) {
+		struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+		if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+			return 0;
+	}
+
+ taken:
+	rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+	return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex_waiter waiter;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+		/* Got the lock */
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	rt_mutex_init_waiter(&waiter, false, false);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+	BUG_ON(ret);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_read(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_read(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there's any readers owning the lock */
+	if (rwmutex->nr_owners)
+		return 0;
+
+	/* No readers, then we can try to take the mutex normally. */
+	return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter waiter;
+	struct task_struct *task = current;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL)) {
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	/* Writers wake up differently than readers (flag it) */
+	rt_mutex_init_waiter(&waiter, false, true);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+	BUG_ON(ret);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_write(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+	WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+	struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+	while (task->reader_lock_count &&
+	       read_locks[task->reader_lock_count - 1].lock == NULL)
+		task->reader_lock_count--;
+
+	if (task->reader_lock_free > task->reader_lock_count)
+		task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter *waiter;
+	struct reader_lock_struct *rls;
+	struct task_struct *task = current;
+	unsigned long flags;
+	int readers;
+	int i;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	rt_mutex_deadlock_account_unlock(current);
+
+	WARN_ON_ONCE(!rwmutex->nr_owners);
+
+	rwmutex->nr_owners--;
+
+	if (rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If the top waiter is a reader and we are under
+		 * the limit, or the top waiter is a writer and
+		 * there's no more readers, then we can give the
+		 * top waiter pending ownership and wake it up.
+		 *
+		 * To simplify things, we only wake up one task, even
+		 * if its a reader. If the reader wakes up and gets the
+		 * lock, it will look to see if it can wake up the next
+		 * waiter, and so on. This way we only need to worry about
+		 * one task at a time.
+		 */
+		waiter = rt_mutex_top_waiter(mutex);
+		readers = rwmutex->nr_owners;
+		if ((waiter->writer && !readers) ||
+		    (!waiter->writer && under_rt_rw_limit(readers)))
+			wakeup_next_rw_waiter(mutex);
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (!rwmutex->nr_owners)
+		rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this tasks list */
+	for (i = task->reader_lock_count - 1; i >= 0; i--) {
+		rls = &task->owned_read_locks[i];
+
+		if (rls->lock == rwmutex) {
+			rls->lock = NULL;
+			list_del_init(&rls->list);
+			/* Shrink the array if we can. */
+			if (i == task->reader_lock_count - 1)
+				shrink_reader_lock_array(task);
+			else if (i < task->reader_lock_free)
+				task->reader_lock_free = i;
+			break;
+		}
+	}
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	WARN_ON_ONCE(i < 0);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	/* Undo pi boosting if necessary */
+	rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	/*
+	 * Writers have normal pi with other tasks blocked
+	 * on the lock. That is, the top waiter will be in the
+	 * pi_list of this task. But for readers, waiters of
+	 * the lock are not included in the pi_list, only the
+	 * locks are. We must remove the top waiter of this
+	 * lock from this task.
+	 */
+	if (rt_mutex_has_waiters(mutex)) {
+		struct rt_mutex_waiter *waiter;
+		unsigned long flags;
+
+		waiter = rt_mutex_top_waiter(mutex);
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+		/*
+		 * The rt_rw_mutex_take_as_reader() will update
+		 * the rwmutex prio.
+		 */
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+	}
+
+	WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+	rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+	raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	struct reader_lock_struct *rls;
+	struct rt_rw_mutex *rwmutex;
+	int lock_prio;
+	int i;
+
+	for (i = 0; i < task->reader_lock_count; i++) {
+		rls = &task->owned_read_locks[i];
+		rwmutex = rls->lock;
+		if (!rwmutex)
+			continue;
+		lock_prio = rwmutex->prio;
+		if (prio > lock_prio)
+			prio = lock_prio;
+	}
+	WARN_ON_ONCE(prio < 0);
+
+	return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex_waiter *waiter;
+	struct task_struct *task;
+	struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+	unsigned long flags;
+
+	/* Update the rwmutex's prio */
+	if (rt_mutex_has_waiters(lock)) {
+		waiter = rt_mutex_top_waiter(lock);
+		/*
+		 * Do we need to grab the task->pi_lock?
+		 * Really, we are only reading it. If it
+		 * changes, then that should follow this chain
+		 * too.
+		 */
+		rwmutex->prio = waiter->task->prio;
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+		WARN_ON(1);
+		return 1;
+	}
+
+	list_for_each_entry(rls, &rwmutex->owners, list) {
+		bool skip = false;
+
+		task = rls->task;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		__rt_mutex_adjust_prio(task);
+		/*
+		 * We need to grab the pi_lock to adjust the task prio
+		 * might as well use this to check if the task is blocked
+		 * as well, and save on a call to the prio chain that will
+		 * just grab the lock again and do the test.
+		 */
+		if (!rt_mutex_real_waiter(task->pi_blocked_on))
+			skip = true;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+		if (skip)
+			continue;
+
+		get_task_struct(task);
+		/*
+		 * rt_mutex_adjust_prio_chain will do
+		 * the put_task_struct
+		 */
+		rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+					   orig_waiter, top_task,
+					   recursion_depth+1);
+	}
+
+	return 0;
+}
 #else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return 0;
+}
+
 static inline int __sched
 __mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
 {
 	BUG();
 	return 0;
 }
-
-#endif
+#endif /* CONFIG_PREEMPT_RT */
 
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1769,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
 	struct rt_mutex_waiter waiter;
 	int ret = 0;
 
-	rt_mutex_init_waiter(&waiter, false);
+	rt_mutex_init_waiter(&waiter, false, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1718,4 +2333,21 @@ void __sched ww_mutex_unlock(struct ww_m
 	rt_mutex_unlock(&lock->base.lock);
 }
 EXPORT_SYMBOL(ww_mutex_unlock);
+
+/*
+ * On boot up, rt_rw_limit is set to NR_CPUS. At the end of boot
+ * we can lower that to actual CPUs as everything should be running
+ * as it most likely will on a normal system.
+ *
+ * Note, benchmarks have shown that the best performance we get
+ * from doing a page fault stress test on threads, is when
+ * rt_rw_limit is set to 2x online CPUs.
+ */
+static int __init rt_rw_limit_init(void)
+{
+	rt_rw_limit = nr_cpu_ids * 2;
+	return 0;
+}
+late_initcall(rt_rw_limit_init);
+
 #endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
  * @list_entry:		pi node to enqueue into the mutex waiters list
  * @pi_list_entry:	pi node to enqueue into the mutex owner waiters list
  * @task:		task reference to the blocked task
+ * @writer:		true if its a rwsem writer that is blocked
  */
 struct rt_mutex_waiter {
 	struct plist_node	list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
 	struct task_struct	*task;
 	struct rt_mutex		*lock;
 	bool			savestate;
+	bool			writer;
 #ifdef CONFIG_DEBUG_RT_MUTEXES
 	unsigned long		ip;
 	struct pid		*deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
 		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
 }
 
+/* used as reader owner of the mutex */
+#define RT_RW_READER		(struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT */
+
 /*
  * PI-futex support (proxy locking functions, etc.):
  */
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
 #endif
 
 static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
 {
 	debug_rt_mutex_init_waiter(waiter);
 	waiter->task = NULL;
 	waiter->savestate = savestate;
+	waiter->writer = writer;
 }
 
 #endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
 void  rt_up_write(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	/*
+	 * Unlocking a write is the same as unlocking the mutex.
+	 * The woken reader will do all the work if it needs to
+	 * wake up more than one reader.
+	 */
+	__rt_spin_unlock(&rwsem->rwlock.mutex);
 }
 EXPORT_SYMBOL(rt_up_write);
 
 void  rt_up_read(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	rt_rw_mutex_read_unlock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_up_read);
 
@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
  */
 void  rt_downgrade_write(struct rw_semaphore *rwsem)
 {
-	BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+	BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+	rt_rw_mutex_downgrade_write(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_downgrade_write);
 
 int  rt_down_write_trylock(struct rw_semaphore *rwsem)
 {
-	int ret = rt_mutex_trylock(&rwsem->lock);
+	int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);
 
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
 void  rt_down_write(struct rw_semaphore *rwsem)
 {
 	rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write);
 
 void  rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write_nested);
 
@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
 		struct lockdep_map *nest)
 {
 	rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 
 int  rt_down_read_trylock(struct rw_semaphore *rwsem)
 {
 	int ret;
 
-	ret = rt_mutex_trylock(&rwsem->lock);
+	ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
 
@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
 static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_read_lock(&rwsem->rwlock);
 }
 
 void  rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void  __rt_rwsem_init(struct rw_semaphor
 	debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
 	lockdep_init_map(&rwsem->dep_map, name, key, 0);
 #endif
-	rwsem->lock.save_state = 0;
+	rt_rw_mutex_init(&rwsem->rwlock);
+	rwsem->rwlock.mutex.save_state = 0;
 }
 EXPORT_SYMBOL(__rt_rwsem_init);
 
Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
 #endif
 };
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock:	pointer to rwsem that is held
+ * @task:	pointer back to task, for lock code
+ * @list_head:	link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+	struct rt_rw_mutex		*lock;
+	struct task_struct		*task;
+	struct list_head		list;
+};
+#endif
+
 struct sched_rt_entity {
 	struct list_head run_list;
 	unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
 #ifdef CONFIG_PREEMPT_RT_FULL
 	/* TODO: move me into ->restart_block ? */
 	struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count;	/* index of last element in owned_read_locks */
+	int reader_lock_free;	/* index of next free element. */
+	struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
 #endif
 
 	unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
 	if (retval)
 		goto bad_fork_cleanup_io;
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+	p->reader_lock_count = 0;
+	p->reader_lock_free = 0;
+	/* Bracket to keep 'i' local */
+	{
+		int i;
+		/*
+		 * We could put the initialization of this list in
+		 * the grabbing of the lock, but it is safer to
+		 * do it now. The list head initialization may be
+		 * removed, but we'll keep it for now, just to be safe.
+		 */
+		for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+			p->owned_read_locks[i].lock = NULL;
+			p->owned_read_locks[i].task = p;
+			INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+		}
+	}
+#endif
+
 	if (pid != &init_struct_pid) {
 		retval = -ENOMEM;
 		pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
 #include <linux/nmi.h>
 #endif
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+
 
 #if defined(CONFIG_SYSCTL)
 
@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+	{
+		.procname	= "rwsem_reader_limit",
+		.data		= &rt_rw_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.procname	= "panic",
 		.data		= &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
 #endif
 };
 
+/**
+ * The rt_rw_mutex structure
+ *
+ * @rt_mutex:	The mutex to wait on
+ * @owners:	list of read owners of the mutex
+ * @nr_owners:	number of read owners
+ * @prio:	the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+	struct rt_mutex		mutex;
+	struct list_head	owners;
+	int			nr_owners;
+	int			prio;
+};
+
 struct rt_mutex_waiter;
 struct hrtimer_sleeper;
 
@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
 		__rt_mutex_init(mutex, #mutex);			\
 	} while (0)
 
+# define rt_rw_mutex_init(rwmutex)					\
+	do {								\
+		raw_spin_lock_init(&(rwmutex)->mutex.wait_lock);	\
+		INIT_LIST_HEAD(&(rwmutex)->owners);			\
+		(rwmutex)->nr_owners = 0;				\
+		(rwmutex)->prio = -1;					\
+		__rt_mutex_init(&(rwmutex)->mutex, #rwmutex);		\
+	} while (0)
+
 #define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
 	.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
 	, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
 #define __RT_MUTEX_INITIALIZER(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }
 
+#define __RT_RW_MUTEX_INITIALIZER(mutexname)			\
+	{ .owners = LIST_HEAD_INIT(mutexname.owners)		\
+	  , .prio = -1						\
+	  , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
 #define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname)    \
 	  , .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
 #include <linux/rtmutex.h>
 
 struct rw_semaphore {
-	struct rt_mutex		lock;
+	struct rt_rw_mutex	rwlock;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map	dep_map;
 #endif
 };
 
 #define __RWSEM_INITIALIZER(name) \
-	{ .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+	{ .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
 	  RW_DEP_MAP_INIT(name) }
 
 #define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void  __rt_rwsem_init(struct rw_s
 
 #define __rt_init_rwsem(sem, name, key)			\
 	do {						\
-		rt_mutex_init(&(sem)->lock);		\
+		rt_rw_mutex_init(&(sem)->rwlock);	\
 		__rt_rwsem_init((sem), (name), (key));\
 	} while (0)
 
@@ -63,7 +63,7 @@ extern void  rt_up_write(struct rw_semap
 extern void  rt_downgrade_write(struct rw_semaphore *rwsem);
 
 #define init_rwsem(sem)		rt_init_rwsem(sem)
-#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->rwlock.mutex)
 
 static inline void down_read(struct rw_semaphore *sem)
 {
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
 	 * The waiter is allocated on our stack, manipulated by the requeue
 	 * code while we sleep on uaddr.
 	 */
-	rt_mutex_init_waiter(&rt_waiter, false);
+	rt_mutex_init_waiter(&rt_waiter, false, false);
 
 	ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
 	if (unlikely(ret != 0))

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:28   ` Mike Galbraith
  2014-04-10 14:32     ` Steven Rostedt
@ 2014-04-11  2:50     ` Mike Galbraith
  2014-04-11  3:25       ` Steven Rostedt
  1 sibling, 1 reply; 28+ messages in thread
From: Mike Galbraith @ 2014-04-11  2:50 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

Oh, there was another useful bit. 

3.12.17-rt25       403991     384216     377786
                   405466     394011     392850

3.12.17-nopreempt  109049     186133     118766  !?!.. ew, TTWU_QUEUE
                   350385     318786     367336  !TTWU_QUEUE
                   326009     356939     378215  !TTWU_QUEUE

IPI == low-pass filter.

-Mike


^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-11  2:50     ` Mike Galbraith
@ 2014-04-11  3:25       ` Steven Rostedt
  2014-04-11  3:52         ` Mike Galbraith
  0 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-11  3:25 UTC (permalink / raw)
  To: Mike Galbraith
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Fri, 11 Apr 2014 04:50:26 +0200
Mike Galbraith <umgwanakikbuti@gmail.com> wrote:

> Oh, there was another useful bit. 
> 
> 3.12.17-rt25       403991     384216     377786
>                    405466     394011     392850
> 
> 3.12.17-nopreempt  109049     186133     118766  !?!.. ew, TTWU_QUEUE
>                    350385     318786     367336  !TTWU_QUEUE
>                    326009     356939     378215  !TTWU_QUEUE
> 
> IPI == low-pass filter.

I'm sorry, but I don't have the foggiest clue to what the above means.

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-11  3:25       ` Steven Rostedt
@ 2014-04-11  3:52         ` Mike Galbraith
  2014-04-11  4:25           ` Mike Galbraith
  0 siblings, 1 reply; 28+ messages in thread
From: Mike Galbraith @ 2014-04-11  3:52 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Thu, 2014-04-10 at 23:25 -0400, Steven Rostedt wrote: 
> On Fri, 11 Apr 2014 04:50:26 +0200
> Mike Galbraith <umgwanakikbuti@gmail.com> wrote:
> 
> > Oh, there was another useful bit. 
> > 
> > 3.12.17-rt25       403991     384216     377786
> >                    405466     394011     392850
> > 
> > 3.12.17-nopreempt  109049     186133     118766  !?!.. ew, TTWU_QUEUE
> >                    350385     318786     367336  !TTWU_QUEUE
> >                    326009     356939     378215  !TTWU_QUEUE
> > 
> > IPI == low-pass filter.
> 
> I'm sorry, but I don't have the foggiest clue to what the above means.

It means..

# Overhead                                                                                                                                                                                   Symbol
# ........  .......................................................................................................................................................................................
#
    17.33%  [k] native_write_msr_safe
            |
            |--88.45%-- __x2apic_send_IPI_mask
            |          |
            |          |--97.89%-- try_to_wake_up
            |          |          |
            |          |          |--99.91%-- wake_futex
            |          |          |          |
            |          |          |          |--99.77%-- futex_wake_op
            |          |          |          |          do_futex
            |          |          |          |          sys_futex
            |          |          |          |          system_call_fastpath

..wakeup frequency is restricted by the IPI.

	-Mike


^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-11  3:52         ` Mike Galbraith
@ 2014-04-11  4:25           ` Mike Galbraith
  0 siblings, 0 replies; 28+ messages in thread
From: Mike Galbraith @ 2014-04-11  4:25 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Fri, 2014-04-11 at 05:52 +0200, Mike Galbraith wrote: 
> On Thu, 2014-04-10 at 23:25 -0400, Steven Rostedt wrote: 
> > On Fri, 11 Apr 2014 04:50:26 +0200
> > Mike Galbraith <umgwanakikbuti@gmail.com> wrote:
> > 
> > > Oh, there was another useful bit. 
> > > 
> > > 3.12.17-rt25       403991     384216     377786
> > >                    405466     394011     392850
> > > 
> > > 3.12.17-nopreempt  109049     186133     118766  !?!.. ew, TTWU_QUEUE
> > >                    350385     318786     367336  !TTWU_QUEUE
> > >                    326009     356939     378215  !TTWU_QUEUE
> > > 
> > > IPI == low-pass filter.
> > 
> > I'm sorry, but I don't have the foggiest clue to what the above means.
> 
> It means..
> 
> # Overhead                                                                                                                                                                                   Symbol
> # ........  .......................................................................................................................................................................................
> #
>     17.33%  [k] native_write_msr_safe
>             |
>             |--88.45%-- __x2apic_send_IPI_mask
>             |          |
>             |          |--97.89%-- try_to_wake_up
>             |          |          |
>             |          |          |--99.91%-- wake_futex
>             |          |          |          |
>             |          |          |          |--99.77%-- futex_wake_op
>             |          |          |          |          do_futex
>             |          |          |          |          sys_futex
>             |          |          |          |          system_call_fastpath
> 
> ..wakeup frequency is restricted by the IPI.

Turn TTWU_QUEUE off, bottleneck goes away.

# Overhead                                                                                                                                                                                   Symbol
# ........  .......................................................................................................................................................................................
#
     6.12%  [.] _ZN13ObjectMonitor20TrySpin_VaryDurationEP6Thread
            |
            |--61.27%-- _ZN13ObjectMonitor5enterEP6Thread
            |          |
            |          |--99.96%-- _ZN13SharedRuntime26complete_monitor_locking_CEP7oopDescP9BasicLockP10JavaThread
            |          |          0x7f93149b2748
            |          |          |
            |          |          |--99.76%-- 0xf1a02b30
            |          |          |          0xbaba00000000fc24
            |          |           --0.24%-- [...]
            |           --0.04%-- [...]
            |
            |--38.40%-- _ZN13ObjectMonitor6EnterIEP6Thread
            |          _ZN13ObjectMonitor5enterEP6Thread
            |          |
            |          |--99.98%-- _ZN13SharedRuntime26complete_monitor_locking_CEP7oopDescP9BasicLockP10JavaThread
            |          |          0x7f93149b2748
            |          |          |
            |          |          |--99.89%-- 0xf1a02b30
            |          |          |          0xbaba00000000fc24
            |          |           --0.11%-- [...]
            |           --0.02%-- [...]
             --0.32%-- [...]

     3.59%  [k] intel_idle
            |
            --- cpuidle_enter_state
                cpuidle_idle_call
                arch_cpu_idle
                cpu_startup_entry



^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems
  2014-04-11  2:35   ` [RFC PATCH RT V3] " Steven Rostedt
@ 2014-04-11 12:47     ` Carsten Emde
  2014-04-11 13:25       ` Steven Rostedt
  0 siblings, 1 reply; 28+ messages in thread
From: Carsten Emde @ 2014-04-11 12:47 UTC (permalink / raw)
  To: Steven Rostedt, LKML, linux-rt-users
  Cc: Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

Hi Steven,

> [..] I added Carsten to the Cc, so I'll post the entire change log
> of v1 here again.
I've been listening and testing boxes all the time ...

> [..] If you have any benchmark on large machines I would be very
> happy if you could test this patch against the unpatched version of
> -rt.
Three machines
- an X32 x86_64 (AMD Opteron 6272 @2100 MHz) at rack #1/slot #1,
- an X4x2 x86_64 (Intel i7-2600K @3400 MHz) at rack #4/slot #6, and
- an X4 ARM (i.MX6 Quad @996 MHz) at rack #8/slot #7
are running a v3-patched 3.12.15-rt25 kernel now. I'll equip more
machines later.

What I can say so far is:
- No evidence for any regression, no crashes
- Performance certainly at least as good as unpatched, probably better

I'll do more tests and come back with more precise performance
comparison data.

Thanks,
	-Carsten.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT V3] rwsem: The return of multi-reader PI rwsems
  2014-04-11 12:47     ` Carsten Emde
@ 2014-04-11 13:25       ` Steven Rostedt
  2014-04-17 23:26         ` [RFC PATCH RT V4] " Steven Rostedt
  0 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-11 13:25 UTC (permalink / raw)
  To: Carsten Emde
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Clark Williams, Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

On Fri, 11 Apr 2014 14:47:49 +0200
Carsten Emde <C.Emde@osadl.org> wrote:

> Hi Steven,
> 
> > [..] I added Carsten to the Cc, so I'll post the entire change log
> > of v1 here again.
> I've been listening and testing boxes all the time ...
> 

I figured but still wanted to add you to the Cc.

> > [..] If you have any benchmark on large machines I would be very
> > happy if you could test this patch against the unpatched version of
> > -rt.
> Three machines
> - an X32 x86_64 (AMD Opteron 6272 @2100 MHz) at rack #1/slot #1,
> - an X4x2 x86_64 (Intel i7-2600K @3400 MHz) at rack #4/slot #6, and
> - an X4 ARM (i.MX6 Quad @996 MHz) at rack #8/slot #7
> are running a v3-patched 3.12.15-rt25 kernel now. I'll equip more
> machines later.
> 
> What I can say so far is:
> - No evidence for any regression, no crashes

That's good to hear.

> - Performance certainly at least as good as unpatched, probably better

That's even better.

> 
> I'll do more tests and come back with more precise performance
> comparison data.

Do you also have any threaded tests? That is, something like a java
benchmark that kicks off lots of threads. That's where the performance
should show up the most. Clark's whack_mmap_sem test is specific to
this, and has shown an 10x increase in performance with my patch. But
that's a micro benchmark. A better test would be a real java
application.

Thanks,

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-10 14:44 ` Clark Williams
  2014-04-10 15:01   ` Steven Rostedt
  2014-04-10 15:03   ` Sebastian Andrzej Siewior
@ 2014-04-11 21:39   ` Daniel Bristot de Oliveira
  2 siblings, 0 replies; 28+ messages in thread
From: Daniel Bristot de Oliveira @ 2014-04-11 21:39 UTC (permalink / raw)
  To: Clark Williams, Steven Rostedt
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar



On 04/10/2014 11:44 AM, Clark Williams wrote:
> On Wed, 9 Apr 2014 15:19:22 -0400
> Steven Rostedt <rostedt@goodmis.org> wrote:
>
>
>> This patch is built on top of the two other patches that I posted
>> earlier, which should not be as controversial.
>>
>> If you have any benchmark on large machines I would be very happy if
>> you could test this patch against the unpatched version of -rt.
>>
>> Cheers,
>>
>> -- Steve
>>
>
> Steven
>
> I wrote a program named whack_mmap_sem which creates a large (4GB)
> buffer, then creates 2 x ncpus threads that are affined across all the
> available cpus. These threads then randomly write into the buffer,
> which should cause page faults galore.
>
> I then built the following kernel configs:
>
>    vanilla-3.13.15  - no RT patches applied
>    rt-3.12.15       - PREEMPT_RT patchset
>    rt-3.12.15-fixes - PREEMPT_RT + rwsem fixes
>    rt-3.12.15-multi - PREEMPT_RT + rwsem fixes + rwsem-multi patch
>
> My test h/w was a Dell R520 with a 6-core Intel(R) Xeon(R) CPU E5-2430
> 0 @ 2.20GHz (hyperthreaded). So whack_mmap_sem created 24 threads
> which all partied in the 4GB address range.
>
> I ran whack_mmap_sem with the argument -w 100000 which means each
> thread does 100k writes to random locations inside the buffer and then
> did five runs per each kernel. At the end of the run whack_mmap_sem
> prints out the time of the run in microseconds.
>
> The means of each group of five test runs are:
>
>    vanilla.log:  1210117
>         rt.log:  17210953 (14.2 x slower than vanilla)
>   rt-fixes.log:  10062027 (8.3 x slower than vanilla)
>   rt-multi.log:  3179582  (2.x x slower than vanilla)
>

Hi

I ran Clark's test on a machine with 32 CPUs: 2 Sockets, 8 core/socket + HT

On this machine I ran 5 different kernels:

Vanilla:     3.12.15 - Vanilla
RT:          3.12.15 + Preempt-RT 3.12.15-rt25
FIX:         RT + rwsem fixes from rostedt
Multi:       FIX + Multi-reader PI
Multi -FULL: Multi + CONFIG_PREEMPT=y

I ran the test with the same parameters that Clark used, 100 iterations 
for each kernel. For each kernel I measure the min and max execution 
time, along with the avg execution time and the standard deviation.

The result was:

+-------+---------+----------+----------+-----------+-------------+
|       | Vanilla |    RT    |    FIX   |   Multi   | Multi -FULL |
--------+---------+----------+----------+-----------+-------------+
|MIN:   | 3806754 |  6092939 |  6324665 |   2633614 |     3867240 |
|AVG:   | 3875201 |  8162832 |  8007934 |   2736253 |     3961607 |
|MAX:   | 4062205 | 10951416 | 10574212 |   2972458 |     4139297 |
|STDEV: |   47645 |   927839 |   943482 |     52579 |      943482 |
+-------+---------+----------+----------+-----------+-------------+

A comparative of avg case to vanilla:

RT                    - 2.10x (slower)
FIX                   - 2.06x (slower)
Multi                 - 0.70x (faster?)
Multi no PREEMPT_FULL - 1.02x (equals?)

As we can see, the patch gave good results on Preempt-RT, but my results 
was a little bit weird, because the PREEMPT-RT + Multi patch became 
faster than vanilla.

In the standard deviation, the patch showed a good result as well, with 
the patch the std dev became ~17x smaller than on RT kernel without the 
patch, which means less jitter.

-- Daniel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-09 19:19 [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems Steven Rostedt
                   ` (2 preceding siblings ...)
  2014-04-10 17:42 ` [RFC PATCH RT V2] " Steven Rostedt
@ 2014-04-14  9:55 ` Ingo Molnar
  2014-04-14 13:34   ` Steven Rostedt
  3 siblings, 1 reply; 28+ messages in thread
From: Ingo Molnar @ 2014-04-14  9:55 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Clark Williams, Frederic Weisbecker, Peter Zijlstra


* Steven Rostedt <rostedt@goodmis.org> wrote:

> A while back ago I wrote a patch that would allow for reader/writer
> locks like rwlock and rwsems to have multiple readers in PREEMPT_RT. It
> was slick and fast but unfortunately it was way too complex and ridden
> with nasty little critters which earned me my large collection of
> frozen sharks in the fridge (which are quite tasty).
> 
> The main problem with my previous solution was that I tried to be too
> clever. I worked hard on making the rw mutex still have the "fast
> path". That is, the cmpxchg that could allow a non contended grabbing
> of the lock be one instruction and be off with it. But to get that
> working required lots of tricks and black magic that was certainly
> going to fail. Thus, with the raining of sharks on my parade, the
> priority inheritance mutex with multiple owners died a slow painful
> death.
> 
> So we thought.
> 
> But over the years, a new darkness was on the horizon. Complaints about
> running highly threaded processes (did I hear Java?) were suffering
> huge performance hits on the PREEMPT_RT kernel. Whether or not the
> processes were real-time tasks, they still were horrible compared to
> running the same tasks on the mainline kernel. Note, this was being
> done on machines with many CPUs.
> 
> The culprit mostly was a single rwsem, the notorious mmap_sem that
> can be taking several times for read, and as on RT, this is just a
> single mutex, and it would serialize these accesses that would not
> happen on mainline.
> 
> I looked back at my poor dead rw multi pi reader patch and thought to
> myself. "How complex would this be if I removed the 'fast path' from
> the code". I decided to build a new tower in Mordor.
> 
> I feel that I am correct. By removing the fast path and requiring all
> accesses to the rwsem to go through the slow path (must take the
> wait_lock to do anything). The code really wasn't that bad. I also only
> focused on the rwsem and did not worry about the rwlocks as that hasn't
> been pointed out as a bottle neck yet. If it does happen to be, this
> code could easily work on rwlocks too.
> 
> I'm much more confident in this code than I was with my previous
> version of the rwlock multi-reader patch. I added a bunch of comments
> to this code to explain how things interact. The writer unlock was
> still able to use the fast path as the writers are pretty much like a
> normal mutex. Too bad that the writer unlock is not a high point of
> contention.
> 
> This patch is built on top of the two other patches that I posted
> earlier, which should not be as controversial.
> 
> If you have any benchmark on large machines I would be very happy if
> you could test this patch against the unpatched version of -rt.
> 
> Cheers,
> 
> -- Steve
> 
> Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
> ---
> Index: linux-rt.git/kernel/rtmutex.c

Side note: could you please in general include diffstats with such 
patches, especially since you seem to be exporting it from a Git repo?

Newfangled patch summaries like:

 include/linux/rtmutex.h  |   29 ++
 include/linux/rwsem_rt.h |    8 
 include/linux/sched.h    |   20 +
 kernel/fork.c            |   20 +
 kernel/futex.c           |    2 
 kernel/rt.c              |   27 +
 kernel/rtmutex.c         |  645 +++++++++++++++++++++++++++++++++++++++++++++--
 kernel/rtmutex_common.h  |   19 +
 kernel/sysctl.c          |   13 
 9 files changed, 753 insertions(+), 30 deletions(-)

Really give a useful bird's eye view of forest Fangorn, before 
straying into it!

Thanks,

	Ingo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-14  9:55 ` [RFC PATCH RT] " Ingo Molnar
@ 2014-04-14 13:34   ` Steven Rostedt
  2014-04-14 14:08     ` Peter Zijlstra
  0 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-14 13:34 UTC (permalink / raw)
  To: Ingo Molnar
  Cc: LKML, linux-rt-users, Mike Galbraith, Paul E. McKenney,
	Paul Gortmaker, Thomas Gleixner, Sebastian Andrzej Siewior,
	Clark Williams, Frederic Weisbecker, Peter Zijlstra

On Mon, 14 Apr 2014 11:55:54 +0200
Ingo Molnar <mingo@kernel.org> wrote:

> > Index: linux-rt.git/kernel/rtmutex.c  
> 
> Side note: could you please in general include diffstats with such 
> patches, especially since you seem to be exporting it from a Git repo?

Sure, I'll try to remember to add them. I do my work in a git repo and
then do:

git diff > mypatch.patch
quilt import mypatch.patch
patch -p1 -R < mypatch.patch
rm mypatch.patch

and then I pull patches/mypatch.patch into my email to do the RFC.

> 
> Newfangled patch summaries like:
> 
>  include/linux/rtmutex.h  |   29 ++
>  include/linux/rwsem_rt.h |    8 
>  include/linux/sched.h    |   20 +
>  kernel/fork.c            |   20 +
>  kernel/futex.c           |    2 
>  kernel/rt.c              |   27 +
>  kernel/rtmutex.c         |  645 +++++++++++++++++++++++++++++++++++++++++++++--
>  kernel/rtmutex_common.h  |   19 +
>  kernel/sysctl.c          |   13 
>  9 files changed, 753 insertions(+), 30 deletions(-)
> 
> Really give a useful bird's eye view of forest Fangorn, before 
> straying into it!

diffstat patches/mypatch.patch should also give the same. Hmm, I wonder
if quilt has any commands to do this for me?

Thanks,

-- Steve

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems
  2014-04-14 13:34   ` Steven Rostedt
@ 2014-04-14 14:08     ` Peter Zijlstra
  0 siblings, 0 replies; 28+ messages in thread
From: Peter Zijlstra @ 2014-04-14 14:08 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: Ingo Molnar, LKML, linux-rt-users, Mike Galbraith,
	Paul E. McKenney, Paul Gortmaker, Thomas Gleixner,
	Sebastian Andrzej Siewior, Clark Williams, Frederic Weisbecker

On Mon, Apr 14, 2014 at 09:34:19AM -0400, Steven Rostedt wrote:
> On Mon, 14 Apr 2014 11:55:54 +0200
> Ingo Molnar <mingo@kernel.org> wrote:
> 
> > > Index: linux-rt.git/kernel/rtmutex.c  
> > 
> > Side note: could you please in general include diffstats with such 
> > patches, especially since you seem to be exporting it from a Git repo?
> 
> Sure, I'll try to remember to add them. I do my work in a git repo and
> then do:
> 
> git diff > mypatch.patch
> quilt import mypatch.patch
> patch -p1 -R < mypatch.patch
> rm mypatch.patch
> 
> and then I pull patches/mypatch.patch into my email to do the RFC.
> 
> > 
> > Newfangled patch summaries like:
> > 
> >  include/linux/rtmutex.h  |   29 ++
> >  include/linux/rwsem_rt.h |    8 
> >  include/linux/sched.h    |   20 +
> >  kernel/fork.c            |   20 +
> >  kernel/futex.c           |    2 
> >  kernel/rt.c              |   27 +
> >  kernel/rtmutex.c         |  645 +++++++++++++++++++++++++++++++++++++++++++++--
> >  kernel/rtmutex_common.h  |   19 +
> >  kernel/sysctl.c          |   13 
> >  9 files changed, 753 insertions(+), 30 deletions(-)
> > 
> > Really give a useful bird's eye view of forest Fangorn, before 
> > straying into it!
> 
> diffstat patches/mypatch.patch should also give the same. Hmm, I wonder
> if quilt has any commands to do this for me?

QUILT_REFRESH_ARGS="--no-timestamps --backup --diffstat --strip-trailing-whitespace --no-index --sort -p1 -p ab"


Note the --diffstat; you'll need to add 'quilt refresh' to your cmd-list
though.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems
  2014-04-11 13:25       ` Steven Rostedt
@ 2014-04-17 23:26         ` Steven Rostedt
  2014-04-18  8:19           ` Ingo Molnar
  0 siblings, 1 reply; 28+ messages in thread
From: Steven Rostedt @ 2014-04-17 23:26 UTC (permalink / raw)
  To: LKML, linux-rt-users
  Cc: Carsten Emde, Mike Galbraith, Paul E. McKenney, Paul Gortmaker,
	Thomas Gleixner, Sebastian Andrzej Siewior, Clark Williams,
	Frederic Weisbecker, Peter Zijlstra, Ingo Molnar

Changes since v3:

Clark reported that he was seeing a large latency when he added this
patch. I tested it out on a 8 logical CPU box, and sure enough I was
seeing it too. After spending the day debugging why, I found that I had
a bug in rt_mutex_getprio(), where I could do:

  min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)

when there was no "top_pi_waiter", which would give garbage as a
result. This would let some tasks have higher priority than they
should, and cause other tasks that should have high priority not run.

After correcting this issue, the latencies are back down to normal.

-- Steve

Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
---
 include/linux/rtmutex.h  |   29 ++
 include/linux/rwsem_rt.h |    8 
 include/linux/sched.h    |   20 +
 kernel/fork.c            |   20 +
 kernel/futex.c           |    2 
 kernel/rt.c              |   27 +
 kernel/rtmutex.c         |  664 ++++++++++++++++++++++++++++++++++++++++++++++-
 kernel/rtmutex_common.h  |   19 +
 kernel/sysctl.c          |   13 
 9 files changed, 772 insertions(+), 30 deletions(-)

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rtmutex.c	2014-04-17 19:11:24.018721323 -0400
@@ -26,6 +26,21 @@
 #include "rtmutex_common.h"
 
 /*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* cnt == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+	return !rt_rw_limit || cnt < rt_rw_limit;
+}
+
+/*
  * lock->owner state tracking:
  *
  * lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,48 @@
 		plist_head_init(&lock->wait_list);
 }
 
+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+	struct rt_mutex *lock = &rwlock->mutex;
+
+	/*
+	 * A rwsem priority is initialized to -1 and never will
+	 * be that again.
+	 */
+	if (unlikely(rwlock->prio < 0)) {
+		rwlock->prio = MAX_PRIO;
+		init_lists(lock);
+	}
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
 /*
  * Calculate task priority from the waiter list priority
  *
  * Return task->normal_prio when the waiter list is empty or when
  * the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
  */
 int rt_mutex_getprio(struct task_struct *task)
 {
-	if (likely(!task_has_pi_waiters(task)))
-		return task->normal_prio;
+	int prio = task->normal_prio;
+	bool has_pi_waiters = task_has_pi_waiters(task);
+	bool has_reader_locks = task_has_reader_locks(task);
+
+	if (likely(!has_pi_waiters && !has_reader_locks))
+		return prio;
+
+	if (has_reader_locks)
+		prio = rt_mutex_get_readers_prio(task, prio);
+
+	if (has_pi_waiters)
+		prio = min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
 
-	return min(task_top_pi_waiter(task)->pi_list_entry.prio,
-		   task->normal_prio);
+	return prio;
 }
 
 /*
@@ -181,6 +225,11 @@
  */
 int max_lock_depth = 1024;
 
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth);
 /*
  * Adjust the priority chain. Also used for deadlock detection.
  * Decreases task's usage by one - may thus free the task.
@@ -203,7 +252,8 @@
 				      int deadlock_detect,
 				      struct rt_mutex *orig_lock,
 				      struct rt_mutex_waiter *orig_waiter,
-				      struct task_struct *top_task)
+				      struct task_struct *top_task,
+				      int recursion_depth)
 {
 	struct rt_mutex *lock;
 	struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +366,18 @@
 
 	/* Grab the next task */
 	task = rt_mutex_owner(lock);
+
+	/*
+	 * Readers are special. We may need to boost more than one owner.
+	 */
+	if (unlikely(task == RT_RW_READER)) {
+		ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+					      top_task, lock,
+					      recursion_depth);
+		raw_spin_unlock(&lock->wait_lock);
+		goto out;
+	}
+
 	get_task_struct(task);
 	raw_spin_lock_irqsave(&task->pi_lock, flags);
 
@@ -349,7 +411,7 @@
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
  out_put_task:
 	put_task_struct(task);
-
+ out:
 	return ret;
 }
 
@@ -518,6 +580,13 @@
 		return 0;
 
 	if (waiter == rt_mutex_top_waiter(lock)) {
+		/* readers are handled differently */
+		if (unlikely(owner == RT_RW_READER)) {
+			res = rt_mutex_adjust_readers(lock, waiter,
+						      current, lock, 0);
+			return res;
+		}
+
 		raw_spin_lock_irqsave(&owner->pi_lock, flags);
 		plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
 		plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +596,8 @@
 			chain_walk = 1;
 		raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
 	}
-	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+		 owner != RT_RW_READER)
 		chain_walk = 1;
 
 	if (!chain_walk)
@@ -543,7 +613,7 @@
 	raw_spin_unlock(&lock->wait_lock);
 
 	res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
-					 task);
+					 task, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 
@@ -633,7 +703,7 @@
 
 	raw_spin_unlock(&lock->wait_lock);
 
-	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 }
@@ -660,7 +730,7 @@
 	/* gets dropped in rt_mutex_adjust_prio_chain()! */
 	get_task_struct(task);
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
 }
 
 #ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +809,7 @@
 	struct rt_mutex_waiter waiter, *top_waiter;
 	int ret;
 
-	rt_mutex_init_waiter(&waiter, true);
+	rt_mutex_init_waiter(&waiter, true, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1001,15 +1071,564 @@
 
 	return 0;
 }
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+	struct rt_mutex_waiter *waiter;
+
+	waiter = rt_mutex_top_waiter(lock);
+	rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+			   struct rt_rw_mutex *rwmutex,
+			   struct rt_mutex_waiter *waiter)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	unsigned long flags;
+	bool wakeup = false;
+
+	rwmutex->nr_owners++;
+	rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+	if (waiter || rt_mutex_has_waiters(mutex)) {
+		unsigned long flags;
+		struct rt_mutex_waiter *top;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+		/* remove the queued waiter. */
+		if (waiter) {
+			plist_del(&waiter->list_entry, &mutex->wait_list);
+			task->pi_blocked_on = NULL;
+		}
+
+		/*
+		 * Initialize the rwmutex prio to the priority of
+		 * the top waiter.
+		 */
+		if (rt_mutex_has_waiters(mutex)) {
+			top = rt_mutex_top_waiter(mutex);
+			top->pi_list_entry.prio = top->list_entry.prio;
+			/*
+			 * Readers set the lock priority for faster access
+			 * to read the priorities of the locks it owns
+			 * when boosting. This helps to not have to take
+			 * the pi_lock of the task. The rwmutex->prio
+			 * is protected by the rwmutex->mutex.wait_lock,
+			 * which is held during boosting.
+			 */
+			rwmutex->prio = top->list_entry.prio;
+
+			/*
+			 * If this waiter is a reader, and the reader limit
+			 * has not been hit, then we can wake this waiter
+			 * up too.
+			 */
+			if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+				wakeup = true;
+		} else
+			rwmutex->prio = MAX_PRIO;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	/*
+	 * It is possible to have holes in the owned_read_locks array.
+	 * If we take read lock A and then B, but then release A, we
+	 * can't move the pointer of B because if something blocks on
+	 * B, it can use the B pointer to boost this task. B can only
+	 * be moved, by owning the wait_list lock of B. Remember, the
+	 * B lock has its pointers to that index of our array.
+	 */
+	rls = &task->owned_read_locks[task->reader_lock_free];
+	BUG_ON(rls->lock);
+
+	/*
+	 * Grabing the pi_lock here as other tasks can boost this
+	 * lock via other held locks, and if free lock descriptor
+	 * was not at the end of the owned_read_locks array, then
+	 * it might get confused if it sees this descriptor with
+	 * a lock and no task.
+	 */
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	rls->lock = rwmutex;
+	rls->task = task;
+	list_add(&rls->list, &rwmutex->owners);
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	if (task->reader_lock_free == task->reader_lock_count) {
+		/*
+		 * If we nest too deep, then this task can never get the lock.
+		 * This task will then block for good. Warn about this and
+		 * hopefully, people will notice the warning. If not, they will notice
+		 * the dead task. Maybe this should be a BUG_ON()
+		 */
+		if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+			return;
+
+		/*
+		 * We don't need to take the pi_lock here as we have the
+		 * wait_lock of the lock at the end of the list. If this task
+		 * was being boosted by a task blocked on this lock, it would
+		 * need to grab the wait_lock before boosting this task.
+		 */
+		task->reader_lock_count++;
+		task->reader_lock_free++;
+	} else {
+		/*
+		 * Find the next free lock in array. Again, we do not need
+		 * to grab the pi_lock because the boosting doesn't use
+		 * the reader_lock_free variable, which is the only thing
+		 * we are updating here.
+		 */
+		do {
+			rls = &task->owned_read_locks[++task->reader_lock_free];
+		} while (rls->lock &&
+			 task->reader_lock_free < task->reader_lock_count);
+	}
+
+	if (wakeup)
+		wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+			       struct rt_mutex_waiter *waiter)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct task_struct *owner;
+
+	assert_raw_spin_locked(&mutex->wait_lock);
+
+	/* The writer unlock can use the fast path */
+	mark_rt_mutex_waiters(mutex);
+
+	/* If we are at the reader limit, we can't take the lock */
+	if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+		return 0;
+
+	/*
+	 * If there's no waiters, and there is no owner or
+	 * the owner is a reader, we get the lock.
+	 * Note, if the waiter or pending bits are set in the owner
+	 * then we need to do more checks.
+	 */
+	if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+		goto taken;
+
+	owner = rt_mutex_owner(mutex);
+
+	/* If the lock is owned by a task, then it is a writer */
+	if (owner && owner != RT_RW_READER)
+		return 0;
+
+	/*
+	 * A writer or a reader may be waiting. In either case, we
+	 * may still be able to steal the lock. The RT rwsems are
+	 * not fair if the new reader comes in and is higher priority
+	 * than all the waiters.
+	 */
+	if (likely(rt_mutex_has_waiters(mutex))) {
+		struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+		if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+			return 0;
+	}
+
+ taken:
+	rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+	return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex_waiter waiter;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+		/* Got the lock */
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	rt_mutex_init_waiter(&waiter, false, false);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+	BUG_ON(ret);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_read(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_read(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter *waiter)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there's any readers owning the lock */
+	if (rwmutex->nr_owners)
+		return 0;
+
+	/* No readers, then we can try to take the mutex normally. */
+	return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter waiter;
+	struct task_struct *task = current;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL)) {
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	/* Writers wake up differently than readers (flag it) */
+	rt_mutex_init_waiter(&waiter, false, true);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+	BUG_ON(ret);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_write(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+	WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+	struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+	while (task->reader_lock_count &&
+	       read_locks[task->reader_lock_count - 1].lock == NULL)
+		task->reader_lock_count--;
+
+	if (task->reader_lock_free > task->reader_lock_count)
+		task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter *waiter;
+	struct reader_lock_struct *rls;
+	struct task_struct *task = current;
+	unsigned long flags;
+	int readers;
+	int i;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	rt_mutex_deadlock_account_unlock(current);
+
+	WARN_ON_ONCE(!rwmutex->nr_owners);
+
+	rwmutex->nr_owners--;
+
+	if (rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If the top waiter is a reader and we are under
+		 * the limit, or the top waiter is a writer and
+		 * there's no more readers, then we can give the
+		 * top waiter pending ownership and wake it up.
+		 *
+		 * To simplify things, we only wake up one task, even
+		 * if its a reader. If the reader wakes up and gets the
+		 * lock, it will look to see if it can wake up the next
+		 * waiter, and so on. This way we only need to worry about
+		 * one task at a time.
+		 */
+		waiter = rt_mutex_top_waiter(mutex);
+		readers = rwmutex->nr_owners;
+		if ((waiter->writer && !readers) ||
+		    (!waiter->writer && under_rt_rw_limit(readers)))
+			wakeup_next_rw_waiter(mutex);
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (!rwmutex->nr_owners)
+		rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this tasks list */
+	for (i = task->reader_lock_count - 1; i >= 0; i--) {
+		rls = &task->owned_read_locks[i];
+
+		if (rls->lock == rwmutex) {
+			rls->lock = NULL;
+			list_del_init(&rls->list);
+			/* Shrink the array if we can. */
+			if (i == task->reader_lock_count - 1)
+				shrink_reader_lock_array(task);
+			else if (i < task->reader_lock_free)
+				task->reader_lock_free = i;
+			break;
+		}
+	}
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	WARN_ON_ONCE(i < 0);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	/* Undo pi boosting if necessary */
+	rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	/*
+	 * Writers have normal pi with other tasks blocked
+	 * on the lock. That is, the top waiter will be in the
+	 * pi_list of this task. But for readers, waiters of
+	 * the lock are not included in the pi_list, only the
+	 * locks are. We must remove the top waiter of this
+	 * lock from this task.
+	 */
+	if (rt_mutex_has_waiters(mutex)) {
+		struct rt_mutex_waiter *waiter;
+		unsigned long flags;
+
+		waiter = rt_mutex_top_waiter(mutex);
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+		/*
+		 * The rt_rw_mutex_take_as_reader() will update
+		 * the rwmutex prio.
+		 */
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+	}
+
+	WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+	rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+	raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	struct reader_lock_struct *rls;
+	struct rt_rw_mutex *rwmutex;
+	int lock_prio;
+	int i;
+
+	for (i = 0; i < task->reader_lock_count; i++) {
+		rls = &task->owned_read_locks[i];
+		rwmutex = rls->lock;
+		if (!rwmutex)
+			continue;
+		lock_prio = rwmutex->prio;
+		if (prio > lock_prio)
+			prio = lock_prio;
+	}
+	WARN_ON_ONCE(prio < 0);
+
+	return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex_waiter *waiter;
+	struct task_struct *task;
+	struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex, mutex);
+	unsigned long flags;
+
+	/* Update the rwmutex's prio */
+	if (rt_mutex_has_waiters(lock)) {
+		waiter = rt_mutex_top_waiter(lock);
+		/*
+		 * Do we need to grab the task->pi_lock?
+		 * Really, we are only reading it. If it
+		 * changes, then that should follow this chain
+		 * too.
+		 */
+		rwmutex->prio = waiter->task->prio;
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+		WARN_ON(1);
+		return 1;
+	}
+
+	list_for_each_entry(rls, &rwmutex->owners, list) {
+		bool skip = false;
+
+		task = rls->task;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		__rt_mutex_adjust_prio(task);
+		/*
+		 * We need to grab the pi_lock to adjust the task prio
+		 * might as well use this to check if the task is blocked
+		 * as well, and save on a call to the prio chain that will
+		 * just grab the lock again and do the test.
+		 */
+		if (!rt_mutex_real_waiter(task->pi_blocked_on))
+			skip = true;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+		if (skip)
+			continue;
+
+		get_task_struct(task);
+		/*
+		 * rt_mutex_adjust_prio_chain will do
+		 * the put_task_struct
+		 */
+		rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+					   orig_waiter, top_task,
+					   recursion_depth+1);
+	}
+
+	return 0;
+}
 #else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return 0;
+}
+
 static inline int __sched
 __mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx *ctx)
 {
 	BUG();
 	return 0;
 }
-
-#endif
+#endif /* CONFIG_PREEMPT_RT */
 
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1773,7 @@
 	struct rt_mutex_waiter waiter;
 	int ret = 0;
 
-	rt_mutex_init_waiter(&waiter, false);
+	rt_mutex_init_waiter(&waiter, false, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1718,4 +2337,21 @@
 	rt_mutex_unlock(&lock->base.lock);
 }
 EXPORT_SYMBOL(ww_mutex_unlock);
+
+/*
+ * On boot up, rt_rw_limit is set to NR_CPUS. At the end of boot
+ * we can lower that to actual CPUs as everything should be running
+ * as it most likely will on a normal system.
+ *
+ * Note, benchmarks have shown that the best performance we get
+ * from doing a page fault stress test on threads, is when
+ * rt_rw_limit is set to 2x online CPUs.
+ */
+static int __init rt_rw_limit_init(void)
+{
+	rt_rw_limit = nr_cpu_ids * 2;
+	return 0;
+}
+late_initcall(rt_rw_limit_init);
+
 #endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rtmutex_common.h	2014-04-16 10:31:20.160360827 -0400
@@ -43,6 +43,7 @@
  * @list_entry:		pi node to enqueue into the mutex waiters list
  * @pi_list_entry:	pi node to enqueue into the mutex owner waiters list
  * @task:		task reference to the blocked task
+ * @writer:		true if its a rwsem writer that is blocked
  */
 struct rt_mutex_waiter {
 	struct plist_node	list_entry;
@@ -50,6 +51,7 @@
 	struct task_struct	*task;
 	struct rt_mutex		*lock;
 	bool			savestate;
+	bool			writer;
 #ifdef CONFIG_DEBUG_RT_MUTEXES
 	unsigned long		ip;
 	struct pid		*deadlock_task_pid;
@@ -101,6 +103,20 @@
 		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
 }
 
+/* used as reader owner of the mutex */
+#define RT_RW_READER		(struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT */
+
 /*
  * PI-futex support (proxy locking functions, etc.):
  */
@@ -128,11 +144,12 @@
 #endif
 
 static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool writer)
 {
 	debug_rt_mutex_init_waiter(waiter);
 	waiter->task = NULL;
 	waiter->savestate = savestate;
+	waiter->writer = writer;
 }
 
 #endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/rt.c	2014-04-16 10:31:20.170360699 -0400
@@ -310,14 +310,19 @@
 void  rt_up_write(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	/*
+	 * Unlocking a write is the same as unlocking the mutex.
+	 * The woken reader will do all the work if it needs to
+	 * wake up more than one reader.
+	 */
+	__rt_spin_unlock(&rwsem->rwlock.mutex);
 }
 EXPORT_SYMBOL(rt_up_write);
 
 void  rt_up_read(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	rt_rw_mutex_read_unlock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_up_read);
 
@@ -327,13 +332,14 @@
  */
 void  rt_downgrade_write(struct rw_semaphore *rwsem)
 {
-	BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+	BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+	rt_rw_mutex_downgrade_write(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_downgrade_write);
 
 int  rt_down_write_trylock(struct rw_semaphore *rwsem)
 {
-	int ret = rt_mutex_trylock(&rwsem->lock);
+	int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);
 
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@
 void  rt_down_write(struct rw_semaphore *rwsem)
 {
 	rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write);
 
 void  rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write_nested);
 
@@ -359,14 +365,14 @@
 		struct lockdep_map *nest)
 {
 	rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 
 int  rt_down_read_trylock(struct rw_semaphore *rwsem)
 {
 	int ret;
 
-	ret = rt_mutex_trylock(&rwsem->lock);
+	ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
 
@@ -377,7 +383,7 @@
 static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_read_lock(&rwsem->rwlock);
 }
 
 void  rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@
 	debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
 	lockdep_init_map(&rwsem->dep_map, name, key, 0);
 #endif
-	rwsem->lock.save_state = 0;
+	rt_rw_mutex_init(&rwsem->rwlock);
+	rwsem->rwlock.mutex.save_state = 0;
 }
 EXPORT_SYMBOL(__rt_rwsem_init);
 
Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/sched.h	2014-04-16 10:31:20.170360699 -0400
@@ -993,6 +993,22 @@
 #endif
 };
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock:	pointer to rwsem that is held
+ * @task:	pointer back to task, for lock code
+ * @list_head:	link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+	struct rt_rw_mutex		*lock;
+	struct task_struct		*task;
+	struct list_head		list;
+};
+#endif
+
 struct sched_rt_entity {
 	struct list_head run_list;
 	unsigned long timeout;
@@ -1224,6 +1240,10 @@
 #ifdef CONFIG_PREEMPT_RT_FULL
 	/* TODO: move me into ->restart_block ? */
 	struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count;	/* index of last element in owned_read_locks */
+	int reader_lock_free;	/* index of next free element. */
+	struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
 #endif
 
 	unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/fork.c	2014-04-16 10:31:20.180360572 -0400
@@ -1385,6 +1385,26 @@
 	if (retval)
 		goto bad_fork_cleanup_io;
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+	p->reader_lock_count = 0;
+	p->reader_lock_free = 0;
+	/* Bracket to keep 'i' local */
+	{
+		int i;
+		/*
+		 * We could put the initialization of this list in
+		 * the grabbing of the lock, but it is safer to
+		 * do it now. The list head initialization may be
+		 * removed, but we'll keep it for now, just to be safe.
+		 */
+		for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+			p->owned_read_locks[i].lock = NULL;
+			p->owned_read_locks[i].task = p;
+			INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+		}
+	}
+#endif
+
 	if (pid != &init_struct_pid) {
 		retval = -ENOMEM;
 		pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/sysctl.c	2014-04-16 10:31:20.189360458 -0400
@@ -91,6 +91,10 @@
 #include <linux/nmi.h>
 #endif
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+
 
 #if defined(CONFIG_SYSCTL)
 
@@ -453,6 +457,15 @@
 		.proc_handler	= proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+	{
+		.procname	= "rwsem_reader_limit",
+		.data		= &rt_rw_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.procname	= "panic",
 		.data		= &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/rtmutex.h	2014-04-16 10:31:20.189360458 -0400
@@ -42,6 +42,21 @@
 #endif
 };
 
+/**
+ * The rt_rw_mutex structure
+ *
+ * @rt_mutex:	The mutex to wait on
+ * @owners:	list of read owners of the mutex
+ * @nr_owners:	number of read owners
+ * @prio:	the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+	struct rt_mutex		mutex;
+	struct list_head	owners;
+	int			nr_owners;
+	int			prio;
+};
+
 struct rt_mutex_waiter;
 struct hrtimer_sleeper;
 
@@ -75,6 +90,15 @@
 		__rt_mutex_init(mutex, #mutex);			\
 	} while (0)
 
+# define rt_rw_mutex_init(rwmutex)					\
+	do {								\
+		raw_spin_lock_init(&(rwmutex)->mutex.wait_lock);	\
+		INIT_LIST_HEAD(&(rwmutex)->owners);			\
+		(rwmutex)->nr_owners = 0;				\
+		(rwmutex)->prio = -1;					\
+		__rt_mutex_init(&(rwmutex)->mutex, #rwmutex);		\
+	} while (0)
+
 #define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
 	.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
 	, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@
 #define __RT_MUTEX_INITIALIZER(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }
 
+#define __RT_RW_MUTEX_INITIALIZER(mutexname)			\
+	{ .owners = LIST_HEAD_INIT(mutexname.owners)		\
+	  , .prio = -1						\
+	  , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
 #define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname)    \
 	  , .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/include/linux/rwsem_rt.h	2014-04-16 10:31:20.198360345 -0400
@@ -19,14 +19,14 @@
 #include <linux/rtmutex.h>
 
 struct rw_semaphore {
-	struct rt_mutex		lock;
+	struct rt_rw_mutex	rwlock;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map	dep_map;
 #endif
 };
 
 #define __RWSEM_INITIALIZER(name) \
-	{ .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+	{ .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
 	  RW_DEP_MAP_INIT(name) }
 
 #define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@
 
 #define __rt_init_rwsem(sem, name, key)			\
 	do {						\
-		rt_mutex_init(&(sem)->lock);		\
+		rt_rw_mutex_init(&(sem)->rwlock);	\
 		__rt_rwsem_init((sem), (name), (key));\
 	} while (0)
 
@@ -63,7 +63,7 @@
 extern void  rt_downgrade_write(struct rw_semaphore *rwsem);
 
 #define init_rwsem(sem)		rt_init_rwsem(sem)
-#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->rwlock.mutex)
 
 static inline void down_read(struct rw_semaphore *sem)
 {
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c	2014-04-16 10:19:09.030579948 -0400
+++ linux-rt.git/kernel/futex.c	2014-04-16 10:31:20.204360269 -0400
@@ -2327,7 +2327,7 @@
 	 * The waiter is allocated on our stack, manipulated by the requeue
 	 * code while we sleep on uaddr.
 	 */
-	rt_mutex_init_waiter(&rt_waiter, false);
+	rt_mutex_init_waiter(&rt_waiter, false, false);
 
 	ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
 	if (unlikely(ret != 0))

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems
  2014-04-17 23:26         ` [RFC PATCH RT V4] " Steven Rostedt
@ 2014-04-18  8:19           ` Ingo Molnar
  2014-04-24 17:52             ` Steven Rostedt
  0 siblings, 1 reply; 28+ messages in thread
From: Ingo Molnar @ 2014-04-18  8:19 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: LKML, linux-rt-users, Carsten Emde, Mike Galbraith,
	Paul E. McKenney, Paul Gortmaker, Thomas Gleixner,
	Sebastian Andrzej Siewior, Clark Williams, Frederic Weisbecker,
	Peter Zijlstra


* Steven Rostedt <rostedt@goodmis.org> wrote:

> Changes since v3:
> 
> Clark reported that he was seeing a large latency when he added this
> patch. I tested it out on a 8 logical CPU box, and sure enough I was
> seeing it too. After spending the day debugging why, I found that I had
> a bug in rt_mutex_getprio(), where I could do:
> 
>   min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)
> 
> when there was no "top_pi_waiter", which would give garbage as a
> result. This would let some tasks have higher priority than they
> should, and cause other tasks that should have high priority not run.

Would a sanity check like the one below have helped? (untested and 
such)

Thanks,

	Ingo

==========>
 kernel/locking/rtmutex_common.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kernel/locking/rtmutex_common.h b/kernel/locking/rtmutex_common.h
index 7431a9c..36b1ce8 100644
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -85,6 +85,7 @@ static inline int task_has_pi_waiters(struct task_struct *p)
 static inline struct rt_mutex_waiter *
 task_top_pi_waiter(struct task_struct *p)
 {
+	WARN_ON_ONCE(!p->pi_waiters_leftmost);
 	return rb_entry(p->pi_waiters_leftmost, struct rt_mutex_waiter,
 			pi_tree_entry);
 }

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [RFC PATCH RT V4] rwsem: The return of multi-reader PI rwsems
  2014-04-18  8:19           ` Ingo Molnar
@ 2014-04-24 17:52             ` Steven Rostedt
  0 siblings, 0 replies; 28+ messages in thread
From: Steven Rostedt @ 2014-04-24 17:52 UTC (permalink / raw)
  To: Ingo Molnar
  Cc: LKML, linux-rt-users, Carsten Emde, Mike Galbraith,
	Paul E. McKenney, Paul Gortmaker, Thomas Gleixner,
	Sebastian Andrzej Siewior, Clark Williams, Frederic Weisbecker,
	Peter Zijlstra

On Fri, 18 Apr 2014 10:19:26 +0200
Ingo Molnar <mingo@kernel.org> wrote:

> 
> * Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > Changes since v3:
> > 
> > Clark reported that he was seeing a large latency when he added this
> > patch. I tested it out on a 8 logical CPU box, and sure enough I was
> > seeing it too. After spending the day debugging why, I found that I had
> > a bug in rt_mutex_getprio(), where I could do:
> > 
> >   min(task_top_pi_waiter(task)->pi_list_entry.prio, prio)
> > 
> > when there was no "top_pi_waiter", which would give garbage as a
> > result. This would let some tasks have higher priority than they
> > should, and cause other tasks that should have high priority not run.
> 
> Would a sanity check like the one below have helped? (untested and 
> such)

Actually, if I had run this with CONFIG_DEBUG_PI_LIST then this would
have triggered:

#ifdef CONFIG_DEBUG_PI_LIST
# define plist_first_entry(head, type, member)	\
({ \
	WARN_ON(plist_head_empty(head)); \
	container_of(plist_first(head), type, member); \
})
#else
# define plist_first_entry(head, type, member)	\
	container_of(plist_first(head), type, member)
#endif

-- Steve

> 
> Thanks,
> 
> 	Ingo
> 
> ==========>
>  kernel/locking/rtmutex_common.h | 1 +
>  1 file changed, 1 insertion(+)
> 
> diff --git a/kernel/locking/rtmutex_common.h b/kernel/locking/rtmutex_common.h
> index 7431a9c..36b1ce8 100644
> --- a/kernel/locking/rtmutex_common.h
> +++ b/kernel/locking/rtmutex_common.h
> @@ -85,6 +85,7 @@ static inline int task_has_pi_waiters(struct task_struct *p)
>  static inline struct rt_mutex_waiter *
>  task_top_pi_waiter(struct task_struct *p)
>  {
> +	WARN_ON_ONCE(!p->pi_waiters_leftmost);
>  	return rb_entry(p->pi_waiters_leftmost, struct rt_mutex_waiter,
>  			pi_tree_entry);
>  }


^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2014-04-24 17:52 UTC | newest]

Thread overview: 28+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2014-04-09 19:19 [RFC PATCH RT] rwsem: The return of multi-reader PI rwsems Steven Rostedt
2014-04-10 14:18 ` Mike Galbraith
2014-04-10 14:28   ` Mike Galbraith
2014-04-10 14:32     ` Steven Rostedt
2014-04-11  2:50     ` Mike Galbraith
2014-04-11  3:25       ` Steven Rostedt
2014-04-11  3:52         ` Mike Galbraith
2014-04-11  4:25           ` Mike Galbraith
2014-04-10 14:44 ` Clark Williams
2014-04-10 15:01   ` Steven Rostedt
2014-04-10 15:03   ` Sebastian Andrzej Siewior
2014-04-10 15:36     ` Peter Zijlstra
2014-04-10 19:17       ` Steven Rostedt
2014-04-10 20:48         ` Peter Zijlstra
2014-04-10 21:30         ` Paul E. McKenney
2014-04-10 22:02           ` Steven Rostedt
2014-04-10 15:38     ` Steven Rostedt
2014-04-11 21:39   ` Daniel Bristot de Oliveira
2014-04-10 17:42 ` [RFC PATCH RT V2] " Steven Rostedt
2014-04-11  2:35   ` [RFC PATCH RT V3] " Steven Rostedt
2014-04-11 12:47     ` Carsten Emde
2014-04-11 13:25       ` Steven Rostedt
2014-04-17 23:26         ` [RFC PATCH RT V4] " Steven Rostedt
2014-04-18  8:19           ` Ingo Molnar
2014-04-24 17:52             ` Steven Rostedt
2014-04-14  9:55 ` [RFC PATCH RT] " Ingo Molnar
2014-04-14 13:34   ` Steven Rostedt
2014-04-14 14:08     ` Peter Zijlstra

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.