* [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework
@ 2010-11-15 17:53 Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
` (2 more replies)
0 siblings, 3 replies; 6+ messages in thread
From: Arun R Bharadwaj @ 2010-11-15 17:53 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf
Hi,
This is the v11 of the refactored patch-series to have a generic
asynchronous task offloading framework (called threadlets)
within qemu.
I have run KVM autotest suite with this patch. This test suite
ran successfully for the following tests:
-connecthon
-ebizzy
-dbench
-fsstress
-disktest
-cpu_hotplug
-hackbench
-sleeptest
Changelog:
* Moved the qemu_cond_broadcast to the right place.
* Removed unnecessary extern qualifiers.
The following series implements...
---
Arun R Bharadwaj (1):
Move threadlets infrastructure to qemu-threadlets.c
Gautham R Shenoy (2):
Make paio subsystem use threadlets infrastructure
Add helper functions to enable virtio-9p make use of the threadlets
Makefile.objs | 3 -
configure | 2
docs/async-support.txt | 141 ++++++++++++++++++++++++++++++
hw/virtio-9p.c | 164 ++++++++++++++++++++++++++++++++++-
posix-aio-compat.c | 227 +++++++++++-------------------------------------
qemu-threadlets.c | 189 ++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 47 ++++++++++
vl.c | 3 +
8 files changed, 599 insertions(+), 177 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
--
arun
^ permalink raw reply [flat|nested] 6+ messages in thread
* [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-11-15 17:53 ` Arun R Bharadwaj
2010-11-15 21:13 ` Anthony Liguori
2010-11-15 17:53 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
2010-11-15 17:54 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
2 siblings, 1 reply; 6+ messages in thread
From: Arun R Bharadwaj @ 2010-11-15 17:53 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf
From: Gautham R Shenoy <gautham.shenoy@gmail.com>
This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets.
The patch creates a global queue on-to which subsystems can queue their tasks to
be executed asynchronously.
The patch also provides API's that allow a subsystem to create a private queue
with an associated pool of threads.
The patch has been tested with fstress.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
Makefile.objs | 2
configure | 2
posix-aio-compat.c | 354 +++++++++++++++++++++++++++++++---------------------
3 files changed, 211 insertions(+), 147 deletions(-)
diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -124,7 +125,6 @@ endif
common-obj-y += $(addprefix ui/, $(ui-obj-y))
common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
fi
if test "$vnc_thread" != "no" ; then
echo "CONFIG_VNC_THREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$fnmatch" = "yes" ; then
echo "CONFIG_FNMATCH=y" >> $config_host_mak
@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
fi
if test "$io_thread" = "yes" ; then
echo "CONFIG_IOTHREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$linux_aio" = "yes" ; then
echo "CONFIG_LINUX_AIO=y" >> $config_host_mak
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..e1812fc 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,7 +29,33 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-thread.h"
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -44,13 +70,12 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
- int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -58,64 +83,169 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+ qemu_mutex_lock(&queue->lock);
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
+ }
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
-static void die2(int err, const char *what)
-{
- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
- abort();
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
}
-static void die(const char *what)
+static void spawn_threadlet(ThreadletQueue *queue)
{
- die2(errno, what);
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
}
-static void mutex_lock(pthread_mutex_t *mutex)
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
+ qemu_mutex_lock(&queue->lock);
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ } else {
+ qemu_cond_signal(&queue->cond);
+ }
+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+ qemu_mutex_unlock(&queue->lock);
}
-static void mutex_unlock(pthread_mutex_t *mutex)
+static void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads);
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
+ if (!globalqueue_init) {
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
+ globalqueue_init = 1;
+ }
+
+ submit_work_to_queue(&globalqueue, work);
}
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+ ThreadletWork *ret_work;
+ int ret = 1;
+
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+ ret = 0;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&queue->lock);
+
return ret;
}
-static void cond_signal(pthread_cond_t *cond)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+static void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&queue->request_list);
+ qemu_mutex_init(&queue->lock);
+ qemu_cond_init(&queue->cond);
+}
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present;
+#endif
+
+static void die2(int err, const char *what)
{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
+ fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+ abort();
}
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
+static void die(const char *what)
{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
+ die2(errno, what);
}
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
@@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = ret;
+ qemu_cond_broadcast(&aiocb_completion);
+ qemu_mutex_unlock(&aiocb_mutex);
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) {
+ die("kill failed");
+ }
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
+ qemu_mutex_lock(&aiocb_mutex);
aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+ qemu_mutex_unlock(&aiocb_mutex);
+
+ aiocb->work.func = aio_thread;
+ submit_work(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
+ qemu_mutex_unlock(&aiocb_mutex);
return ret;
}
@@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ if (dequeue_work(&acb->work) != 0) {
+ /* Wait for running work item to complete */
+ qemu_mutex_lock(&aiocb_mutex);
+ while (acb->ret == -EINPROGRESS) {
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ }
+ qemu_mutex_unlock(&aiocb_mutex);
}
paio_remove(acb);
@@ -618,11 +692,13 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
+ qemu_mutex_init(&aiocb_mutex);
+ qemu_cond_init(&aiocb_completion);
+
s = qemu_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
@@ -645,16 +721,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
@ 2010-11-15 17:53 ` Arun R Bharadwaj
2010-11-15 17:54 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
2 siblings, 0 replies; 6+ messages in thread
From: Arun R Bharadwaj @ 2010-11-15 17:53 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf
The reason for creating this generic infrastructure is so that other subsystems,
such as virtio-9p could make use of it for offloading tasks that could block.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Acked-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
Makefile.objs | 1
docs/async-support.txt | 141 +++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 173 ------------------------------------------------
qemu-threadlets.c | 168 +++++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 46 +++++++++++++
5 files changed, 357 insertions(+), 172 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+ to offload possibly blocking work to a queue to be processed by a pool
+ of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+ outside the context of the VCPU/IO threads inorder to free these latter
+ to service any other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+ should I go about it ?
+A.3: One could follow the steps listed below:
+
+ - Define a function which would do the asynchronous work.
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ }
+
+ - Declare an object of type ThreadletWork;
+ ThreadletWork work;
+
+
+ - Assign a value to the "func" member of ThreadletWork object.
+ work.func = my_threadlet_func;
+
+ - Submit the threadlet to the global queue.
+ submit_threadletwork(&work);
+
+ - Continue servicing some other guest operations.
+
+Q.4: I want to my_threadlet_func to access some non-global data. How do I do
+ that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+ of type myPrivateData. In that case one could follow the following steps.
+
+ - Define a member of the type ThreadletWork within myPrivateData.
+ typedef struct MyPrivateData {
+ ...;
+ ...;
+ ...;
+ ThreadletWork work;
+ } MyPrivateData;
+
+ MyPrivateData my_data;
+
+ - Initialize myData.work as described in A.3
+ myData.work.func = my_threadlet_func;
+ submit_threadletwork(&myData.work);
+
+ - Access the myData object inside my_threadlet_func() using container_of
+ primitive
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ myPrivateData *mydata_ptr;
+ mydata_ptr = container_of(work, myPrivateData, work);
+
+ /* mydata_ptr now points to myData object */
+ }
+
+Q.5: Are there any precautions one must take while sharing data with the
+ Asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+ does not access/modify data when it can be accessed or modified in the
+ context of VCPU thread or IO thread. This is because the asynchronous
+ threads in the pool can run in parallel with the VCPU/IOThreads as shown
+ in the figure.
+
+ A typical workflow is as follows:
+
+ VCPU/IOThread
+ |
+ | (1)
+ |
+ V
+ Offload work (2)
+ |-------> to threadlets -----------------------------> Helper thread
+ | | |
+ | | |
+ | | (3) | (4)
+ | | |
+ | Handle other Guest requests |
+ | | |
+ | | V
+ | | (3) Signal the I/O Thread
+ |(6) | |
+ | | /
+ | | /
+ | V /
+ | Do the post <---------------------------------/
+ | processing (5)
+ | |
+ | | (6)
+ | V
+ |-Yes------ More async work?
+ |
+ | (7)
+ No
+ |
+ |
+ .
+ .
+
+ Hence one needs to make sure that in the steps (3) and (4) which run in
+ parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: Threadlets framework provides the API cancel_threadlet:
+ - int cancel_threadletwork(ThreadletWork *work)
+
+ The API scans the ThreadletQueue to see if (work) is present. If it finds
+ work, it'll dequeue work and return 0.
+
+ On the other hand, if it does not find the (work) in the ThreadletQueue,
+ then it'll return 1. This can imply two things. Either the work is being
+ processed by one of the helper threads or it has been processed. The
+ threadlet infrastructure currently _does_not_ distinguish between these
+ two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+ queues with associated pools of threads.
+
+ - Define a PrivateQueue
+ ThreadletQueue myQueue;
+
+ - Initialize it:
+ threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+ where my_max_threads is the maximum number of threads that can be in the
+ thread pool and my_min_threads is the minimum number of active threads
+ that can be in the thread-pool.
+
+ - Submit work:
+ submit_threadletwork_to_queue(&myQueue, &my_work);
+
+ - Cancel work:
+ cancel_threadletwork_on_queue(&myQueue, &my_work);
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index e1812fc..d8cae6d 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,34 +29,11 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
-#include "qemu-thread.h"
-
-#define MAX_GLOBAL_THREADS 64
-#define MIN_GLOBAL_THREADS 8
+#include "qemu-threadlets.h"
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
-typedef struct ThreadletQueue
-{
- QemuMutex lock;
- QemuCond cond;
- int max_threads;
- int min_threads;
- int cur_threads;
- int idle_threads;
- QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
- QTAILQ_ENTRY(ThreadletWork) node;
- void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_fildes;
@@ -83,154 +60,6 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-static void *threadlet_worker(void *data)
-{
- ThreadletQueue *queue = data;
-
- qemu_mutex_lock(&queue->lock);
- while (1) {
- ThreadletWork *work;
- int ret = 0;
-
- while (QTAILQ_EMPTY(&queue->request_list) &&
- (ret != ETIMEDOUT)) {
- /* wait for cond to be signalled or broadcast for 1000s */
- ret = qemu_cond_timedwait((&queue->cond),
- &(queue->lock), 10*100000);
- }
-
- assert(queue->idle_threads != 0);
- if (QTAILQ_EMPTY(&queue->request_list)) {
- if (queue->cur_threads > queue->min_threads) {
- /* We retain the minimum number of threads */
- break;
- }
- } else {
- work = QTAILQ_FIRST(&queue->request_list);
- QTAILQ_REMOVE(&queue->request_list, work, node);
-
- queue->idle_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- /* execute the work function */
- work->func(work);
-
- qemu_mutex_lock(&queue->lock);
- queue->idle_threads++;
- }
- }
-
- queue->idle_threads--;
- queue->cur_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- return NULL;
-}
-
-static void spawn_threadlet(ThreadletQueue *queue)
-{
- QemuThread thread;
-
- queue->cur_threads++;
- queue->idle_threads++;
-
- qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-/**
- * submit_work_to_queue: Submit a new task to a private queue to be
- * executed asynchronously.
- * @queue: Per-subsystem private queue to which the new task needs
- * to be submitted.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
- qemu_mutex_lock(&queue->lock);
- if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
- spawn_threadlet(queue);
- } else {
- qemu_cond_signal(&queue->cond);
- }
- QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
- qemu_mutex_unlock(&queue->lock);
-}
-
-static void threadlet_queue_init(ThreadletQueue *queue,
- int max_threads, int min_threads);
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- * asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
- if (!globalqueue_init) {
- threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
- MIN_GLOBAL_THREADS);
- globalqueue_init = 1;
- }
-
- submit_work_to_queue(&globalqueue, work);
-}
-
-/**
- * dequeue_work_on_queue: Cancel a task queued on a Queue.
- * @queue: The queue containing the task to be cancelled.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- * 1 otherwise.
- */
-static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
- ThreadletWork *ret_work;
- int ret = 1;
-
- qemu_mutex_lock(&queue->lock);
- QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
- if (ret_work == work) {
- QTAILQ_REMOVE(&queue->request_list, ret_work, node);
- ret = 0;
- break;
- }
- }
- qemu_mutex_unlock(&queue->lock);
-
- return ret;
-}
-
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- * 1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
- return dequeue_work_on_queue(&globalqueue, work);
-}
-
-/**
- * threadlet_queue_init: Initialize a threadlet queue.
- * @queue: The threadlet queue to be initialized.
- * @max_threads: Maximum number of threads processing the queue.
- * @min_threads: Minimum number of threads processing the queue.
- */
-static void threadlet_queue_init(ThreadletQueue *queue,
- int max_threads, int min_threads)
-{
- queue->cur_threads = 0;
- queue->idle_threads = 0;
- queue->max_threads = max_threads;
- queue->min_threads = min_threads;
- QTAILQ_INIT(&queue->request_list);
- qemu_mutex_init(&queue->lock);
- qemu_cond_init(&queue->cond);
-}
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..23b4ecf
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,168 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ * Arun R Bharadwaj <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
+
+ qemu_mutex_lock(&queue->lock);
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
+ }
+
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&queue->lock);
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ } else {
+ qemu_cond_signal(&queue->cond);
+ }
+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+ qemu_mutex_unlock(&queue->lock);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+ if (!globalqueue_init) {
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
+ globalqueue_init = 1;
+ }
+
+ submit_work_to_queue(&globalqueue, work);
+}
+
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ ThreadletWork *ret_work;
+ int ret = 1;
+
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+ ret = 0;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&queue->lock);
+
+ return ret;
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&queue->request_list);
+ qemu_mutex_init(&queue->lock);
+ qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..39461a5
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,46 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
+void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
+int dequeue_work(ThreadletWork *work);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
+#endif
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets
2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
@ 2010-11-15 17:54 ` Arun R Bharadwaj
2 siblings, 0 replies; 6+ messages in thread
From: Arun R Bharadwaj @ 2010-11-15 17:54 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf
From: Gautham R Shenoy <ego@in.ibm.com>
infrastructure for offloading blocking tasks such as making posix calls on
to the helper threads and handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
hw/virtio-9p.c | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 30 +++-------
qemu-threadlets.c | 21 +++++++
qemu-threadlets.h | 1
vl.c | 3 +
5 files changed, 196 insertions(+), 23 deletions(-)
diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index 7c59988..88da20f 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-debug.h"
#include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
int debug_9p_pdu;
@@ -33,6 +34,149 @@ enum {
Oappend = 0x80,
};
+typedef struct V9fsPostOp {
+ QTAILQ_ENTRY(V9fsPostOp) node;
+ void (*func)(void *arg);
+ void *arg;
+} V9fsPostOp;
+
+static struct {
+ int rfd;
+ int wfd;
+ QemuMutex lock;
+ QTAILQ_HEAD(, V9fsPostOp) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+ fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+ abort();
+}
+
+static void die(const char *what)
+{
+ die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS 5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback to the iothread to be called into whenever
+ * the v9fs_async_struct.wfd is written into. This thread goes through the list
+ * of v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more job on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+ int count = 0;
+ struct V9fsPostOp *post_op;
+ int ret;
+ char byte;
+
+ do {
+ ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+ } while (ret > 0 || (ret == -1 && errno == EINTR));
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+ if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+ break;
+ }
+ post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+ QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+ post_op->func(post_op->arg);
+ qemu_free(post_op);
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ }
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+ char byte = 0;
+ ssize_t ret;
+ int tries = 0;
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ do {
+ assert(tries != 100);
+ ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+ tries++;
+ } while (ret < 0 && errno == EAGAIN);
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+ if (ret < 0 && errno != EAGAIN) {
+ die("write() in v9fs");
+ }
+
+ if (kill(getpid(), SIGUSR2)) {
+ die("kill failed");
+ }
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ * the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. It's purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+ struct V9fsPostOp *post_op;
+
+ post_op = qemu_mallocz(sizeof(*post_op));
+ post_op->func = func;
+ post_op->arg = arg;
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+ v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @vs: V9fsOPState variable for the OP operation.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked by from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work ,
+ void (*posix_fn)(ThreadletWork *work),
+ void (**post_fn_ptr)(void *arg),
+ void (*post_fn)(void *arg))
+{
+ *post_fn_ptr = post_fn;
+ work->func = posix_fn;
+ submit_work(work);
+}
+
static int omode_to_uflags(int8_t mode)
{
int ret = 0;
@@ -3657,7 +3801,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
int i, len;
struct stat stat;
FsTypeEntry *fse;
-
+ int fds[2];
s = (V9fsState *)virtio_common_init("virtio-9p",
VIRTIO_ID_9P,
@@ -3740,5 +3884,23 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
s->tag_len;
s->vdev.get_config = virtio_9p_get_config;
+ if (qemu_pipe(fds) == -1) {
+ fprintf(stderr, "failed to create fd's for virtio-9p\n");
+ exit(1);
+ }
+
+ v9fs_async_struct.rfd = fds[0];
+ v9fs_async_struct.wfd = fds[1];
+
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+ qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+ QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+ qemu_mutex_init(&(v9fs_async_struct.lock));
+
+ (void)v9fs_do_async_posix;
+ (void)v9fs_async_helper_done;
+
return &s->vdev;
}
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index d8cae6d..0f8dfa9 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -260,11 +260,14 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
+static PosixAioState *posix_aio_state;
+
static void aio_thread(ThreadletWork *work)
{
pid_t pid;
struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
ssize_t ret = 0;
+ char byte = 0;
pid = getpid();
@@ -290,6 +293,11 @@ static void aio_thread(ThreadletWork *work)
qemu_cond_broadcast(&aiocb_completion);
qemu_mutex_unlock(&aiocb_mutex);
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN) {
+ die("write()");
+ }
+
if (kill(pid, aiocb->ev_signo)) {
die("kill failed");
}
@@ -407,22 +415,6 @@ static int posix_aio_flush(void *opaque)
return !!s->first_aio;
}
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
-
- qemu_service_io();
-}
-
static void paio_remove(struct qemu_paiocb *acb)
{
struct qemu_paiocb **pacb;
@@ -518,7 +510,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
@@ -530,11 +521,6 @@ int paio_init(void)
s = qemu_malloc(sizeof(PosixAioState));
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index 23b4ecf..1e78f56 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -16,12 +16,28 @@
#include "qemu-threadlets.h"
#include "osdep.h"
+#include <signal.h>
#define MAX_GLOBAL_THREADS 64
#define MIN_GLOBAL_THREADS 8
static ThreadletQueue globalqueue;
static int globalqueue_init;
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+ struct sigaction act;
+
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = threadlet_io_completion_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+}
+
static void *threadlet_worker(void *data)
{
ThreadletQueue *queue = data;
@@ -166,3 +182,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
qemu_mutex_init(&queue->lock);
qemu_cond_init(&queue->cond);
}
+
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 39461a5..f26c5b7 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -43,4 +43,5 @@ int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
int dequeue_work(ThreadletWork *work);
void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
int min_threads);
+void threadlet_init(void);
#endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
+#include "qemu-threadlets.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
exit(1);
}
+ threadlet_init();
+
/* init generic devices */
if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
exit(1);
^ permalink raw reply related [flat|nested] 6+ messages in thread
* Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
@ 2010-11-15 21:13 ` Anthony Liguori
2010-11-18 18:05 ` Arun R Bharadwaj
0 siblings, 1 reply; 6+ messages in thread
From: Anthony Liguori @ 2010-11-15 21:13 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: kwolf, qemu-devel
On 11/15/2010 11:53 AM, Arun R Bharadwaj wrote:
> From: Gautham R Shenoy<gautham.shenoy@gmail.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure named
> threadlets.
>
> The patch creates a global queue on-to which subsystems can queue their tasks to
> be executed asynchronously.
>
> The patch also provides API's that allow a subsystem to create a private queue
> with an associated pool of threads.
>
> The patch has been tested with fstress.
>
This patch is very hard to review. Can you generize this via a serial
of incremental changes instead of a big rewrite?
This code has been historically fickle so doing the extra work to
simplify review is worth it.
Regards,
Anthony Liguori
> Signed-off-by: Arun R Bharadwaj<arun@linux.vnet.ibm.com>
> Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy<gautham.shenoy@gmail.com>
> Signed-off-by: Sripathi Kodi<sripathik@in.ibm.com>
> ---
> Makefile.objs | 2
> configure | 2
> posix-aio-compat.c | 354 +++++++++++++++++++++++++++++++---------------------
> 3 files changed, 211 insertions(+), 147 deletions(-)
>
> diff --git a/Makefile.objs b/Makefile.objs
> index cd5a24b..3b7ec27 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>
> block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
> block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
> block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
> block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -124,7 +125,6 @@ endif
> common-obj-y += $(addprefix ui/, $(ui-obj-y))
>
> common-obj-y += iov.o acl.o
> -common-obj-$(CONFIG_THREAD) += qemu-thread.o
> common-obj-y += notify.o event_notifier.o
> common-obj-y += qemu-timer.o
>
> diff --git a/configure b/configure
> index a079a49..addf733 100755
> --- a/configure
> +++ b/configure
> @@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
> fi
> if test "$vnc_thread" != "no" ; then
> echo "CONFIG_VNC_THREAD=y">> $config_host_mak
> - echo "CONFIG_THREAD=y">> $config_host_mak
> fi
> if test "$fnmatch" = "yes" ; then
> echo "CONFIG_FNMATCH=y">> $config_host_mak
> @@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
> fi
> if test "$io_thread" = "yes" ; then
> echo "CONFIG_IOTHREAD=y">> $config_host_mak
> - echo "CONFIG_THREAD=y">> $config_host_mak
> fi
> if test "$linux_aio" = "yes" ; then
> echo "CONFIG_LINUX_AIO=y">> $config_host_mak
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..e1812fc 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,7 +29,33 @@
> #include "block_int.h"
>
> #include "block/raw-posix-aio.h"
> +#include "qemu-thread.h"
>
> +#define MAX_GLOBAL_THREADS 64
> +#define MIN_GLOBAL_THREADS 8
> +
> +static QemuMutex aiocb_mutex;
> +static QemuCond aiocb_completion;
> +
> +typedef struct ThreadletQueue
> +{
> + QemuMutex lock;
> + QemuCond cond;
> + int max_threads;
> + int min_threads;
> + int cur_threads;
> + int idle_threads;
> + QTAILQ_HEAD(, ThreadletWork) request_list;
> +} ThreadletQueue;
> +
> +typedef struct ThreadletWork
> +{
> + QTAILQ_ENTRY(ThreadletWork) node;
> + void (*func)(struct ThreadletWork *work);
> +} ThreadletWork;
> +
> +static ThreadletQueue globalqueue;
> +static int globalqueue_init;
>
> struct qemu_paiocb {
> BlockDriverAIOCB common;
> @@ -44,13 +70,12 @@ struct qemu_paiocb {
> int ev_signo;
> off_t aio_offset;
>
> - QTAILQ_ENTRY(qemu_paiocb) node;
> int aio_type;
> ssize_t ret;
> - int active;
> struct qemu_paiocb *next;
>
> int async_context_id;
> + ThreadletWork work;
> };
>
> typedef struct PosixAioState {
> @@ -58,64 +83,169 @@ typedef struct PosixAioState {
> struct qemu_paiocb *first_aio;
> } PosixAioState;
>
> +static void *threadlet_worker(void *data)
> +{
> + ThreadletQueue *queue = data;
>
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> + qemu_mutex_lock(&queue->lock);
> + while (1) {
> + ThreadletWork *work;
> + int ret = 0;
> +
> + while (QTAILQ_EMPTY(&queue->request_list)&&
> + (ret != ETIMEDOUT)) {
> + /* wait for cond to be signalled or broadcast for 1000s */
> + ret = qemu_cond_timedwait((&queue->cond),
> +&(queue->lock), 10*100000);
> + }
>
> -#ifdef CONFIG_PREADV
> -static int preadv_present = 1;
> -#else
> -static int preadv_present = 0;
> -#endif
> + assert(queue->idle_threads != 0);
> + if (QTAILQ_EMPTY(&queue->request_list)) {
> + if (queue->cur_threads> queue->min_threads) {
> + /* We retain the minimum number of threads */
> + break;
> + }
> + } else {
> + work = QTAILQ_FIRST(&queue->request_list);
> + QTAILQ_REMOVE(&queue->request_list, work, node);
>
> -static void die2(int err, const char *what)
> -{
> - fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> - abort();
> + queue->idle_threads--;
> + qemu_mutex_unlock(&queue->lock);
> +
> + /* execute the work function */
> + work->func(work);
> +
> + qemu_mutex_lock(&queue->lock);
> + queue->idle_threads++;
> + }
> + }
> +
> + queue->idle_threads--;
> + queue->cur_threads--;
> + qemu_mutex_unlock(&queue->lock);
> +
> + return NULL;
> }
>
> -static void die(const char *what)
> +static void spawn_threadlet(ThreadletQueue *queue)
> {
> - die2(errno, what);
> + QemuThread thread;
> +
> + queue->cur_threads++;
> + queue->idle_threads++;
> +
> + qemu_thread_create(&thread, threadlet_worker, queue);
> }
>
> -static void mutex_lock(pthread_mutex_t *mutex)
> +/**
> + * submit_work_to_queue: Submit a new task to a private queue to be
> + * executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + * to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> {
> - int ret = pthread_mutex_lock(mutex);
> - if (ret) die2(ret, "pthread_mutex_lock");
> + qemu_mutex_lock(&queue->lock);
> + if (queue->idle_threads == 0&& queue->cur_threads< queue->max_threads) {
> + spawn_threadlet(queue);
> + } else {
> + qemu_cond_signal(&queue->cond);
> + }
> + QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
> + qemu_mutex_unlock(&queue->lock);
> }
>
> -static void mutex_unlock(pthread_mutex_t *mutex)
> +static void threadlet_queue_init(ThreadletQueue *queue,
> + int max_threads, int min_threads);
> +
> +/**
> + * submit_work: Submit to the global queue a new task to be executed
> + * asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +static void submit_work(ThreadletWork *work)
> {
> - int ret = pthread_mutex_unlock(mutex);
> - if (ret) die2(ret, "pthread_mutex_unlock");
> + if (!globalqueue_init) {
> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> + MIN_GLOBAL_THREADS);
> + globalqueue_init = 1;
> + }
> +
> + submit_work_to_queue(&globalqueue, work);
> }
>
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> - struct timespec *ts)
> +/**
> + * dequeue_work_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> {
> - int ret = pthread_cond_timedwait(cond, mutex, ts);
> - if (ret&& ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> + ThreadletWork *ret_work;
> + int ret = 1;
> +
> + qemu_mutex_lock(&queue->lock);
> + QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
> + if (ret_work == work) {
> + QTAILQ_REMOVE(&queue->request_list, ret_work, node);
> + ret = 0;
> + break;
> + }
> + }
> + qemu_mutex_unlock(&queue->lock);
> +
> return ret;
> }
>
> -static void cond_signal(pthread_cond_t *cond)
> +/**
> + * dequeue_work: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +static int dequeue_work(ThreadletWork *work)
> +{
> + return dequeue_work_on_queue(&globalqueue, work);
> +}
> +
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +static void threadlet_queue_init(ThreadletQueue *queue,
> + int max_threads, int min_threads)
> +{
> + queue->cur_threads = 0;
> + queue->idle_threads = 0;
> + queue->max_threads = max_threads;
> + queue->min_threads = min_threads;
> + QTAILQ_INIT(&queue->request_list);
> + qemu_mutex_init(&queue->lock);
> + qemu_cond_init(&queue->cond);
> +}
> +
> +#ifdef CONFIG_PREADV
> +static int preadv_present = 1;
> +#else
> +static int preadv_present;
> +#endif
> +
> +static void die2(int err, const char *what)
> {
> - int ret = pthread_cond_signal(cond);
> - if (ret) die2(ret, "pthread_cond_signal");
> + fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> + abort();
> }
>
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> - void *(*start_routine)(void*), void *arg)
> +static void die(const char *what)
> {
> - int ret = pthread_create(thread, attr, start_routine, arg);
> - if (ret) die2(ret, "pthread_create");
> + die2(errno, what);
> }
>
> static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> + ssize_t ret = 0;
>
> pid = getpid();
>
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> -
> - while (QTAILQ_EMPTY(&request_list)&&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond,&lock,&ts);
> - }
> -
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> -
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> -
> - switch (aiocb->aio_type& QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> - ret = handle_aiocb_rw(aiocb);
> - break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> - ret = handle_aiocb_ioctl(aiocb);
> - break;
> - default:
> - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> - ret = -EINVAL;
> - break;
> - }
> -
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type& QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> -
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> + qemu_mutex_lock(&aiocb_mutex);
> + aiocb->ret = ret;
> + qemu_cond_broadcast(&aiocb_completion);
> + qemu_mutex_unlock(&aiocb_mutex);
>
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK,&set,&oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id,&attr, aio_thread, NULL);
> -
> - if (sigprocmask(SIG_SETMASK,&oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) {
> + die("kill failed");
> + }
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> + qemu_mutex_lock(&aiocb_mutex);
> aiocb->ret = -EINPROGRESS;
> - aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0&& cur_threads< max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> + qemu_mutex_unlock(&aiocb_mutex);
> +
> + aiocb->work.func = aio_thread;
> + submit_work(&aiocb->work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> + qemu_mutex_lock(&aiocb_mutex);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> + qemu_mutex_unlock(&aiocb_mutex);
> return ret;
> }
>
> @@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
> static void paio_cancel(BlockDriverAIOCB *blockacb)
> {
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> - int active = 0;
> -
> - mutex_lock(&lock);
> - if (!acb->active) {
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> - } else if (acb->ret == -EINPROGRESS) {
> - active = 1;
> - }
> - mutex_unlock(&lock);
>
> - if (active) {
> - /* fail safe: if the aio could not be canceled, we wait for
> - it */
> - while (qemu_paio_error(acb) == EINPROGRESS)
> - ;
> + if (dequeue_work(&acb->work) != 0) {
> + /* Wait for running work item to complete */
> + qemu_mutex_lock(&aiocb_mutex);
> + while (acb->ret == -EINPROGRESS) {
> + qemu_cond_wait(&aiocb_completion,&aiocb_mutex);
> + }
> + qemu_mutex_unlock(&aiocb_mutex);
> }
>
> paio_remove(acb);
> @@ -618,11 +692,13 @@ int paio_init(void)
> struct sigaction act;
> PosixAioState *s;
> int fds[2];
> - int ret;
>
> if (posix_aio_state)
> return 0;
>
> + qemu_mutex_init(&aiocb_mutex);
> + qemu_cond_init(&aiocb_completion);
> +
> s = qemu_malloc(sizeof(PosixAioState));
>
> sigfillset(&act.sa_mask);
> @@ -645,16 +721,6 @@ int paio_init(void)
> qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
> posix_aio_process_queue, s);
>
> - ret = pthread_attr_init(&attr);
> - if (ret)
> - die2(ret, "pthread_attr_init");
> -
> - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> - if (ret)
> - die2(ret, "pthread_attr_setdetachstate");
> -
> - QTAILQ_INIT(&request_list);
> -
> posix_aio_state = s;
> return 0;
> }
>
>
>
^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
2010-11-15 21:13 ` Anthony Liguori
@ 2010-11-18 18:05 ` Arun R Bharadwaj
0 siblings, 0 replies; 6+ messages in thread
From: Arun R Bharadwaj @ 2010-11-18 18:05 UTC (permalink / raw)
To: Anthony Liguori; +Cc: kwolf, qemu-devel
* Anthony Liguori <anthony@codemonkey.ws> [2010-11-15 15:13:24]:
> On 11/15/2010 11:53 AM, Arun R Bharadwaj wrote:
> >From: Gautham R Shenoy<gautham.shenoy@gmail.com>
> >
> >This patch creates a generic asynchronous-task-offloading infrastructure named
> >threadlets.
> >
> >The patch creates a global queue on-to which subsystems can queue their tasks to
> >be executed asynchronously.
> >
> >The patch also provides API's that allow a subsystem to create a private queue
> >with an associated pool of threads.
> >
> >The patch has been tested with fstress.
>
> This patch is very hard to review. Can you generize this via a
> serial of incremental changes instead of a big rewrite?
>
> This code has been historically fickle so doing the extra work to
> simplify review is worth it.
>
Hi Anthony,
I have broken down this patch into 4 sub-patches to enable better
reviewing. Posting it as v12. Please let me know your comments.
-arun
> Regards,
>
> Anthony Liguori
>
> >Signed-off-by: Arun R Bharadwaj<arun@linux.vnet.ibm.com>
> >Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> >Signed-off-by: Gautham R Shenoy<gautham.shenoy@gmail.com>
> >Signed-off-by: Sripathi Kodi<sripathik@in.ibm.com>
> >---
> > Makefile.objs | 2
> > configure | 2
> > posix-aio-compat.c | 354 +++++++++++++++++++++++++++++++---------------------
> > 3 files changed, 211 insertions(+), 147 deletions(-)
> >
> >diff --git a/Makefile.objs b/Makefile.objs
> >index cd5a24b..3b7ec27 100644
> >--- a/Makefile.objs
> >+++ b/Makefile.objs
> >@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
> >
> > block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
> > block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> >+block-obj-$(CONFIG_POSIX) += qemu-thread.o
> > block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
> > block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
> >
> >@@ -124,7 +125,6 @@ endif
> > common-obj-y += $(addprefix ui/, $(ui-obj-y))
> >
> > common-obj-y += iov.o acl.o
> >-common-obj-$(CONFIG_THREAD) += qemu-thread.o
> > common-obj-y += notify.o event_notifier.o
> > common-obj-y += qemu-timer.o
> >
> >diff --git a/configure b/configure
> >index a079a49..addf733 100755
> >--- a/configure
> >+++ b/configure
> >@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
> > fi
> > if test "$vnc_thread" != "no" ; then
> > echo "CONFIG_VNC_THREAD=y">> $config_host_mak
> >- echo "CONFIG_THREAD=y">> $config_host_mak
> > fi
> > if test "$fnmatch" = "yes" ; then
> > echo "CONFIG_FNMATCH=y">> $config_host_mak
> >@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
> > fi
> > if test "$io_thread" = "yes" ; then
> > echo "CONFIG_IOTHREAD=y">> $config_host_mak
> >- echo "CONFIG_THREAD=y">> $config_host_mak
> > fi
> > if test "$linux_aio" = "yes" ; then
> > echo "CONFIG_LINUX_AIO=y">> $config_host_mak
> >diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> >index 7b862b5..e1812fc 100644
> >--- a/posix-aio-compat.c
> >+++ b/posix-aio-compat.c
> >@@ -29,7 +29,33 @@
> > #include "block_int.h"
> >
> > #include "block/raw-posix-aio.h"
> >+#include "qemu-thread.h"
> >
> >+#define MAX_GLOBAL_THREADS 64
> >+#define MIN_GLOBAL_THREADS 8
> >+
> >+static QemuMutex aiocb_mutex;
> >+static QemuCond aiocb_completion;
> >+
> >+typedef struct ThreadletQueue
> >+{
> >+ QemuMutex lock;
> >+ QemuCond cond;
> >+ int max_threads;
> >+ int min_threads;
> >+ int cur_threads;
> >+ int idle_threads;
> >+ QTAILQ_HEAD(, ThreadletWork) request_list;
> >+} ThreadletQueue;
> >+
> >+typedef struct ThreadletWork
> >+{
> >+ QTAILQ_ENTRY(ThreadletWork) node;
> >+ void (*func)(struct ThreadletWork *work);
> >+} ThreadletWork;
> >+
> >+static ThreadletQueue globalqueue;
> >+static int globalqueue_init;
> >
> > struct qemu_paiocb {
> > BlockDriverAIOCB common;
> >@@ -44,13 +70,12 @@ struct qemu_paiocb {
> > int ev_signo;
> > off_t aio_offset;
> >
> >- QTAILQ_ENTRY(qemu_paiocb) node;
> > int aio_type;
> > ssize_t ret;
> >- int active;
> > struct qemu_paiocb *next;
> >
> > int async_context_id;
> >+ ThreadletWork work;
> > };
> >
> > typedef struct PosixAioState {
> >@@ -58,64 +83,169 @@ typedef struct PosixAioState {
> > struct qemu_paiocb *first_aio;
> > } PosixAioState;
> >
> >+static void *threadlet_worker(void *data)
> >+{
> >+ ThreadletQueue *queue = data;
> >
> >-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> >-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> >-static pthread_t thread_id;
> >-static pthread_attr_t attr;
> >-static int max_threads = 64;
> >-static int cur_threads = 0;
> >-static int idle_threads = 0;
> >-static QTAILQ_HEAD(, qemu_paiocb) request_list;
> >+ qemu_mutex_lock(&queue->lock);
> >+ while (1) {
> >+ ThreadletWork *work;
> >+ int ret = 0;
> >+
> >+ while (QTAILQ_EMPTY(&queue->request_list)&&
> >+ (ret != ETIMEDOUT)) {
> >+ /* wait for cond to be signalled or broadcast for 1000s */
> >+ ret = qemu_cond_timedwait((&queue->cond),
> >+&(queue->lock), 10*100000);
> >+ }
> >
> >-#ifdef CONFIG_PREADV
> >-static int preadv_present = 1;
> >-#else
> >-static int preadv_present = 0;
> >-#endif
> >+ assert(queue->idle_threads != 0);
> >+ if (QTAILQ_EMPTY(&queue->request_list)) {
> >+ if (queue->cur_threads> queue->min_threads) {
> >+ /* We retain the minimum number of threads */
> >+ break;
> >+ }
> >+ } else {
> >+ work = QTAILQ_FIRST(&queue->request_list);
> >+ QTAILQ_REMOVE(&queue->request_list, work, node);
> >
> >-static void die2(int err, const char *what)
> >-{
> >- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> >- abort();
> >+ queue->idle_threads--;
> >+ qemu_mutex_unlock(&queue->lock);
> >+
> >+ /* execute the work function */
> >+ work->func(work);
> >+
> >+ qemu_mutex_lock(&queue->lock);
> >+ queue->idle_threads++;
> >+ }
> >+ }
> >+
> >+ queue->idle_threads--;
> >+ queue->cur_threads--;
> >+ qemu_mutex_unlock(&queue->lock);
> >+
> >+ return NULL;
> > }
> >
> >-static void die(const char *what)
> >+static void spawn_threadlet(ThreadletQueue *queue)
> > {
> >- die2(errno, what);
> >+ QemuThread thread;
> >+
> >+ queue->cur_threads++;
> >+ queue->idle_threads++;
> >+
> >+ qemu_thread_create(&thread, threadlet_worker, queue);
> > }
> >
> >-static void mutex_lock(pthread_mutex_t *mutex)
> >+/**
> >+ * submit_work_to_queue: Submit a new task to a private queue to be
> >+ * executed asynchronously.
> >+ * @queue: Per-subsystem private queue to which the new task needs
> >+ * to be submitted.
> >+ * @work: Contains information about the task that needs to be submitted.
> >+ */
> >+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> > {
> >- int ret = pthread_mutex_lock(mutex);
> >- if (ret) die2(ret, "pthread_mutex_lock");
> >+ qemu_mutex_lock(&queue->lock);
> >+ if (queue->idle_threads == 0&& queue->cur_threads< queue->max_threads) {
> >+ spawn_threadlet(queue);
> >+ } else {
> >+ qemu_cond_signal(&queue->cond);
> >+ }
> >+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
> >+ qemu_mutex_unlock(&queue->lock);
> > }
> >
> >-static void mutex_unlock(pthread_mutex_t *mutex)
> >+static void threadlet_queue_init(ThreadletQueue *queue,
> >+ int max_threads, int min_threads);
> >+
> >+/**
> >+ * submit_work: Submit to the global queue a new task to be executed
> >+ * asynchronously.
> >+ * @work: Contains information about the task that needs to be submitted.
> >+ */
> >+static void submit_work(ThreadletWork *work)
> > {
> >- int ret = pthread_mutex_unlock(mutex);
> >- if (ret) die2(ret, "pthread_mutex_unlock");
> >+ if (!globalqueue_init) {
> >+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> >+ MIN_GLOBAL_THREADS);
> >+ globalqueue_init = 1;
> >+ }
> >+
> >+ submit_work_to_queue(&globalqueue, work);
> > }
> >
> >-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> >- struct timespec *ts)
> >+/**
> >+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
> >+ * @queue: The queue containing the task to be cancelled.
> >+ * @work: Contains the information of the task that needs to be cancelled.
> >+ *
> >+ * Returns: 0 if the task is successfully cancelled.
> >+ * 1 otherwise.
> >+ */
> >+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> > {
> >- int ret = pthread_cond_timedwait(cond, mutex, ts);
> >- if (ret&& ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> >+ ThreadletWork *ret_work;
> >+ int ret = 1;
> >+
> >+ qemu_mutex_lock(&queue->lock);
> >+ QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
> >+ if (ret_work == work) {
> >+ QTAILQ_REMOVE(&queue->request_list, ret_work, node);
> >+ ret = 0;
> >+ break;
> >+ }
> >+ }
> >+ qemu_mutex_unlock(&queue->lock);
> >+
> > return ret;
> > }
> >
> >-static void cond_signal(pthread_cond_t *cond)
> >+/**
> >+ * dequeue_work: Cancel a task queued on the global queue.
> >+ * @work: Contains the information of the task that needs to be cancelled.
> >+ *
> >+ * Returns: 0 if the task is successfully cancelled.
> >+ * 1 otherwise.
> >+ */
> >+static int dequeue_work(ThreadletWork *work)
> >+{
> >+ return dequeue_work_on_queue(&globalqueue, work);
> >+}
> >+
> >+/**
> >+ * threadlet_queue_init: Initialize a threadlet queue.
> >+ * @queue: The threadlet queue to be initialized.
> >+ * @max_threads: Maximum number of threads processing the queue.
> >+ * @min_threads: Minimum number of threads processing the queue.
> >+ */
> >+static void threadlet_queue_init(ThreadletQueue *queue,
> >+ int max_threads, int min_threads)
> >+{
> >+ queue->cur_threads = 0;
> >+ queue->idle_threads = 0;
> >+ queue->max_threads = max_threads;
> >+ queue->min_threads = min_threads;
> >+ QTAILQ_INIT(&queue->request_list);
> >+ qemu_mutex_init(&queue->lock);
> >+ qemu_cond_init(&queue->cond);
> >+}
> >+
> >+#ifdef CONFIG_PREADV
> >+static int preadv_present = 1;
> >+#else
> >+static int preadv_present;
> >+#endif
> >+
> >+static void die2(int err, const char *what)
> > {
> >- int ret = pthread_cond_signal(cond);
> >- if (ret) die2(ret, "pthread_cond_signal");
> >+ fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> >+ abort();
> > }
> >
> >-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> >- void *(*start_routine)(void*), void *arg)
> >+static void die(const char *what)
> > {
> >- int ret = pthread_create(thread, attr, start_routine, arg);
> >- if (ret) die2(ret, "pthread_create");
> >+ die2(errno, what);
> > }
> >
> > static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> >@@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> > return nbytes;
> > }
> >
> >-static void *aio_thread(void *unused)
> >+static void aio_thread(ThreadletWork *work)
> > {
> > pid_t pid;
> >+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> >+ ssize_t ret = 0;
> >
> > pid = getpid();
> >
> >- while (1) {
> >- struct qemu_paiocb *aiocb;
> >- ssize_t ret = 0;
> >- qemu_timeval tv;
> >- struct timespec ts;
> >-
> >- qemu_gettimeofday(&tv);
> >- ts.tv_sec = tv.tv_sec + 10;
> >- ts.tv_nsec = 0;
> >-
> >- mutex_lock(&lock);
> >-
> >- while (QTAILQ_EMPTY(&request_list)&&
> >- !(ret == ETIMEDOUT)) {
> >- ret = cond_timedwait(&cond,&lock,&ts);
> >- }
> >-
> >- if (QTAILQ_EMPTY(&request_list))
> >- break;
> >-
> >- aiocb = QTAILQ_FIRST(&request_list);
> >- QTAILQ_REMOVE(&request_list, aiocb, node);
> >- aiocb->active = 1;
> >- idle_threads--;
> >- mutex_unlock(&lock);
> >-
> >- switch (aiocb->aio_type& QEMU_AIO_TYPE_MASK) {
> >- case QEMU_AIO_READ:
> >- case QEMU_AIO_WRITE:
> >- ret = handle_aiocb_rw(aiocb);
> >- break;
> >- case QEMU_AIO_FLUSH:
> >- ret = handle_aiocb_flush(aiocb);
> >- break;
> >- case QEMU_AIO_IOCTL:
> >- ret = handle_aiocb_ioctl(aiocb);
> >- break;
> >- default:
> >- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> >- ret = -EINVAL;
> >- break;
> >- }
> >-
> >- mutex_lock(&lock);
> >- aiocb->ret = ret;
> >- idle_threads++;
> >- mutex_unlock(&lock);
> >-
> >- if (kill(pid, aiocb->ev_signo)) die("kill failed");
> >+ switch (aiocb->aio_type& QEMU_AIO_TYPE_MASK) {
> >+ case QEMU_AIO_READ:
> >+ case QEMU_AIO_WRITE:
> >+ ret = handle_aiocb_rw(aiocb);
> >+ break;
> >+ case QEMU_AIO_FLUSH:
> >+ ret = handle_aiocb_flush(aiocb);
> >+ break;
> >+ case QEMU_AIO_IOCTL:
> >+ ret = handle_aiocb_ioctl(aiocb);
> >+ break;
> >+ default:
> >+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> >+ ret = -EINVAL;
> >+ break;
> > }
> >
> >- idle_threads--;
> >- cur_threads--;
> >- mutex_unlock(&lock);
> >-
> >- return NULL;
> >-}
> >-
> >-static void spawn_thread(void)
> >-{
> >- sigset_t set, oldset;
> >-
> >- cur_threads++;
> >- idle_threads++;
> >+ qemu_mutex_lock(&aiocb_mutex);
> >+ aiocb->ret = ret;
> >+ qemu_cond_broadcast(&aiocb_completion);
> >+ qemu_mutex_unlock(&aiocb_mutex);
> >
> >- /* block all signals */
> >- if (sigfillset(&set)) die("sigfillset");
> >- if (sigprocmask(SIG_SETMASK,&set,&oldset)) die("sigprocmask");
> >-
> >- thread_create(&thread_id,&attr, aio_thread, NULL);
> >-
> >- if (sigprocmask(SIG_SETMASK,&oldset, NULL)) die("sigprocmask restore");
> >+ if (kill(pid, aiocb->ev_signo)) {
> >+ die("kill failed");
> >+ }
> > }
> >
> > static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> > {
> >+ qemu_mutex_lock(&aiocb_mutex);
> > aiocb->ret = -EINPROGRESS;
> >- aiocb->active = 0;
> >- mutex_lock(&lock);
> >- if (idle_threads == 0&& cur_threads< max_threads)
> >- spawn_thread();
> >- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> >- mutex_unlock(&lock);
> >- cond_signal(&cond);
> >+ qemu_mutex_unlock(&aiocb_mutex);
> >+
> >+ aiocb->work.func = aio_thread;
> >+ submit_work(&aiocb->work);
> > }
> >
> > static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> > {
> > ssize_t ret;
> >
> >- mutex_lock(&lock);
> >+ qemu_mutex_lock(&aiocb_mutex);
> > ret = aiocb->ret;
> >- mutex_unlock(&lock);
> >-
> >+ qemu_mutex_unlock(&aiocb_mutex);
> > return ret;
> > }
> >
> >@@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
> > static void paio_cancel(BlockDriverAIOCB *blockacb)
> > {
> > struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> >- int active = 0;
> >-
> >- mutex_lock(&lock);
> >- if (!acb->active) {
> >- QTAILQ_REMOVE(&request_list, acb, node);
> >- acb->ret = -ECANCELED;
> >- } else if (acb->ret == -EINPROGRESS) {
> >- active = 1;
> >- }
> >- mutex_unlock(&lock);
> >
> >- if (active) {
> >- /* fail safe: if the aio could not be canceled, we wait for
> >- it */
> >- while (qemu_paio_error(acb) == EINPROGRESS)
> >- ;
> >+ if (dequeue_work(&acb->work) != 0) {
> >+ /* Wait for running work item to complete */
> >+ qemu_mutex_lock(&aiocb_mutex);
> >+ while (acb->ret == -EINPROGRESS) {
> >+ qemu_cond_wait(&aiocb_completion,&aiocb_mutex);
> >+ }
> >+ qemu_mutex_unlock(&aiocb_mutex);
> > }
> >
> > paio_remove(acb);
> >@@ -618,11 +692,13 @@ int paio_init(void)
> > struct sigaction act;
> > PosixAioState *s;
> > int fds[2];
> >- int ret;
> >
> > if (posix_aio_state)
> > return 0;
> >
> >+ qemu_mutex_init(&aiocb_mutex);
> >+ qemu_cond_init(&aiocb_completion);
> >+
> > s = qemu_malloc(sizeof(PosixAioState));
> >
> > sigfillset(&act.sa_mask);
> >@@ -645,16 +721,6 @@ int paio_init(void)
> > qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
> > posix_aio_process_queue, s);
> >
> >- ret = pthread_attr_init(&attr);
> >- if (ret)
> >- die2(ret, "pthread_attr_init");
> >-
> >- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> >- if (ret)
> >- die2(ret, "pthread_attr_setdetachstate");
> >-
> >- QTAILQ_INIT(&request_list);
> >-
> > posix_aio_state = s;
> > return 0;
> > }
> >
> >
>
>
^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2010-11-18 18:05 UTC | newest]
Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
2010-11-15 21:13 ` Anthony Liguori
2010-11-18 18:05 ` Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
2010-11-15 17:54 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.