From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1758432AbWK3B5X (ORCPT ); Wed, 29 Nov 2006 20:57:23 -0500 Received: (majordomo@vger.kernel.org) by vger.kernel.org id S1758616AbWK3B5W (ORCPT ); Wed, 29 Nov 2006 20:57:22 -0500 Received: from host-233-54.several.ru ([213.234.233.54]:50378 "EHLO mail.screens.ru") by vger.kernel.org with ESMTP id S1758432AbWK3B5U (ORCPT ); Wed, 29 Nov 2006 20:57:20 -0500 Date: Thu, 30 Nov 2006 04:57:14 +0300 From: Oleg Nesterov To: Andrew Morton , Jens Axboe Cc: "Paul E. McKenney" , Alan Stern , Josh Triplett , linux-kernel@vger.kernel.org Subject: Re: [RFC, PATCH 1/2] qrcu: "quick" srcu implementation Message-ID: <20061130015714.GC1350@oleg> References: <20061129235303.GA1118@oleg> Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20061129235303.GA1118@oleg> User-Agent: Mutt/1.5.11 Sender: linux-kernel-owner@vger.kernel.org X-Mailing-List: linux-kernel@vger.kernel.org (the same patch + comments from Paul) [RFC, PATCH 1/2] qrcu: "quick" srcu implementation Very much based on ideas, corrections, and patient explanations from Alan and Paul. The current srcu implementation is very good for readers, lock/unlock are extremely cheap. But for that reason it is not possible to avoid synchronize_sched() and polling in synchronize_srcu(). Jens Axboe wrote: > > It works for me, but the overhead is still large. Before it would take > 8-12 jiffies for a synchronize_srcu() to complete without there actually > being any reader locks active, now it takes 2-3 jiffies. So it's > definitely faster, and as suspected the loss of two of three > synchronize_sched() cut down the overhead to a third. 'qrcu' behaves the same as srcu but optimized for writers. The fast path for synchronize_qrcu() is mutex_lock() + atomic_read() + mutex_unlock(). The slow path is __wait_event(), no polling. However, the reader does atomic inc/dec on lock/unlock, and the counters are not per-cpu. Also, unlike srcu, qrcu read lock/unlock can be used in interrupt context, and 'qrcu_struct' can be compile-time initialized. See also (a long) discussion: http://marc.theaimsgroup.com/?t=116370857600003 Signed-off-by: Oleg Nesterov --- 19-rc6/include/linux/srcu.h~1_qrcu 2006-10-22 18:24:03.000000000 +0400 +++ 19-rc6/include/linux/srcu.h 2006-11-30 04:32:42.000000000 +0300 @@ -27,6 +27,8 @@ #ifndef _LINUX_SRCU_H #define _LINUX_SRCU_H +#include + struct srcu_struct_array { int c[2]; }; @@ -50,4 +52,32 @@ void srcu_read_unlock(struct srcu_struct void synchronize_srcu(struct srcu_struct *sp); long srcu_batches_completed(struct srcu_struct *sp); +/* + * fully compatible with srcu, but optimized for writers. + */ + +struct qrcu_struct { + int completed; + atomic_t ctr[2]; + wait_queue_head_t wq; + struct mutex mutex; +}; + +int init_qrcu_struct(struct qrcu_struct *qp); +int qrcu_read_lock(struct qrcu_struct *qp); +void qrcu_read_unlock(struct qrcu_struct *qp, int idx); +void synchronize_qrcu(struct qrcu_struct *qp); + +/** + * cleanup_qrcu_struct - deconstruct a quick-RCU structure + * @qp: structure to clean up. + * + * Must invoke this after you are finished using a given qrcu_struct that + * was initialized via init_qrcu_struct(). We reserve the right to + * leak memory should you fail to do this! + */ +static inline void cleanup_qrcu_struct(struct qrcu_struct *qp) +{ +} + #endif --- 19-rc6/kernel/srcu.c~1_qrcu 2006-10-22 18:24:03.000000000 +0400 +++ 19-rc6/kernel/srcu.c 2006-11-30 04:39:53.000000000 +0300 @@ -256,3 +256,94 @@ EXPORT_SYMBOL_GPL(srcu_read_unlock); EXPORT_SYMBOL_GPL(synchronize_srcu); EXPORT_SYMBOL_GPL(srcu_batches_completed); EXPORT_SYMBOL_GPL(srcu_readers_active); + +/** + * init_qrcu_struct - initialize a quick-RCU structure. + * @qp: structure to initialize. + * + * Must invoke this on a given qrcu_struct before passing that qrcu_struct + * to any other function. Each qrcu_struct represents a separate domain + * of QRCU protection. + */ +int init_qrcu_struct(struct qrcu_struct *qp) +{ + qp->completed = 0; + atomic_set(qp->ctr + 0, 1); + atomic_set(qp->ctr + 1, 0); + init_waitqueue_head(&qp->wq); + mutex_init(&qp->mutex); + + return 0; +} + +/** + * qrcu_read_lock - register a new reader for an QRCU-protected structure. + * @qp: qrcu_struct in which to register the new reader. + * + * Counts the new reader in the appropriate element of the qrcu_struct. + * Returns an index that must be passed to the matching qrcu_read_unlock(). + */ +int qrcu_read_lock(struct qrcu_struct *qp) +{ + for (;;) { + int idx = qp->completed & 0x1; + if (likely(atomic_inc_not_zero(qp->ctr + idx))) + return idx; + } +} + +/** + * qrcu_read_unlock - unregister a old reader from an QRCU-protected structure. + * @qp: qrcu_struct in which to unregister the old reader. + * @idx: return value from corresponding qrcu_read_lock(). + * + * Removes the count for the old reader from the appropriate element of + * the qrcu_struct. + */ +void qrcu_read_unlock(struct qrcu_struct *qp, int idx) +{ + if (atomic_dec_and_test(qp->ctr + idx)) + wake_up(&qp->wq); +} + +/** + * synchronize_qrcu - wait for prior QRCU read-side critical-section completion + * @qp: qrcu_struct with which to synchronize. + * + * Flip the completed counter, and wait for the old count to drain to zero. + * As with classic RCU, the updater must use some separate means of + * synchronizing concurrent updates. Can block; must be called from + * process context. + * + * Note that it is illegal to call synchronize_qrcu() from the corresponding + * QRCU read-side critical section; doing so will result in deadlock. + * However, it is perfectly legal to call synchronize_qrcu() on one + * qrcu_struct from some other qrcu_struct's read-side critical section. + */ +void synchronize_qrcu(struct qrcu_struct *qp) +{ + int idx; + + smp_mb(); + mutex_lock(&qp->mutex); + + idx = qp->completed & 0x1; + if (atomic_read(qp->ctr + idx) == 1) + goto out; + + atomic_inc(qp->ctr + (idx ^ 0x1)); + /* Reduce the likelihood that qrcu_read_lock() will loop */ + smp_mb__after_atomic_inc(); + qp->completed++; + + atomic_dec(qp->ctr + idx); + __wait_event(qp->wq, !atomic_read(qp->ctr + idx)); +out: + mutex_unlock(&qp->mutex); + smp_mb(); +} + +EXPORT_SYMBOL_GPL(init_qrcu_struct); +EXPORT_SYMBOL_GPL(qrcu_read_lock); +EXPORT_SYMBOL_GPL(qrcu_read_unlock); +EXPORT_SYMBOL_GPL(synchronize_qrcu);