* [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Stefan Hajnoczi, Paolo Bonzini, John Snow

This is a continuation of the work to reduce (and possibly get rid of) the usage of the AioContext lock, by introducing finer-grained locks to preserve thread safety.

This series aims to:
1) remove the AioContext lock and substitute it with the already existing
   global job_mutex
2) fix what looks like an oversight from when the blockjob.c logic was
   moved into the more generic job.c: job_mutex was introduced specifically
   to protect the job->busy flag, but it apparently fell out of use in
   later patches, since multiple code sections access the field directly
   without any locking.
3) use job_mutex instead of the AioContext lock
4) extend the reach of job_mutex to protect all shared fields
   of the Job structure.

We propose to use the existing job_mutex rather than one mutex per
job to keep things as simple as possible for now, and because jobs
are not in the execution critical path, so we can afford some delays.
Having a lock per job would increase overall complexity and
increase the chances of deadlocks (one good example could be the job
transactions, where multiple jobs are grouped together).
Anyway, a per-job mutex can always be added in the future.
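
Here is a minimal sketch of what a monitor-side caller looks like under
this scheme. monitor_cancel_job is a made-up name for illustration, but
job_lock/job_unlock, job_get and job_user_cancel are the functions used
in the patches below (compare find_job/qmp_job_cancel in patch 5):

    void monitor_cancel_job(const char *id, Error **errp)
    {
        Job *job;

        job_lock();            /* global job_mutex, not per-job */
        job = job_get(id);     /* the job list lookup is protected too */
        if (!job) {
            error_setg(errp, "Job not found");
            job_unlock();
            return;
        }
        job_user_cancel(job, true, errp);
        job_unlock();
    }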

Patches 1-4 are in preparation for patch 5. They try to simplify and clarify
the job_mutex usage. Patch 5 tries to add proper synchronization to the job
structure, replacing the AioContext lock where necessary.
Patch 6 just removes the AioContext locks that are now unnecessary.


RFC: I am not sure the way I laid out the locks is ideal,
but their usage should not introduce deadlocks. I also made sure
the series passes all qemu-iotests.

What is very clear from this series is that it
is tightly coupled to the bdrv_* and lower-level calls, because
they also internally check for, or even take, the AioContext lock.
Therefore, in order to make it work, I temporarily added some
aio_context_acquire/release pairs around the functions that
still assert that the lock is held, or that assume it is held and
temporarily drop it (unlock() - lock()); see the sketch below.
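
The rough shape of that workaround (bdrv_foo, bs and ctx are placeholders,
not real names; the pattern matches what patch 5 does in a few spots):

    job_unlock();
    aio_context_acquire(ctx);    /* bdrv_foo() still asserts this lock */
    bdrv_foo(bs);                /* placeholder for a bdrv_* call */
    aio_context_release(ctx);
    job_lock();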

I also apologize for the amount of changes in patch 5; any suggestion on
how to improve the patch layout is very much appreciated.

Emanuele Giuseppe Esposito (6):
  job: use getter/setters instead of accessing the Job fields directly
  job: _locked functions and public job_lock/unlock for next patch
  job: minor changes to simplify locking
  job.h: categorize job fields
  job: use global job_mutex to protect struct Job
  jobs: remove unnecessary AioContext acquire/release pairs

 include/block/blockjob_int.h   |   1 +
 include/qemu/job.h             | 159 ++++++++++--
 block.c                        |   2 +-
 block/backup.c                 |   4 +
 block/commit.c                 |   4 +-
 block/mirror.c                 |  30 ++-
 block/monitor/block-hmp-cmds.c |   6 -
 block/replication.c            |   3 +-
 blockdev.c                     | 235 ++++++------------
 blockjob.c                     | 140 +++++++----
 job-qmp.c                      |  65 +++--
 job.c                          | 432 ++++++++++++++++++++++++++-------
 qemu-img.c                     |  19 +-
 13 files changed, 724 insertions(+), 376 deletions(-)

-- 
2.31.1

* [RFC PATCH 1/6] job: use getter/setters instead of accessing the Job fields directly
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

Using getters/setters gives us stricter control over the struct Job
fields. The struct remains public, because it is also used as a base
class for BlockJob and others, but all direct accesses to the fields
we want to protect are replaced with getters/setters.
This is in preparation for the locking patches.

No functional change intended.
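
To give an idea of the mechanical transformation this performs, here is
one of the mirror.c changes from the diff below, shown as before/after:

    /* before: direct field access */
    if (!job->paused) {
        job_enter(job);
    }

    /* after: the accessor hides the field */
    if (!job_is_paused(job)) {
        job_enter(job);
    }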

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/qemu/job.h  | 33 +++++++++++++++++++
 block.c             |  2 +-
 block/commit.c      |  4 +--
 block/mirror.c      | 17 +++++-----
 block/replication.c |  3 +-
 blockdev.c          |  2 +-
 blockjob.c          | 78 ++++++++++++++++++++++++---------------------
 job-qmp.c           | 16 ++++++----
 job.c               | 52 +++++++++++++++++++++++++++++-
 qemu-img.c          |  2 +-
 10 files changed, 151 insertions(+), 58 deletions(-)

diff --git a/include/qemu/job.h b/include/qemu/job.h
index 41162ed494..72c7d0f69d 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -303,6 +303,39 @@ void job_txn_unref(JobTxn *txn);
  */
 void job_txn_add_job(JobTxn *txn, Job *job);
 
+/** Returns the @ret field of a given Job. */
+int job_get_ret(Job *job);
+
+/** Returns the AioContext of a given Job. */
+AioContext *job_get_aiocontext(Job *job);
+
+/** Sets the AioContext of a given Job. */
+void job_set_aiocontext(Job *job, AioContext *aio);
+
+/** Returns whether a given Job is busy. */
+bool job_is_busy(Job *job);
+
+/** Returns the Error of a given Job. */
+Error *job_get_err(Job *job);
+
+/** Returns whether a given Job has pause_count > 0. */
+bool job_should_pause(Job *job);
+
+/** Sets the user_paused flag of a given Job to true. */
+void job_set_user_paused(Job *job);
+
+/** Sets the cancelled flag of a given Job. */
+void job_set_cancelled(Job *job, bool cancel);
+
+/** Returns whether a given Job is paused. */
+bool job_is_paused(Job *job);
+
+/** Returns whether a given Job is force-cancelled. */
+bool job_is_force_cancel(Job *job);
+
+/** Returns the status of a given Job. */
+JobStatus job_get_status(Job *job);
+
 /**
  * Create a new long-running job and return it.
  *
diff --git a/block.c b/block.c
index acd35cb0cb..1628db2550 100644
--- a/block.c
+++ b/block.c
@@ -5721,7 +5721,7 @@ XDbgBlockGraph *bdrv_get_xdbg_block_graph(Error **errp)
         GSList *el;
 
         xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
-                           job->job.id);
+                            job->job.id);
         for (el = job->nodes; el; el = el->next) {
             xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
         }
diff --git a/block/commit.c b/block/commit.c
index 42792b4556..087865953e 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -367,7 +367,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
         goto fail;
     }
 
-    s->base = blk_new(s->common.job.aio_context,
+    s->base = blk_new(job_get_aiocontext(&s->common.job),
                       base_perms,
                       BLK_PERM_CONSISTENT_READ
                       | BLK_PERM_GRAPH_MOD
@@ -380,7 +380,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
     s->base_bs = base;
 
     /* Required permissions are already taken with block_job_add_bdrv() */
-    s->top = blk_new(s->common.job.aio_context, 0, BLK_PERM_ALL);
+    s->top = blk_new(job_get_aiocontext(&s->common.job), 0, BLK_PERM_ALL);
     ret = blk_insert_bs(s->top, top, errp);
     if (ret < 0) {
         goto fail;
diff --git a/block/mirror.c b/block/mirror.c
index 019f6deaa5..49aaaafffa 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -636,7 +636,7 @@ static int mirror_exit_common(Job *job)
     BlockDriverState *target_bs;
     BlockDriverState *mirror_top_bs;
     Error *local_err = NULL;
-    bool abort = job->ret < 0;
+    bool abort = job_get_ret(job) < 0;
     int ret = 0;
 
     if (s->prepared) {
@@ -930,7 +930,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
         while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
             job_yield(&s->common.job);
         }
-        s->common.job.cancelled = false;
+        job_set_cancelled(&s->common.job, false);
         goto immediate_exit;
     }
 
@@ -1065,7 +1065,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
              * completion.
              */
             assert(QLIST_EMPTY(&bs->tracked_requests));
-            s->common.job.cancelled = false;
+            job_set_cancelled(&s->common.job, false);
             need_drain = false;
             break;
         }
@@ -1079,7 +1079,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
         trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
         job_sleep_ns(&s->common.job, delay_ns);
         if (job_is_cancelled(&s->common.job) &&
-            (!s->synced || s->common.job.force_cancel))
+            (!s->synced || job_is_force_cancel(&s->common.job)))
         {
             break;
         }
@@ -1092,8 +1092,8 @@ immediate_exit:
          * or it was cancelled prematurely so that we do not guarantee that
          * the target is a copy of the source.
          */
-        assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
-               job_is_cancelled(&s->common.job)));
+        assert(ret < 0 || ((job_is_force_cancel(&s->common.job) || !s->synced)
+               && job_is_cancelled(&s->common.job)));
         assert(need_drain);
         mirror_wait_for_all_io(s);
     }
@@ -1150,7 +1150,7 @@ static void mirror_complete(Job *job, Error **errp)
     s->should_complete = true;
 
     /* If the job is paused, it will be re-entered when it is resumed */
-    if (!job->paused) {
+    if (!job_is_paused(job)) {
         job_enter(job);
     }
 }
@@ -1171,7 +1171,8 @@ static bool mirror_drained_poll(BlockJob *job)
      * from one of our own drain sections, to avoid a deadlock waiting for
      * ourselves.
      */
-    if (!s->common.job.paused && !s->common.job.cancelled && !s->in_drain) {
+    if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
+        !s->in_drain) {
         return true;
     }
 
diff --git a/block/replication.c b/block/replication.c
index 52163f2d1f..3923761a54 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -149,7 +149,8 @@ static void replication_close(BlockDriverState *bs)
     }
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
-        assert(commit_job->aio_context == qemu_get_current_aio_context());
+        assert(job_get_aiocontext(commit_job) ==
+               qemu_get_current_aio_context());
         job_cancel_sync(commit_job);
     }
 
diff --git a/blockdev.c b/blockdev.c
index f08192deda..8e2c15370e 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -147,7 +147,7 @@ void blockdev_mark_auto_del(BlockBackend *blk)
 
     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
         if (block_job_has_bdrv(job, blk_bs(blk))) {
-            AioContext *aio_context = job->job.aio_context;
+            AioContext *aio_context = job_get_aiocontext(&job->job);
             aio_context_acquire(aio_context);
 
             job_cancel(&job->job, false);
diff --git a/blockjob.c b/blockjob.c
index 4bad1408cb..7f49f03ec7 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -112,7 +112,7 @@ static bool child_job_drained_poll(BdrvChild *c)
     /* An inactive or completed job doesn't have any pending requests. Jobs
      * with !job->busy are either already paused or have a pause point after
      * being reentered, so no job driver code will run before they pause. */
-    if (!job->busy || job_is_completed(job)) {
+    if (!job_is_busy(job) || job_is_completed(job)) {
         return false;
     }
 
@@ -161,14 +161,14 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
         bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
     }
 
-    job->job.aio_context = ctx;
+    job_set_aiocontext(&job->job, ctx);
 }
 
 static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
 
-    return job->job.aio_context;
+    return job_get_aiocontext(&job->job);
 }
 
 static const BdrvChildClass child_job = {
@@ -222,18 +222,19 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
 {
     BdrvChild *c;
     bool need_context_ops;
+    AioContext *ctx = job_get_aiocontext(&job->job);
 
     bdrv_ref(bs);
 
-    need_context_ops = bdrv_get_aio_context(bs) != job->job.aio_context;
+    need_context_ops = bdrv_get_aio_context(bs) != ctx;
 
-    if (need_context_ops && job->job.aio_context != qemu_get_aio_context()) {
-        aio_context_release(job->job.aio_context);
+    if (need_context_ops && ctx != qemu_get_aio_context()) {
+        aio_context_release(ctx);
     }
     c = bdrv_root_attach_child(bs, name, &child_job, 0, perm, shared_perm, job,
                                errp);
-    if (need_context_ops && job->job.aio_context != qemu_get_aio_context()) {
-        aio_context_acquire(job->job.aio_context);
+    if (need_context_ops && ctx != qemu_get_aio_context()) {
+        aio_context_acquire(ctx);
     }
     if (c == NULL) {
         return -EPERM;
@@ -303,37 +304,41 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
-BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
+BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
 {
     BlockJobInfo *info;
     uint64_t progress_current, progress_total;
+    int job_ret;
+    Job *job = &blkjob->job;
 
-    if (block_job_is_internal(job)) {
+    if (block_job_is_internal(blkjob)) {
         error_setg(errp, "Cannot query QEMU internal jobs");
         return NULL;
     }
 
-    progress_get_snapshot(&job->job.progress, &progress_current,
+    progress_get_snapshot(&job->progress, &progress_current,
                           &progress_total);
 
     info = g_new0(BlockJobInfo, 1);
-    info->type      = g_strdup(job_type_str(&job->job));
-    info->device    = g_strdup(job->job.id);
-    info->busy      = qatomic_read(&job->job.busy);
-    info->paused    = job->job.pause_count > 0;
+    info->type      = g_strdup(job_type_str(job));
+    info->device    = g_strdup(job->id);
+    info->busy      = job_is_busy(job);
+    info->paused    = job_should_pause(job);
     info->offset    = progress_current;
     info->len       = progress_total;
-    info->speed     = job->speed;
-    info->io_status = job->iostatus;
-    info->ready     = job_is_ready(&job->job),
-    info->status    = job->job.status;
-    info->auto_finalize = job->job.auto_finalize;
-    info->auto_dismiss  = job->job.auto_dismiss;
-    if (job->job.ret) {
+    info->speed     = blkjob->speed;
+    info->io_status = blkjob->iostatus;
+    info->ready     = job_is_ready(job);
+    info->status    = job_get_status(job);
+    info->auto_finalize = job->auto_finalize;
+    info->auto_dismiss = job->auto_dismiss;
+    job_ret = job_get_ret(job);
+    if (job_ret) {
+        Error *job_err = job_get_err(job);
         info->has_error = true;
-        info->error = job->job.err ?
-                        g_strdup(error_get_pretty(job->job.err)) :
-                        g_strdup(strerror(-job->job.ret));
+        info->error = job_err ?
+                        g_strdup(error_get_pretty(job_err)) :
+                        g_strdup(strerror(-job_ret));
     }
     return info;
 }
@@ -367,26 +372,27 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
 
 static void block_job_event_completed(Notifier *n, void *opaque)
 {
-    BlockJob *job = opaque;
+    BlockJob *blkjob = opaque;
     const char *msg = NULL;
     uint64_t progress_current, progress_total;
+    Job *job = &blkjob->job;
 
-    if (block_job_is_internal(job)) {
+    if (block_job_is_internal(blkjob)) {
         return;
     }
 
-    if (job->job.ret < 0) {
-        msg = error_get_pretty(job->job.err);
+    if (job_get_ret(job) < 0) {
+        msg = error_get_pretty(job_get_err(job));
     }
 
-    progress_get_snapshot(&job->job.progress, &progress_current,
+    progress_get_snapshot(&job->progress, &progress_current,
                           &progress_total);
 
-    qapi_event_send_block_job_completed(job_type(&job->job),
-                                        job->job.id,
+    qapi_event_send_block_job_completed(job_type(job),
+                                        job->id,
                                         progress_total,
                                         progress_current,
-                                        job->speed,
+                                        blkjob->speed,
                                         !!msg,
                                         msg);
 }
@@ -498,7 +504,7 @@ void block_job_iostatus_reset(BlockJob *job)
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
         return;
     }
-    assert(job->job.user_paused && job->job.pause_count > 0);
+    assert(job_user_paused(&job->job) && job_should_pause(&job->job));
     job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
 }
 
@@ -538,10 +544,10 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         action);
     }
     if (action == BLOCK_ERROR_ACTION_STOP) {
-        if (!job->job.user_paused) {
+        if (!job_user_paused(&job->job)) {
             job_pause(&job->job);
             /* make the pause user visible, which will be resumed from QMP. */
-            job->job.user_paused = true;
+            job_set_user_paused(&job->job);
         }
         block_job_iostatus_set_err(job, error);
     }
diff --git a/job-qmp.c b/job-qmp.c
index 829a28aa70..12238a1643 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -42,7 +42,7 @@ static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
         return NULL;
     }
 
-    *aio_context = job->aio_context;
+    *aio_context = job_get_aiocontext(job);
     aio_context_acquire(*aio_context);
 
     return job;
@@ -122,7 +122,7 @@ void qmp_job_finalize(const char *id, Error **errp)
      * automatically acquires the new one), so make sure we release the correct
      * one.
      */
-    aio_context = job->aio_context;
+    aio_context = job_get_aiocontext(job);
     job_unref(job);
     aio_context_release(aio_context);
 }
@@ -146,21 +146,23 @@ static JobInfo *job_query_single(Job *job, Error **errp)
     JobInfo *info;
     uint64_t progress_current;
     uint64_t progress_total;
+    Error *job_err;
 
     assert(!job_is_internal(job));
     progress_get_snapshot(&job->progress, &progress_current,
                           &progress_total);
+    job_err = job_get_err(job);
 
     info = g_new(JobInfo, 1);
     *info = (JobInfo) {
         .id                 = g_strdup(job->id),
         .type               = job_type(job),
-        .status             = job->status,
+        .status             = job_get_status(job),
         .current_progress   = progress_current,
         .total_progress     = progress_total,
-        .has_error          = !!job->err,
-        .error              = job->err ? \
-                              g_strdup(error_get_pretty(job->err)) : NULL,
+        .has_error          = !!job_err,
+        .error              = job_err ? \
+                              g_strdup(error_get_pretty(job_err)) : NULL,
     };
 
     return info;
@@ -178,7 +180,7 @@ JobInfoList *qmp_query_jobs(Error **errp)
         if (job_is_internal(job)) {
             continue;
         }
-        aio_context = job->aio_context;
+        aio_context = job_get_aiocontext(job);
         aio_context_acquire(aio_context);
         value = job_query_single(job, errp);
         aio_context_release(aio_context);
diff --git a/job.c b/job.c
index e7a5d28854..872bbebb01 100644
--- a/job.c
+++ b/job.c
@@ -94,6 +94,46 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
+AioContext *job_get_aiocontext(Job *job)
+{
+    return job->aio_context;
+}
+
+void job_set_aiocontext(Job *job, AioContext *aio)
+{
+    job->aio_context = aio;
+}
+
+bool job_is_busy(Job *job)
+{
+    return qatomic_read(&job->busy);
+}
+
+int job_get_ret(Job *job)
+{
+    return job->ret;
+}
+
+Error *job_get_err(Job *job)
+{
+    return job->err;
+}
+
+JobStatus job_get_status(Job *job)
+{
+    return job->status;
+}
+
+void job_set_cancelled(Job *job, bool cancel)
+{
+    job->cancelled = cancel;
+}
+
+bool job_is_force_cancel(Job *job)
+{
+    return job->force_cancel;
+}
+
 JobTxn *job_txn_new(void)
 {
     JobTxn *txn = g_new0(JobTxn, 1);
@@ -269,11 +309,16 @@ static bool job_started(Job *job)
     return job->co;
 }
 
-static bool job_should_pause(Job *job)
+bool job_should_pause(Job *job)
 {
     return job->pause_count > 0;
 }
 
+bool job_is_paused(Job *job)
+{
+    return job->paused;
+}
+
 Job *job_next(Job *job)
 {
     if (!job) {
@@ -591,6 +636,11 @@ bool job_user_paused(Job *job)
     return job->user_paused;
 }
 
+void job_set_user_paused(Job *job)
+{
+    job->user_paused = true;
+}
+
 void job_user_resume(Job *job, Error **errp)
 {
     assert(job);
diff --git a/qemu-img.c b/qemu-img.c
index 7956a89965..d16bd367d9 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -921,7 +921,7 @@ static void run_block_job(BlockJob *job, Error **errp)
     if (!job_is_completed(&job->job)) {
         ret = job_complete_sync(&job->job, errp);
     } else {
-        ret = job->job.ret;
+        ret = job_get_ret(&job->job);
     }
     job_unref(&job->job);
     aio_context_release(aio_context);
-- 
2.31.1

* [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

Create _locked variants of some functions, to make the next patch a bit
smaller. The convention is that a function with the _locked suffix
expects job_mutex to be held by the caller, while the unsuffixed variant
takes (and releases) the lock itself.
Also make the locking functions public, so that they can be used
from code that embeds the Job struct as well.
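
For instance, this is how the pattern looks for job_get_ret (quoted from
the job.c hunk in this very patch, so purely illustrative here):

    /* Called with job_mutex held. */
    int job_get_ret_locked(Job *job)
    {
        return job->ret;
    }

    /* Called with job_mutex *not* held. */
    int job_get_ret(Job *job)
    {
        int ret;

        job_lock();
        ret = job_get_ret_locked(job);
        job_unlock();
        return ret;
    }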

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/qemu/job.h | 23 +++++++++++++
 job.c              | 85 ++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 93 insertions(+), 15 deletions(-)

diff --git a/include/qemu/job.h b/include/qemu/job.h
index 72c7d0f69d..ba2f9b2660 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -305,6 +305,7 @@ void job_txn_add_job(JobTxn *txn, Job *job);
 
 /** Returns the @ret field of a given Job. */
 int job_get_ret(Job *job);
+int job_get_ret_locked(Job *job);
 
 /** Returns the AioContext of a given Job. */
 AioContext *job_get_aiocontext(Job *job);
@@ -336,6 +337,24 @@ bool job_is_force_cancel(Job *job);
 /** Returns the statis of a given Job. */
 JobStatus job_get_status(Job *job);
 
+/**
+ * job_lock:
+ *
+ * Take the mutex protecting the list of jobs and their status.
+ * Most functions called by the monitor need to call job_lock
+ * and job_unlock manually.  On the other hand, function called
+ * by the block jobs themselves and by the block layer will take the
+ * lock for you.
+ */
+void job_lock(void);
+
+/**
+ * job_unlock:
+ *
+ * Release the mutex protecting the list of jobs and their status.
+ */
+void job_unlock(void);
+
 /**
  * Create a new long-running job and return it.
  *
@@ -424,6 +443,7 @@ void job_start(Job *job);
  * Continue the specified job by entering the coroutine.
  */
 void job_enter(Job *job);
+void job_enter_locked(Job *job);
 
 /**
  * @job: The job that is ready to pause.
@@ -462,12 +482,15 @@ bool job_is_internal(Job *job);
 
 /** Returns whether the job is scheduled for cancellation. */
 bool job_is_cancelled(Job *job);
+bool job_is_cancelled_locked(Job *job);
 
 /** Returns whether the job is in a completed state. */
 bool job_is_completed(Job *job);
+bool job_is_completed_locked(Job *job);
 
 /** Returns whether the job is ready to be completed. */
 bool job_is_ready(Job *job);
+bool job_is_ready_locked(Job *job);
 
 /**
  * Request @job to pause at the next pause point. Must be paired with
diff --git a/job.c b/job.c
index 872bbebb01..96fb8e9730 100644
--- a/job.c
+++ b/job.c
@@ -32,6 +32,10 @@
 #include "trace/trace-root.h"
 #include "qapi/qapi-events-job.h"
 
+/* job_mutex protects the jobs list, but also the job operations. */
+static QemuMutex job_mutex;
+
+/* Protected by job_mutex */
 static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
 
 /* Job State Transition Table */
@@ -64,27 +68,22 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
 /* Transactional group of jobs */
 struct JobTxn {
 
-    /* Is this txn being cancelled? */
+    /* Is this txn being cancelled? Atomic. */
     bool aborting;
 
-    /* List of jobs */
+    /* List of jobs. Protected by job_mutex. */
     QLIST_HEAD(, Job) jobs;
 
-    /* Reference count */
+    /* Reference count. Atomic. */
     int refcnt;
 };
 
-/* Right now, this mutex is only needed to synchronize accesses to job->busy
- * and job->sleep_timer, such as concurrent calls to job_do_yield and
- * job_enter. */
-static QemuMutex job_mutex;
-
-static void job_lock(void)
+void job_lock(void)
 {
     qemu_mutex_lock(&job_mutex);
 }
 
-static void job_unlock(void)
+void job_unlock(void)
 {
     qemu_mutex_unlock(&job_mutex);
 }
@@ -109,11 +108,22 @@ bool job_is_busy(Job *job)
     return qatomic_read(&job->busy);
 }
 
-int job_get_ret(Job *job)
+/* Called with job_mutex held. */
+int job_get_ret_locked(Job *job)
 {
     return job->ret;
 }
 
+/* Called with job_mutex *not* held. */
+int job_get_ret(Job *job)
+{
+    int ret;
+    job_lock();
+    ret = job_get_ret_locked(job);
+    job_unlock();
+    return ret;
+}
+
 Error *job_get_err(Job *job)
 {
     return job->err;
@@ -255,12 +265,24 @@ const char *job_type_str(const Job *job)
     return JobType_str(job_type(job));
 }
 
-bool job_is_cancelled(Job *job)
+/* Called with job_mutex held. */
+bool job_is_cancelled_locked(Job *job)
 {
     return job->cancelled;
 }
 
-bool job_is_ready(Job *job)
+/* Called with job_mutex *not* held. */
+bool job_is_cancelled(Job *job)
+{
+    bool ret;
+    job_lock();
+    ret = job_is_cancelled_locked(job);
+    job_unlock();
+    return ret;
+}
+
+/* Called with job_mutex held. */
+bool job_is_ready_locked(Job *job)
 {
     switch (job->status) {
     case JOB_STATUS_UNDEFINED:
@@ -282,7 +304,18 @@ bool job_is_ready(Job *job)
     return false;
 }
 
-bool job_is_completed(Job *job)
+/* Called with job_mutex *not* held. */
+bool job_is_ready(Job *job)
+{
+    bool ret;
+    job_lock();
+    ret = job_is_ready_locked(job);
+    job_unlock();
+    return ret;
+}
+
+/* Called with job_mutex held. */
+bool job_is_completed_locked(Job *job)
 {
     switch (job->status) {
     case JOB_STATUS_UNDEFINED:
@@ -304,6 +337,17 @@ bool job_is_completed(Job *job)
     return false;
 }
 
+/* Called with job_mutex *not* held. */
+bool job_is_completed(Job *job)
+{
+    bool ret;
+    job_lock();
+    ret = job_is_completed_locked(job);
+    job_unlock();
+    return ret;
+}
+
+/* Does not need job_mutex. Value is never modified */
 static bool job_started(Job *job)
 {
     return job->co;
@@ -503,11 +547,20 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
     aio_co_enter(job->aio_context, job->co);
 }
 
-void job_enter(Job *job)
+/* Called with job_mutex held. */
+void job_enter_locked(Job *job)
 {
     job_enter_cond(job, NULL);
 }
 
+/* Called with job_mutex *not* held. */
+void job_enter(Job *job)
+{
+    job_lock();
+    job_enter_locked(job);
+    job_unlock();
+}
+
 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
  * Reentering the job coroutine with job_enter() before the timer has expired
  * is allowed and cancels the timer.
@@ -684,12 +737,14 @@ void job_dismiss(Job **jobptr, Error **errp)
     *jobptr = NULL;
 }
 
+/* Called with job_mutex held. */
 void job_early_fail(Job *job)
 {
     assert(job->status == JOB_STATUS_CREATED);
     job_do_dismiss(job);
 }
 
+/* Called with job_mutex held. */
 static void job_conclude(Job *job)
 {
     job_state_transition(job, JOB_STATUS_CONCLUDED);
-- 
2.31.1

* [RFC PATCH 3/6] job: minor changes to simplify locking
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

Check for a NULL id in job_get, so that in the next patch we can
move the job_get call inside a single critical section of job_create.

Also add the missing notifier_list_init for the on_idle NotifierList,
which seems to have been forgotten.

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 job.c | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/job.c b/job.c
index 96fb8e9730..48b304c3ff 100644
--- a/job.c
+++ b/job.c
@@ -375,6 +375,10 @@ Job *job_get(const char *id)
 {
     Job *job;
 
+    if (!id) {
+        return NULL;
+    }
+
     QLIST_FOREACH(job, &jobs, job_list) {
         if (job->id && !strcmp(id, job->id)) {
             return job;
@@ -406,15 +410,18 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
             error_setg(errp, "Invalid job ID '%s'", job_id);
             return NULL;
         }
-        if (job_get(job_id)) {
-            error_setg(errp, "Job ID '%s' already in use", job_id);
-            return NULL;
-        }
     } else if (!(flags & JOB_INTERNAL)) {
         error_setg(errp, "An explicit job ID is required");
         return NULL;
     }
 
+    job_lock();
+    if (job_get(job_id)) {
+        error_setg(errp, "Job ID '%s' already in use", job_id);
+        job_unlock();
+        return NULL;
+    }
+
     job = g_malloc0(driver->instance_size);
     job->driver        = driver;
     job->id            = g_strdup(job_id);
@@ -434,6 +441,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
     notifier_list_init(&job->on_finalize_completed);
     notifier_list_init(&job->on_pending);
     notifier_list_init(&job->on_ready);
+    notifier_list_init(&job->on_idle);
 
     job_state_transition(job, JOB_STATUS_CREATED);
     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
-- 
2.31.1

* [RFC PATCH 4/6] job.h: categorize job fields
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

This makes it easier to understand what needs to be protected
by a lock and what doesn't.
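
For instance, after this patch each field carries its synchronization
rule in its comment; this one is quoted verbatim from the hunk below:

    /**
     * Current state; See @JobStatus for details.
     * Protected by job_mutex.
     */
    JobStatus status;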

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/qemu/job.h | 101 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 82 insertions(+), 19 deletions(-)

diff --git a/include/qemu/job.h b/include/qemu/job.h
index ba2f9b2660..4421d08d93 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -40,24 +40,40 @@ typedef struct JobTxn JobTxn;
  * Long-running operation.
  */
 typedef struct Job {
-    /** The ID of the job. May be NULL for internal jobs. */
+    /**
+     * The ID of the job. May be NULL for internal jobs.
+     * Set in job_create and read-only afterwards.
+     */
     char *id;
 
-    /** The type of this job. */
+    /**
+     * The type of this job.
+     * Set in job_create and read-only afterwards.
+     */
     const JobDriver *driver;
 
-    /** Reference count of the block job */
+    /**
+     * Reference count of the block job.
+     * Protected by job_mutex.
+     */
     int refcnt;
 
-    /** Current state; See @JobStatus for details. */
+    /**
+     * Current state; See @JobStatus for details.
+     * Protected by job_mutex.
+     */
     JobStatus status;
 
-    /** AioContext to run the job coroutine in */
+    /**
+     * AioContext to run the job coroutine in.
+     * Atomic.
+     */
     AioContext *aio_context;
 
     /**
      * The coroutine that executes the job.  If not NULL, it is reentered when
      * busy is false and the job is cancelled.
+     * Set in job_create and read-only afterwards.
      */
     Coroutine *co;
 
@@ -70,13 +86,15 @@ typedef struct Job {
     /**
      * Counter for pause request. If non-zero, the block job is either paused,
      * or if busy == true will pause itself as soon as possible.
+     * Protected by job_mutex.
      */
     int pause_count;
 
     /**
      * Set to false by the job while the coroutine has yielded and may be
      * re-entered by job_enter(). There may still be I/O or event loop activity
-     * pending. Accessed under block_job_mutex (in blockjob.c).
+     * pending.
+     * Protected by job_mutex.
      *
      * When the job is deferred to the main loop, busy is true as long as the
      * bottom half is still pending.
@@ -86,12 +104,14 @@ typedef struct Job {
     /**
      * Set to true by the job while it is in a quiescent state, where
      * no I/O or event loop activity is pending.
+     * Protected by job_mutex.
      */
     bool paused;
 
     /**
      * Set to true if the job is paused by user.  Can be unpaused with the
      * block-job-resume QMP command.
+     * Protected by job_mutex.
      */
     bool user_paused;
 
@@ -100,22 +120,33 @@ typedef struct Job {
      * always be tested just before toggling the busy flag from false
      * to true.  After a job has been cancelled, it should only yield
      * if #aio_poll will ("sooner or later") reenter the coroutine.
+     * Protected by job_mutex.
      */
     bool cancelled;
 
     /**
      * Set to true if the job should abort immediately without waiting
      * for data to be in sync.
+     * Protected by job_mutex.
      */
     bool force_cancel;
 
-    /** Set to true when the job has deferred work to the main loop. */
+    /**
+     * Set to true when the job has deferred work to the main loop.
+     * Protected by job_mutex.
+     */
     bool deferred_to_main_loop;
 
-    /** True if this job should automatically finalize itself */
+    /**
+     * True if this job should automatically finalize itself.
+     * Set in job_create and read-only afterwards.
+     */
     bool auto_finalize;
 
-    /** True if this job should automatically dismiss itself */
+    /**
+     * True if this job should automatically dismiss itself.
+     * Set in job_create and read-only afterwards.
+     */
     bool auto_dismiss;
 
     ProgressMeter progress;
@@ -124,6 +155,7 @@ typedef struct Job {
      * Return code from @run and/or @prepare callback(s).
      * Not final until the job has reached the CONCLUDED status.
      * 0 on success, -errno on failure.
+     * Protected by job_mutex.
      */
     int ret;
 
@@ -131,37 +163,68 @@ typedef struct Job {
      * Error object for a failed job.
      * If job->ret is nonzero and an error object was not set, it will be set
      * to strerror(-job->ret) during job_completed.
+     * Protected by job_mutex.
      */
     Error *err;
 
-    /** The completion function that will be called when the job completes.  */
+    /**
+     * The completion function that will be called when the job completes.
+     * Set in job_create and read-only afterwards.
+     */
     BlockCompletionFunc *cb;
 
-    /** The opaque value that is passed to the completion function.  */
+    /**
+     * The opaque value that is passed to the completion function.
+     * Set in job_create and read-only afterwards.
+     */
     void *opaque;
 
-    /** Notifiers called when a cancelled job is finalised */
+    /**
+     * Notifiers called when a cancelled job is finalised.
+     * Protected by job_mutex.
+     */
     NotifierList on_finalize_cancelled;
 
-    /** Notifiers called when a successfully completed job is finalised */
+    /**
+     * Notifiers called when a successfully completed job is finalised.
+     * Protected by job_mutex.
+     */
     NotifierList on_finalize_completed;
 
-    /** Notifiers called when the job transitions to PENDING */
+    /**
+     * Notifiers called when the job transitions to PENDING.
+     * Protected by job_mutex.
+     */
     NotifierList on_pending;
 
-    /** Notifiers called when the job transitions to READY */
+    /**
+     * Notifiers called when the job transitions to READY.
+     * Protected by job_mutex.
+     */
     NotifierList on_ready;
 
-    /** Notifiers called when the job coroutine yields or terminates */
+    /**
+     * Notifiers called when the job coroutine yields or terminates.
+     * Protected by job_mutex.
+     */
     NotifierList on_idle;
 
-    /** Element of the list of jobs */
+    /**
+     * Element of the list of jobs.
+     * Protected by job_mutex.
+     */
     QLIST_ENTRY(Job) job_list;
 
-    /** Transaction this job is part of */
+    /**
+     * Transaction this job is part of.
+     * Protected by job_mutex.
+     */
     JobTxn *txn;
 
-    /** Element of the list of jobs in a job transaction */
+    /**
+     * Element of the list of jobs in a job transaction.
+     * Protected by job_mutex.
+     */
     QLIST_ENTRY(Job) txn_list;
 } Job;
 
-- 
2.31.1

* [RFC PATCH 5/6] job: use global job_mutex to protect struct Job
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

This lock is going to replace most of the AioContext locks
in the job and blockjob code, so that a Job can run in an arbitrary
AioContext.
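
The typical conversion of a monitor path looks like this (condensed from
the qmp_block_job_pause() hunk below; error handling omitted):

    /* before: consistency via the AioContext lock */
    AioContext *aio_context;
    BlockJob *job = find_block_job(device, &aio_context, errp);
    ...
    job_user_pause(&job->job, errp);
    aio_context_release(aio_context);

    /* after: find_block_job() takes the global job_lock instead */
    BlockJob *job = find_block_job(device, errp);
    ...
    job_user_pause(&job->job, errp);
    job_unlock();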

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 include/block/blockjob_int.h |   1 +
 include/qemu/job.h           |   2 +
 block/backup.c               |   4 +
 block/mirror.c               |  11 +-
 blockdev.c                   |  62 ++++----
 blockjob.c                   |  67 +++++++--
 job-qmp.c                    |  55 +++----
 job.c                        | 284 +++++++++++++++++++++++++++--------
 qemu-img.c                   |  15 +-
 9 files changed, 350 insertions(+), 151 deletions(-)

diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index 6633d83da2..8b91126506 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -53,6 +53,7 @@ struct BlockJobDriver {
      */
     void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
 
+    /* Called with job_mutex *not* held. */
     void (*set_speed)(BlockJob *job, int64_t speed);
 };
 
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 4421d08d93..359f4e6b3a 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -49,6 +49,8 @@ typedef struct Job {
     /**
      * The type of this job.
     * Set in job_create and read-only afterwards.
+     * All calls into the driver functions must be made without holding
+     * job_mutex, to avoid deadlocks.
      */
     const JobDriver *driver;
 
diff --git a/block/backup.c b/block/backup.c
index bd3614ce70..80ce956299 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -315,6 +315,10 @@ static void coroutine_fn backup_pause(Job *job)
     }
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want to call block_copy_kick
+ * with the lock held!)
+ */
 static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
 {
     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
diff --git a/block/mirror.c b/block/mirror.c
index 49aaaafffa..deefaa6a39 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1150,9 +1150,11 @@ static void mirror_complete(Job *job, Error **errp)
     s->should_complete = true;
 
     /* If the job is paused, it will be re-entered when it is resumed */
+    job_lock();
     if (!job_is_paused(job)) {
-        job_enter(job);
+        job_enter_locked(job);
     }
+    job_unlock();
 }
 
 static void coroutine_fn mirror_pause(Job *job)
@@ -1171,10 +1173,13 @@ static bool mirror_drained_poll(BlockJob *job)
      * from one of our own drain sections, to avoid a deadlock waiting for
      * ourselves.
      */
-    if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
-        !s->in_drain) {
+    job_lock();
+    if (!job_is_paused(&s->common.job) &&
+        !job_is_cancelled_locked(&s->common.job) && !s->in_drain) {
+        job_unlock();
         return true;
     }
+    job_unlock();
 
     return !!s->in_flight;
 }
diff --git a/blockdev.c b/blockdev.c
index 8e2c15370e..9255aea6a2 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -150,9 +150,11 @@ void blockdev_mark_auto_del(BlockBackend *blk)
             AioContext *aio_context = job_get_aiocontext(&job->job);
             aio_context_acquire(aio_context);
 
+            job_lock();
             job_cancel(&job->job, false);
 
             aio_context_release(aio_context);
+            job_unlock();
         }
     }
 
@@ -3309,48 +3311,44 @@ out:
     aio_context_release(aio_context);
 }
 
-/* Get a block job using its ID and acquire its AioContext */
-static BlockJob *find_block_job(const char *id, AioContext **aio_context,
-                                Error **errp)
+/* Get a block job using its ID; on success, job_lock is held. */
+static BlockJob *find_block_job(const char *id, Error **errp)
 {
     BlockJob *job;
 
     assert(id != NULL);
 
-    *aio_context = NULL;
-
+    job_lock();
     job = block_job_get(id);
 
     if (!job) {
         error_set(errp, ERROR_CLASS_DEVICE_NOT_ACTIVE,
                   "Block job '%s' not found", id);
+        job_unlock();
         return NULL;
     }
 
-    *aio_context = blk_get_aio_context(job->blk);
-    aio_context_acquire(*aio_context);
-
     return job;
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
     }
 
     block_job_set_speed(job, speed, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_cancel(const char *device,
                           bool has_force, bool force, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3369,13 +3367,13 @@ void qmp_block_job_cancel(const char *device,
     trace_qmp_block_job_cancel(job);
     job_user_cancel(&job->job, force, errp);
 out:
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_pause(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3383,13 +3381,13 @@ void qmp_block_job_pause(const char *device, Error **errp)
 
     trace_qmp_block_job_pause(job);
     job_user_pause(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_resume(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3397,13 +3395,13 @@ void qmp_block_job_resume(const char *device, Error **errp)
 
     trace_qmp_block_job_resume(job);
     job_user_resume(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_complete(const char *device, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(device, &aio_context, errp);
+    BlockJob *job = find_block_job(device, errp);
 
     if (!job) {
         return;
@@ -3411,13 +3409,13 @@ void qmp_block_job_complete(const char *device, Error **errp)
 
     trace_qmp_block_job_complete(job);
     job_complete(&job->job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_finalize(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *job = find_block_job(id, &aio_context, errp);
+    BlockJob *job = find_block_job(id, errp);
 
     if (!job) {
         return;
@@ -3427,20 +3425,14 @@ void qmp_block_job_finalize(const char *id, Error **errp)
     job_ref(&job->job);
     job_finalize(&job->job, errp);
 
-    /*
-     * Job's context might have changed via job_finalize (and job_txn_apply
-     * automatically acquires the new one), so make sure we release the correct
-     * one.
-     */
-    aio_context = blk_get_aio_context(job->blk);
     job_unref(&job->job);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void qmp_block_job_dismiss(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    BlockJob *bjob = find_block_job(id, &aio_context, errp);
+    BlockJob *bjob = find_block_job(id, errp);
     Job *job;
 
     if (!bjob) {
@@ -3450,7 +3442,7 @@ void qmp_block_job_dismiss(const char *id, Error **errp)
     trace_qmp_block_job_dismiss(bjob);
     job = &bjob->job;
     job_dismiss(&job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_change_backing_file(const char *device,
diff --git a/blockjob.c b/blockjob.c
index 7f49f03ec7..e7b289089b 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -42,15 +42,16 @@
  * The first includes functions used by the monitor.  The monitor is
  * peculiar in that it accesses the block job list with block_job_get, and
  * therefore needs consistency across block_job_get and the actual operation
- * (e.g. block_job_set_speed).  The consistency is achieved with
- * aio_context_acquire/release.  These functions are declared in blockjob.h.
+ * (e.g. block_job_set_speed).  To achieve this consistency, the caller
+ * calls job_lock/job_unlock itself around the whole operation.
+ * These functions are declared in blockjob.h.
  *
  * The second includes functions used by the block job drivers and sometimes
- * by the core block layer.  These do not care about locking, because the
- * whole coroutine runs under the AioContext lock, and are declared in
- * blockjob_int.h.
+ * by the core block layer. These delegate the locking to the callee instead,
+ * and are declared in blockjob_int.h.
  */
 
+/* Does not need job_mutex. Value is never modified */
 static bool is_block_job(Job *job)
 {
     return job_type(job) == JOB_TYPE_BACKUP ||
@@ -59,6 +60,7 @@ static bool is_block_job(Job *job)
            job_type(job) == JOB_TYPE_STREAM;
 }
 
+/* Called with job_mutex *not* held. */
 BlockJob *block_job_next(BlockJob *bjob)
 {
     Job *job = bjob ? &bjob->job : NULL;
@@ -70,6 +72,7 @@ BlockJob *block_job_next(BlockJob *bjob)
     return job ? container_of(job, BlockJob, job) : NULL;
 }
 
+/* Called with job_mutex held. */
 BlockJob *block_job_get(const char *id)
 {
     Job *job = job_get(id);
@@ -97,24 +100,31 @@ static char *child_job_get_parent_desc(BdrvChild *c)
     return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
 }
 
+/* Called with job_mutex *not* held. */
 static void child_job_drained_begin(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
+    job_lock();
     job_pause(&job->job);
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 static bool child_job_drained_poll(BdrvChild *c)
 {
     BlockJob *bjob = c->opaque;
     Job *job = &bjob->job;
     const BlockJobDriver *drv = block_job_driver(bjob);
 
+    job_lock();
     /* An inactive or completed job doesn't have any pending requests. Jobs
      * with !job->busy are either already paused or have a pause point after
      * being reentered, so no job driver code will run before they pause. */
-    if (!job_is_busy(job) || job_is_completed(job)) {
+    if (!job_is_busy(job) || job_is_completed_locked(job)) {
+        job_unlock();
         return false;
     }
+    job_unlock();
 
     /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
      * override this assumption. */
@@ -125,10 +135,13 @@ static bool child_job_drained_poll(BdrvChild *c)
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
 {
     BlockJob *job = c->opaque;
+    job_lock();
     job_resume(&job->job);
+    job_unlock();
 }
 
 static bool child_job_can_set_aio_ctx(BdrvChild *c, AioContext *ctx,
@@ -246,11 +259,15 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
     return 0;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static void block_job_on_idle(Notifier *n, void *opaque)
 {
+    job_unlock();
     aio_wait_kick();
+    job_lock();
 }
 
+/* Does not need job_mutex. Value is never modified */
 bool block_job_is_internal(BlockJob *job)
 {
     return (job->job.id == NULL);
@@ -267,6 +284,7 @@ static bool job_timer_pending(Job *job)
     return timer_pending(&job->sleep_timer);
 }
 
+/* Called with job_mutex held. May temporarily release the lock. */
 bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 {
     const BlockJobDriver *drv = block_job_driver(job);
@@ -286,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
     job->speed = speed;
 
     if (drv->set_speed) {
+        job_unlock();
         drv->set_speed(job, speed);
+        job_lock();
     }
 
     if (speed && speed <= old_speed) {
@@ -304,6 +324,7 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
+/* Called with job_mutex *not* held. */
 BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
 {
     BlockJobInfo *info;
@@ -319,6 +340,7 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
     progress_get_snapshot(&job->progress, &progress_current,
                           &progress_total);
 
+    job_lock();
     info = g_new0(BlockJobInfo, 1);
     info->type      = g_strdup(job_type_str(job));
     info->device    = g_strdup(job->id);
@@ -328,11 +350,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
     info->len       = progress_total;
     info->speed     = blkjob->speed;
     info->io_status = blkjob->iostatus;
-    info->ready     = job_is_ready(job);
+    info->ready     = job_is_ready_locked(job);
     info->status    = job_get_status(job);
     info->auto_finalize = job->auto_finalize;
     info->auto_dismiss = job->auto_dismiss;
-    job_ret = job_get_ret(job);
+    job_ret = job_get_ret_locked(job);
     if (job_ret) {
         Error *job_err = job_get_err(job);
         info->has_error = true;
@@ -340,9 +362,11 @@ BlockJobInfo *block_job_query(BlockJob *blkjob, Error **errp)
                         g_strdup(error_get_pretty(job_err)) :
                         g_strdup(strerror(-job_ret));
     }
+    job_unlock();
     return info;
 }
 
+/* Called with job_mutex held. */
 static void block_job_iostatus_set_err(BlockJob *job, int error)
 {
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
@@ -351,6 +375,7 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
     }
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_cancelled(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -370,6 +395,7 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
                                         job->speed);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_completed(Notifier *n, void *opaque)
 {
     BlockJob *blkjob = opaque;
@@ -381,7 +407,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
         return;
     }
 
-    if (job_get_ret(job) < 0) {
+    if (job_get_ret_locked(job) < 0) {
         msg = error_get_pretty(job_get_err(job));
     }
 
@@ -397,6 +423,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
                                         msg);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_pending(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -409,6 +436,7 @@ static void block_job_event_pending(Notifier *n, void *opaque)
                                       job->job.id);
 }
 
+/* Called with job_mutex held. */
 static void block_job_event_ready(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
@@ -430,10 +458,11 @@ static void block_job_event_ready(Notifier *n, void *opaque)
 
 
 /*
- * API for block job drivers and the block layer.  These functions are
- * declared in blockjob_int.h.
+ * API for block job drivers and the block layer, who do not know about
+ * job_mutex.  These functions are declared in blockjob_int.h.
  */
 
+/* Called with job_mutex *not* held; temporarily takes and releases it. */
 void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                        JobTxn *txn, BlockDriverState *bs, uint64_t perm,
                        uint64_t shared_perm, int64_t speed, int flags,
@@ -472,6 +501,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->ready_notifier.notify = block_job_event_ready;
     job->idle_notifier.notify = block_job_on_idle;
 
+    job_lock();
+
     notifier_list_add(&job->job.on_finalize_cancelled,
                       &job->finalize_cancelled_notifier);
     notifier_list_add(&job->job.on_finalize_completed,
@@ -482,7 +513,11 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 
     error_setg(&job->blocker, "block device is in use by block job: %s",
                job_type_str(&job->job));
+
+    job_unlock();
+    /* Calls drain and friends, which already take the lock. */
     block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
+    job_lock();
 
     bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
 
@@ -493,27 +528,35 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 
     if (!block_job_set_speed(job, speed, errp)) {
         job_early_fail(&job->job);
+        job_unlock();
         return NULL;
     }
 
+    job_unlock();
     return job;
 }
 
+/* Called with job_mutex *not* held. */
 void block_job_iostatus_reset(BlockJob *job)
 {
+    job_lock();
     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+        job_unlock();
         return;
     }
     assert(job_user_paused(&job->job) && job_should_pause(&job->job));
     job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 void block_job_user_resume(Job *job)
 {
     BlockJob *bjob = container_of(job, BlockJob, job);
     block_job_iostatus_reset(bjob);
 }
 
+/* Called with job_mutex *not* held. */
 BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         int is_read, int error)
 {
@@ -544,12 +587,14 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         action);
     }
     if (action == BLOCK_ERROR_ACTION_STOP) {
+        job_lock();
         if (!job_user_paused(&job->job)) {
             job_pause(&job->job);
             /* make the pause user visible, which will be resumed from QMP. */
             job_set_user_paused(&job->job);
         }
         block_job_iostatus_set_err(job, error);
+        job_unlock();
     }
     return action;
 }
diff --git a/job-qmp.c b/job-qmp.c
index 12238a1643..03f3946490 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -29,29 +29,26 @@
 #include "qapi/error.h"
 #include "trace/trace-root.h"
 
-/* Get a job using its ID and acquire its AioContext */
-static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
+/* Get a job using its ID; on success, return with the job_mutex held. */
+static Job *find_job(const char *id, Error **errp)
 {
     Job *job;
 
-    *aio_context = NULL;
+    job_lock();
 
     job = job_get(id);
     if (!job) {
         error_setg(errp, "Job not found");
+        job_unlock();
         return NULL;
     }
 
-    *aio_context = job_get_aiocontext(job);
-    aio_context_acquire(*aio_context);
-
     return job;
 }
 
 void qmp_job_cancel(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -59,13 +56,12 @@ void qmp_job_cancel(const char *id, Error **errp)
 
     trace_qmp_job_cancel(job);
     job_user_cancel(job, true, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_pause(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -73,13 +69,12 @@ void qmp_job_pause(const char *id, Error **errp)
 
     trace_qmp_job_pause(job);
     job_user_pause(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_resume(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -87,13 +82,12 @@ void qmp_job_resume(const char *id, Error **errp)
 
     trace_qmp_job_resume(job);
     job_user_resume(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_complete(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -101,13 +95,12 @@ void qmp_job_complete(const char *id, Error **errp)
 
     trace_qmp_job_complete(job);
     job_complete(job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_finalize(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -117,20 +110,13 @@ void qmp_job_finalize(const char *id, Error **errp)
     job_ref(job);
     job_finalize(job, errp);
 
-    /*
-     * Job's context might have changed via job_finalize (and job_txn_apply
-     * automatically acquires the new one), so make sure we release the correct
-     * one.
-     */
-    aio_context = job_get_aiocontext(job);
     job_unref(job);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
 void qmp_job_dismiss(const char *id, Error **errp)
 {
-    AioContext *aio_context;
-    Job *job = find_job(id, &aio_context, errp);
+    Job *job = find_job(id, errp);
 
     if (!job) {
         return;
@@ -138,9 +124,10 @@ void qmp_job_dismiss(const char *id, Error **errp)
 
     trace_qmp_job_dismiss(job);
     job_dismiss(&job, errp);
-    aio_context_release(aio_context);
+    job_unlock();
 }
 
+/* Called with job_mutex held. */
 static JobInfo *job_query_single(Job *job, Error **errp)
 {
     JobInfo *info;
@@ -175,15 +162,15 @@ JobInfoList *qmp_query_jobs(Error **errp)
 
     for (job = job_next(NULL); job; job = job_next(job)) {
         JobInfo *value;
-        AioContext *aio_context;
 
         if (job_is_internal(job)) {
             continue;
         }
-        aio_context = job_get_aiocontext(job);
-        aio_context_acquire(aio_context);
+
+        job_lock();
         value = job_query_single(job, errp);
-        aio_context_release(aio_context);
+        job_unlock();
+
         if (!value) {
             qapi_free_JobInfoList(head);
             return NULL;
diff --git a/job.c b/job.c
index 48b304c3ff..e2006532b5 100644
--- a/job.c
+++ b/job.c
@@ -93,19 +93,22 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
+/* Does not need job_mutex */
 AioContext *job_get_aiocontext(Job *job)
 {
-    return job->aio_context;
+    return qatomic_read(&job->aio_context);
 }
 
+/* Does not need job_mutex */
 void job_set_aiocontext(Job *job, AioContext *aio)
 {
-    job->aio_context = aio;
+    qatomic_set(&job->aio_context, aio);
 }
 
+/* Called with job_mutex held. */
 bool job_is_busy(Job *job)
 {
-    return qatomic_read(&job->busy);
+    return job->busy;
 }
 
 /* Called with job_mutex held. */
@@ -124,59 +127,75 @@ int job_get_ret(Job *job)
     return ret;
 }
 
+/* Called with job_mutex held. */
 Error *job_get_err(Job *job)
 {
     return job->err;
 }
 
+/* Called with job_mutex held. */
 JobStatus job_get_status(Job *job)
 {
     return job->status;
 }
-
+/* Called with job_mutex *not* held. */
 void job_set_cancelled(Job *job, bool cancel)
 {
+    job_lock();
     job->cancelled = cancel;
+    job_unlock();
 }
 
+/* Called with job_mutex *not* held. */
 bool job_is_force_cancel(Job *job)
 {
-    return job->force_cancel;
+    bool ret;
+    job_lock();
+    ret = job->force_cancel;
+    job_unlock();
+    return ret;
 }
 
+/* Does not need job_mutex */
 JobTxn *job_txn_new(void)
 {
     JobTxn *txn = g_new0(JobTxn, 1);
     QLIST_INIT(&txn->jobs);
-    txn->refcnt = 1;
+    qatomic_set(&txn->refcnt, 1);
     return txn;
 }
 
+/* Does not need job_mutex */
 static void job_txn_ref(JobTxn *txn)
 {
-    txn->refcnt++;
+    qatomic_inc(&txn->refcnt);
 }
 
+/* Does not need job_mutex */
 void job_txn_unref(JobTxn *txn)
 {
-    if (txn && --txn->refcnt == 0) {
+    if (txn && qatomic_dec_fetch(&txn->refcnt) == 0) {
         g_free(txn);
     }
 }
 
+/* Called with job_mutex *not* held. */
 void job_txn_add_job(JobTxn *txn, Job *job)
 {
     if (!txn) {
         return;
     }
 
+    job_lock();
     assert(!job->txn);
     job->txn = txn;
 
     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    job_unlock();
     job_txn_ref(txn);
 }
 
+/* Called with job_mutex held. */
 static void job_txn_del_job(Job *job)
 {
     if (job->txn) {
@@ -186,6 +205,7 @@ static void job_txn_del_job(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static int job_txn_apply(Job *job, int fn(Job *))
 {
     AioContext *inner_ctx;
@@ -221,11 +241,13 @@ static int job_txn_apply(Job *job, int fn(Job *))
     return rc;
 }
 
+/* Does not need job_mutex */
 bool job_is_internal(Job *job)
 {
     return (job->id == NULL);
 }
 
+/* Called with job_mutex held. */
 static void job_state_transition(Job *job, JobStatus s1)
 {
     JobStatus s0 = job->status;
@@ -241,6 +263,7 @@ static void job_state_transition(Job *job, JobStatus s1)
     }
 }
 
+/* Called with job_mutex held. */
 int job_apply_verb(Job *job, JobVerb verb, Error **errp)
 {
     JobStatus s0 = job->status;
@@ -255,11 +278,13 @@ int job_apply_verb(Job *job, JobVerb verb, Error **errp)
     return -EPERM;
 }
 
+/* Does not need job_mutex. Value is never modified */
 JobType job_type(const Job *job)
 {
     return job->driver->job_type;
 }
 
+/* Does not need job_mutex. Value is never modified */
 const char *job_type_str(const Job *job)
 {
     return JobType_str(job_type(job));
@@ -353,24 +378,34 @@ static bool job_started(Job *job)
     return job->co;
 }
 
+/* Called with job_mutex held. */
 bool job_should_pause(Job *job)
 {
     return job->pause_count > 0;
 }
 
+/* Called with job_mutex held. */
 bool job_is_paused(Job *job)
 {
     return job->paused;
 }
 
+/* Called with job_mutex *not* held. */
 Job *job_next(Job *job)
 {
+    Job *ret;
+    job_lock();
     if (!job) {
-        return QLIST_FIRST(&jobs);
+        ret = QLIST_FIRST(&jobs);
+        job_unlock();
+        return ret;
     }
-    return QLIST_NEXT(job, job_list);
+    ret = QLIST_NEXT(job, job_list);
+    job_unlock();
+    return ret;
 }
 
+/* Called with job_mutex held. */
 Job *job_get(const char *id)
 {
     Job *job;
@@ -388,13 +423,14 @@ Job *job_get(const char *id)
     return NULL;
 }
 
+/* Called with job_mutex *not* held. */
 static void job_sleep_timer_cb(void *opaque)
 {
     Job *job = opaque;
-
     job_enter(job);
 }
 
+/* Called with job_mutex *not* held. */
 void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                  AioContext *ctx, int flags, BlockCompletionFunc *cb,
                  void *opaque, Error **errp)
@@ -449,6 +485,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                    job_sleep_timer_cb, job);
 
     QLIST_INSERT_HEAD(&jobs, job, job_list);
+    job_unlock();
 
     /* Single jobs are modeled as single-job transactions for sake of
      * consolidating the job management logic */
@@ -463,11 +500,13 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
     return job;
 }
 
+/* Called with job_mutex held. */
 void job_ref(Job *job)
 {
     ++job->refcnt;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_unref(Job *job)
 {
     if (--job->refcnt == 0) {
@@ -476,7 +515,9 @@ void job_unref(Job *job)
         assert(!job->txn);
 
         if (job->driver->free) {
+            job_unlock();
             job->driver->free(job);
+            job_lock();
         }
 
         QLIST_REMOVE(job, job_list);
@@ -488,46 +529,55 @@ void job_unref(Job *job)
     }
 }
 
+/* API is thread safe */
 void job_progress_update(Job *job, uint64_t done)
 {
     progress_work_done(&job->progress, done);
 }
 
+/* API is thread safe */
 void job_progress_set_remaining(Job *job, uint64_t remaining)
 {
     progress_set_remaining(&job->progress, remaining);
 }
 
+/* API is thread safe */
 void job_progress_increase_remaining(Job *job, uint64_t delta)
 {
     progress_increase_remaining(&job->progress, delta);
 }
 
+/* Called with job_mutex held. */
 void job_event_cancelled(Job *job)
 {
     notifier_list_notify(&job->on_finalize_cancelled, job);
 }
 
+/* Called with job_mutex held. */
 void job_event_completed(Job *job)
 {
     notifier_list_notify(&job->on_finalize_completed, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_pending(Job *job)
 {
     notifier_list_notify(&job->on_pending, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_ready(Job *job)
 {
     notifier_list_notify(&job->on_ready, job);
 }
 
+/* Called with job_mutex held. */
 static void job_event_idle(Job *job)
 {
     notifier_list_notify(&job->on_idle, job);
 }
 
+/* Called with job_mutex held, but releases it temporarily. */
 void job_enter_cond(Job *job, bool(*fn)(Job *job))
 {
     if (!job_started(job)) {
@@ -537,14 +587,11 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
         return;
     }
 
-    job_lock();
     if (job->busy) {
-        job_unlock();
         return;
     }
 
     if (fn && !fn(job)) {
-        job_unlock();
         return;
     }
 
@@ -552,7 +599,8 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
     timer_del(&job->sleep_timer);
     job->busy = true;
     job_unlock();
-    aio_co_enter(job->aio_context, job->co);
+    aio_co_enter(job_get_aiocontext(job), job->co);
+    job_lock();
 }
 
 /* Called with job_mutex held. */
@@ -565,7 +613,7 @@ void job_enter_locked(Job *job)
 void job_enter(Job *job)
 {
     job_lock();
-    job_enter_locked(job, NULL);
+    job_enter_locked(job);
     job_unlock();
 }
 
@@ -574,7 +622,11 @@ void job_enter(Job *job)
  * is allowed and cancels the timer.
  *
  * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
- * called explicitly. */
+ * called explicitly.
+ *
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
 {
     job_lock();
@@ -587,86 +639,122 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
     qemu_coroutine_yield();
 
     /* Set by job_enter_cond() before re-entering the coroutine.  */
+    job_lock();
     assert(job->busy);
+    job_unlock();
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void coroutine_fn job_pause_point(Job *job)
 {
     assert(job && job_started(job));
 
+    job_lock();
     if (!job_should_pause(job)) {
+        job_unlock();
         return;
     }
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
     if (job->driver->pause) {
+        job_unlock();
         job->driver->pause(job);
+        job_lock();
     }
 
-    if (job_should_pause(job) && !job_is_cancelled(job)) {
+    if (job_should_pause(job) && !job_is_cancelled_locked(job)) {
         JobStatus status = job->status;
         job_state_transition(job, status == JOB_STATUS_READY
                                   ? JOB_STATUS_STANDBY
                                   : JOB_STATUS_PAUSED);
         job->paused = true;
+        job_unlock();
         job_do_yield(job, -1);
+        job_lock();
         job->paused = false;
         job_state_transition(job, status);
     }
+    job_unlock();
 
     if (job->driver->resume) {
         job->driver->resume(job);
     }
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void job_yield(Job *job)
 {
+    bool res;
+    job_lock();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
-    if (!job_should_pause(job)) {
+    res = job_should_pause(job);
+    job_unlock();
+
+    if (!res) {
         job_do_yield(job, -1);
     }
 
     job_pause_point(job);
 }
 
+/*
+ * Called with job_mutex *not* held (we don't want the coroutine
+ * to yield with the lock held!).
+ */
 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
 {
+    bool res;
+    job_lock();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
+        job_unlock();
         return;
     }
 
-    if (!job_should_pause(job)) {
+    res = job_should_pause(job);
+    job_unlock();
+
+    if (!res) {
         job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
     }
 
     job_pause_point(job);
 }
 
-/* Assumes the block_job_mutex is held */
+/* Called with job_mutex held. */
 static bool job_timer_not_pending(Job *job)
 {
     return !timer_pending(&job->sleep_timer);
 }
 
+/* Called with job_mutex held. */
 void job_pause(Job *job)
 {
     job->pause_count++;
     if (!job->paused) {
-        job_enter(job);
+        job_enter_locked(job);
     }
 }
 
+/* Called with job_mutex held. */
 void job_resume(Job *job)
 {
     assert(job->pause_count > 0);
@@ -679,6 +767,7 @@ void job_resume(Job *job)
     job_enter_cond(job, job_timer_not_pending);
 }
 
+/* Called with job_mutex held. */
 void job_user_pause(Job *job, Error **errp)
 {
     if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
@@ -692,16 +781,19 @@ void job_user_pause(Job *job, Error **errp)
     job_pause(job);
 }
 
+/* Called with job_mutex held. */
 bool job_user_paused(Job *job)
 {
     return job->user_paused;
 }
 
+/* Called with job_mutex held. */
 void job_set_user_paused(Job *job)
 {
     job->user_paused = true;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_user_resume(Job *job, Error **errp)
 {
     assert(job);
@@ -713,12 +805,15 @@ void job_user_resume(Job *job, Error **errp)
         return;
     }
     if (job->driver->user_resume) {
+        job_unlock();
         job->driver->user_resume(job);
+        job_lock();
     }
     job->user_paused = false;
     job_resume(job);
 }
 
+/* Called with job_mutex held. */
 static void job_do_dismiss(Job *job)
 {
     assert(job);
@@ -732,6 +827,7 @@ static void job_do_dismiss(Job *job)
     job_unref(job);
 }
 
+/* Called with job_mutex held. */
 void job_dismiss(Job **jobptr, Error **errp)
 {
     Job *job = *jobptr;
@@ -761,9 +857,10 @@ static void job_conclude(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static void job_update_rc(Job *job)
 {
-    if (!job->ret && job_is_cancelled(job)) {
+    if (!job->ret && job_is_cancelled_locked(job)) {
         job->ret = -ECANCELED;
     }
     if (job->ret) {
@@ -774,22 +871,25 @@ static void job_update_rc(Job *job)
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_commit(Job *job)
 {
-    assert(!job->ret);
+    assert(!job_get_ret(job));
     if (job->driver->commit) {
         job->driver->commit(job);
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_abort(Job *job)
 {
-    assert(job->ret);
+    assert(job_get_ret(job));
     if (job->driver->abort) {
         job->driver->abort(job);
     }
 }
 
+/* Called with job_mutex *not* held. */
 static void job_clean(Job *job)
 {
     if (job->driver->clean) {
@@ -797,14 +897,18 @@ static void job_clean(Job *job)
     }
 }
 
+/* Called with job_mutex held, but releases it temporarily. */
 static int job_finalize_single(Job *job)
 {
-    assert(job_is_completed(job));
+    int ret;
+    assert(job_is_completed_locked(job));
 
     /* Ensure abort is called for late-transactional failures */
     job_update_rc(job);
 
-    if (!job->ret) {
+    ret = job->ret;
+    job_unlock();
+    if (!ret) {
         job_commit(job);
     } else {
         job_abort(job);
@@ -812,12 +916,13 @@ static int job_finalize_single(Job *job)
     job_clean(job);
 
     if (job->cb) {
-        job->cb(job->opaque, job->ret);
+        job->cb(job->opaque, ret);
     }
+    job_lock();
 
     /* Emit events only if we actually started */
     if (job_started(job)) {
-        if (job_is_cancelled(job)) {
+        if (job_is_cancelled_locked(job)) {
             job_event_cancelled(job);
         } else {
             job_event_completed(job);
@@ -829,15 +934,20 @@ static int job_finalize_single(Job *job)
     return 0;
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static void job_cancel_async(Job *job, bool force)
 {
     if (job->driver->cancel) {
+        job_unlock();
         job->driver->cancel(job, force);
+        job_lock();
     }
     if (job->user_paused) {
         /* Do not call job_enter here, the caller will handle it.  */
         if (job->driver->user_resume) {
+            job_unlock();
             job->driver->user_resume(job);
+            job_lock();
         }
         job->user_paused = false;
         assert(job->pause_count > 0);
@@ -848,27 +958,21 @@ static void job_cancel_async(Job *job, bool force)
     job->force_cancel |= force;
 }
 
+/* Called with job_mutex held. */
 static void job_completed_txn_abort(Job *job)
 {
-    AioContext *outer_ctx = job->aio_context;
     AioContext *ctx;
     JobTxn *txn = job->txn;
     Job *other_job;
 
-    if (txn->aborting) {
+    if (qatomic_cmpxchg(&txn->aborting, false, true)) {
         /*
          * We are cancelled by another job, which will handle everything.
          */
         return;
     }
-    txn->aborting = true;
     job_txn_ref(txn);
 
-    /* We can only hold the single job's AioContext lock while calling
-     * job_finalize_single() because the finalization callbacks can involve
-     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
-    aio_context_release(outer_ctx);
-
     /* Other jobs are effectively cancelled by us, set the status for
      * them; this job, however, may or may not be cancelled, depending
      * on the caller, so leave it. */
@@ -884,33 +988,39 @@ static void job_completed_txn_abort(Job *job)
         other_job = QLIST_FIRST(&txn->jobs);
         ctx = other_job->aio_context;
         aio_context_acquire(ctx);
-        if (!job_is_completed(other_job)) {
-            assert(job_is_cancelled(other_job));
+        if (!job_is_completed_locked(other_job)) {
+            assert(job_is_cancelled_locked(other_job));
             job_finish_sync(other_job, NULL, NULL);
         }
         job_finalize_single(other_job);
         aio_context_release(ctx);
     }
 
-    aio_context_acquire(outer_ctx);
-
     job_txn_unref(txn);
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 static int job_prepare(Job *job)
 {
+    int ret;
+
     if (job->ret == 0 && job->driver->prepare) {
-        job->ret = job->driver->prepare(job);
+        job_unlock();
+        ret = job->driver->prepare(job);
+        job_lock();
+        job->ret = ret;
         job_update_rc(job);
     }
     return job->ret;
 }
 
+/* Does not need job_mutex */
 static int job_needs_finalize(Job *job)
 {
     return !job->auto_finalize;
 }
 
+/* Called with job_mutex held. */
 static void job_do_finalize(Job *job)
 {
     int rc;
@@ -925,6 +1035,7 @@ static void job_do_finalize(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 void job_finalize(Job *job, Error **errp)
 {
     assert(job && job->id);
@@ -934,6 +1045,7 @@ void job_finalize(Job *job, Error **errp)
     job_do_finalize(job);
 }
 
+/* Called with job_mutex held. */
 static int job_transition_to_pending(Job *job)
 {
     job_state_transition(job, JOB_STATUS_PENDING);
@@ -943,17 +1055,22 @@ static int job_transition_to_pending(Job *job)
     return 0;
 }
 
+/* Called with job_mutex *not* held. */
 void job_transition_to_ready(Job *job)
 {
+    job_lock();
     job_state_transition(job, JOB_STATUS_READY);
     job_event_ready(job);
+    job_unlock();
 }
 
+/* Called with job_mutex held. */
 static void job_completed_txn_success(Job *job)
 {
-    JobTxn *txn = job->txn;
+    JobTxn *txn;
     Job *other_job;
 
+    txn = job->txn;
     job_state_transition(job, JOB_STATUS_WAITING);
 
     /*
@@ -961,7 +1078,7 @@ static void job_completed_txn_success(Job *job)
      * txn.
      */
     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (!job_is_completed(other_job)) {
+        if (!job_is_completed_locked(other_job)) {
             return;
         }
         assert(other_job->ret == 0);
@@ -975,9 +1092,10 @@ static void job_completed_txn_success(Job *job)
     }
 }
 
+/* Called with job_mutex held. */
 static void job_completed(Job *job)
 {
-    assert(job && job->txn && !job_is_completed(job));
+    assert(job && job->txn && !job_is_completed_locked(job));
 
     job_update_rc(job);
     trace_job_completed(job, job->ret);
@@ -988,14 +1106,16 @@ static void job_completed(Job *job)
     }
 }
 
-/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+/**
+ * Useful only as a type shim for aio_bh_schedule_oneshot.
+ * Called with job_mutex *not* held.
+ */
 static void job_exit(void *opaque)
 {
     Job *job = (Job *)opaque;
-    AioContext *ctx;
 
+    job_lock();
     job_ref(job);
-    aio_context_acquire(job->aio_context);
 
     /* This is a lie, we're not quiescent, but still doing the completion
      * callbacks. However, completion callbacks tend to involve operations that
@@ -1012,29 +1132,40 @@ static void job_exit(void *opaque)
      * acquiring the new lock, and we ref/unref to avoid job_completed freeing
      * the job underneath us.
      */
-    ctx = job->aio_context;
     job_unref(job);
-    aio_context_release(ctx);
+    job_unlock();
 }
 
 /**
  * All jobs must allow a pause point before entering their job proper. This
  * ensures that jobs can be paused prior to being started, then resumed later.
+ *
+ * Called with job_mutex *not* held.
  */
 static void coroutine_fn job_co_entry(void *opaque)
 {
     Job *job = opaque;
+    Error *local_error = NULL;
+    int ret;
 
     assert(job && job->driver && job->driver->run);
     job_pause_point(job);
-    job->ret = job->driver->run(job, &job->err);
+    ret = job->driver->run(job, &local_error);
+    job_lock();
+    if (local_error) {
+        error_propagate(&job->err, local_error);
+    }
+    job->ret = ret;
     job->deferred_to_main_loop = true;
     job->busy = true;
+    job_unlock();
     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
 }
 
+/* Called with job_mutex *not* held. */
 void job_start(Job *job)
 {
+    job_lock();
     assert(job && !job_started(job) && job->paused &&
            job->driver && job->driver->run);
     job->co = qemu_coroutine_create(job_co_entry, job);
@@ -1042,9 +1173,11 @@ void job_start(Job *job)
     job->busy = true;
     job->paused = false;
     job_state_transition(job, JOB_STATUS_RUNNING);
-    aio_co_enter(job->aio_context, job->co);
+    job_unlock();
+    aio_co_enter(job_get_aiocontext(job), job->co);
 }
 
+/* Called with job_mutex held. */
 void job_cancel(Job *job, bool force)
 {
     if (job->status == JOB_STATUS_CONCLUDED) {
@@ -1057,10 +1190,11 @@ void job_cancel(Job *job, bool force)
     } else if (job->deferred_to_main_loop) {
         job_completed_txn_abort(job);
     } else {
-        job_enter(job);
+        job_enter_locked(job);
     }
 }
 
+/* Called with job_mutex held. */
 void job_user_cancel(Job *job, bool force, Error **errp)
 {
     if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
@@ -1069,19 +1203,36 @@ void job_user_cancel(Job *job, bool force, Error **errp)
     job_cancel(job, force);
 }
 
-/* A wrapper around job_cancel() taking an Error ** parameter so it may be
+/*
+ * A wrapper around job_cancel() taking an Error ** parameter so it may be
  * used with job_finish_sync() without the need for (rather nasty) function
- * pointer casts there. */
+ * pointer casts there.
+ *
+ * Called with job_mutex held.
+ */
 static void job_cancel_err(Job *job, Error **errp)
 {
     job_cancel(job, false);
 }
 
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
 int job_cancel_sync(Job *job)
 {
-    return job_finish_sync(job, &job_cancel_err, NULL);
+    int ret;
+
+    job_lock();
+    ret = job_finish_sync(job, &job_cancel_err, NULL);
+    job_unlock();
+    return ret;
 }
 
+/*
+ * Called with job_mutex *not* held, unlike most other APIs consumed
+ * by the monitor!
+ */
 void job_cancel_sync_all(void)
 {
     Job *job;
@@ -1095,11 +1246,13 @@ void job_cancel_sync_all(void)
     }
 }
 
+/* Called with job_mutex held. */
 int job_complete_sync(Job *job, Error **errp)
 {
     return job_finish_sync(job, job_complete, errp);
 }
 
+/* Called with job_mutex held. Temporarily releases the lock. */
 void job_complete(Job *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
@@ -1107,15 +1260,18 @@ void job_complete(Job *job, Error **errp)
     if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
         return;
     }
-    if (job_is_cancelled(job) || !job->driver->complete) {
+    if (job_is_cancelled_locked(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
     }
 
+    job_unlock();
     job->driver->complete(job, errp);
+    job_lock();
 }
 
+/* Called with job_mutex held. */
 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
 {
     Error *local_err = NULL;
@@ -1132,10 +1288,12 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
         return -EBUSY;
     }
 
-    AIO_WAIT_WHILE(job->aio_context,
-                   (job_enter(job), !job_is_completed(job)));
+    job_unlock();
+    AIO_WAIT_WHILE(NULL, (job_enter(job), !job_is_completed(job)));
+    job_lock();
 
-    ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
+    ret = (job_is_cancelled_locked(job) && job->ret == 0) ?
+           -ECANCELED : job->ret;
     job_unref(job);
     return ret;
 }
diff --git a/qemu-img.c b/qemu-img.c
index d16bd367d9..82debde038 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -898,17 +898,19 @@ static void common_block_job_cb(void *opaque, int ret)
     }
 }
 
+/* Called with job_mutex held. Releases it temporarily. */
 static void run_block_job(BlockJob *job, Error **errp)
 {
     uint64_t progress_current, progress_total;
     AioContext *aio_context = blk_get_aio_context(job->blk);
     int ret = 0;
 
-    aio_context_acquire(aio_context);
     job_ref(&job->job);
     do {
         float progress = 0.0f;
+        job_unlock();
         aio_poll(aio_context, true);
+        job_lock();
 
         progress_get_snapshot(&job->job.progress, &progress_current,
                               &progress_total);
@@ -916,15 +918,15 @@ static void run_block_job(BlockJob *job, Error **errp)
             progress = (float)progress_current / progress_total * 100.f;
         }
         qemu_progress_print(progress, 0);
-    } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
+    } while (!job_is_ready_locked(&job->job) &&
+             !job_is_completed_locked(&job->job));
 
-    if (!job_is_completed(&job->job)) {
+    if (!job_is_completed_locked(&job->job)) {
         ret = job_complete_sync(&job->job, errp);
     } else {
-        ret = job_get_ret(&job->job);
+        ret = job_get_ret_locked(&job->job);
     }
     job_unref(&job->job);
-    aio_context_release(aio_context);
 
     /* publish completion progress only when success */
     if (!ret) {
@@ -1076,9 +1078,12 @@ static int img_commit(int argc, char **argv)
         bdrv_ref(bs);
     }
 
+    job_lock();
     job = block_job_get("commit");
     assert(job);
     run_block_job(job, &local_err);
+    job_unlock();
+
     if (local_err) {
         goto unref_backing;
     }
-- 
2.31.1




* [RFC PATCH 6/6] jobs: remove unnecessary AioContext acquire/release pairs
  2021-07-07 16:58 [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Emanuele Giuseppe Esposito
                   ` (4 preceding siblings ...)
  2021-07-07 16:58 ` [RFC PATCH 5/6] job: use global job_mutex to protect struct Job Emanuele Giuseppe Esposito
@ 2021-07-07 16:58 ` Emanuele Giuseppe Esposito
  2021-07-08 10:36 ` [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Stefan Hajnoczi
  2021-07-08 13:09 ` Stefan Hajnoczi
  7 siblings, 0 replies; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-07 16:58 UTC (permalink / raw)
  To: qemu-block
  Cc: Kevin Wolf, Emanuele Giuseppe Esposito,
	Vladimir Sementsov-Ogievskiy, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, Stefan Hajnoczi,
	Paolo Bonzini, John Snow

Now that we use the job_mutex, remove the unnecessary
aio_context_acquire/release pairs. A few places still need the
AioContext lock, so there the critical section is reduced to the
minimum.

This patch is separate from the previous one because here we remove
locks without substituting them with aio_context_acquire/release
pairs.

The remaining critical sections will also be removed in the future,
once the underlying bdrv_* API no longer requires the AioContext lock.
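
To make the change concrete, the transformation applied throughout this
patch is roughly the following (a sketch only; the blockdev_mark_auto_del()
hunk below shows a real instance):

    /* Before (after patch 5): both locks were taken around Job calls. */
    AioContext *aio_context = job_get_aiocontext(&job->job);
    aio_context_acquire(aio_context);
    job_lock();
    job_cancel(&job->job, false);
    aio_context_release(aio_context);
    job_unlock();

    /* After: the global job_mutex alone protects the Job state. */
    job_lock();
    job_cancel(&job->job, false);
    job_unlock();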

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block/mirror.c                 |   6 ++
 block/monitor/block-hmp-cmds.c |   6 --
 blockdev.c                     | 173 ++++++++-------------------------
 blockjob.c                     |   3 +
 job.c                          |   9 +-
 qemu-img.c                     |   4 -
 6 files changed, 54 insertions(+), 147 deletions(-)

diff --git a/block/mirror.c b/block/mirror.c
index deefaa6a39..8d30c53690 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1857,6 +1857,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
 {
     bool is_none_mode;
     BlockDriverState *base;
+    AioContext *aio_context;
 
     if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
         (mode == MIRROR_SYNC_MODE_BITMAP)) {
@@ -1866,11 +1867,16 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
     }
     is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
     base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
     mirror_start_job(job_id, bs, creation_flags, target, replaces,
                      speed, granularity, buf_size, backing_mode, zero_target,
                      on_source_error, on_target_error, unmap, NULL, NULL,
                      &mirror_job_driver, is_none_mode, base, false,
                      filter_node_name, true, copy_mode, errp);
+    aio_context_release(aio_context);
+
 }
 
 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
diff --git a/block/monitor/block-hmp-cmds.c b/block/monitor/block-hmp-cmds.c
index 3e6670c963..99095afae7 100644
--- a/block/monitor/block-hmp-cmds.c
+++ b/block/monitor/block-hmp-cmds.c
@@ -206,7 +206,6 @@ void hmp_commit(Monitor *mon, const QDict *qdict)
         ret = blk_commit_all();
     } else {
         BlockDriverState *bs;
-        AioContext *aio_context;
 
         blk = blk_by_name(device);
         if (!blk) {
@@ -219,12 +218,7 @@ void hmp_commit(Monitor *mon, const QDict *qdict)
         }
 
         bs = bdrv_skip_implicit_filters(blk_bs(blk));
-        aio_context = bdrv_get_aio_context(bs);
-        aio_context_acquire(aio_context);
-
         ret = bdrv_commit(bs);
-
-        aio_context_release(aio_context);
     }
     if (ret < 0) {
         error_report("'commit' error for '%s': %s", device, strerror(-ret));
diff --git a/blockdev.c b/blockdev.c
index 9255aea6a2..119cb9a539 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -147,13 +147,8 @@ void blockdev_mark_auto_del(BlockBackend *blk)
 
     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
         if (block_job_has_bdrv(job, blk_bs(blk))) {
-            AioContext *aio_context = job_get_aiocontext(&job->job);
-            aio_context_acquire(aio_context);
-
             job_lock();
             job_cancel(&job->job, false);
-
-            aio_context_release(aio_context);
             job_unlock();
         }
     }
@@ -1714,7 +1709,6 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
     }
 
     aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
 
     /* Paired with .clean() */
     bdrv_drained_begin(bs);
@@ -1726,7 +1720,7 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
 
     /* Early check to avoid creating target */
     if (bdrv_op_is_blocked(bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) {
-        goto out;
+        return;
     }
 
     flags = bs->open_flags | BDRV_O_RDWR;
@@ -1756,7 +1750,7 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
     size = bdrv_getlength(bs);
     if (size < 0) {
         error_setg_errno(errp, -size, "bdrv_getlength failed");
-        goto out;
+        return;
     }
 
     if (backup->mode != NEW_IMAGE_MODE_EXISTING) {
@@ -1779,7 +1773,7 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
 
     if (local_err) {
         error_propagate(errp, local_err);
-        goto out;
+        return;
     }
 
     options = qdict_new();
@@ -1791,12 +1785,11 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
 
     target_bs = bdrv_open(backup->target, NULL, options, flags, errp);
     if (!target_bs) {
-        goto out;
+        return;
     }
 
     /* Honor bdrv_try_set_aio_context() context acquisition requirements. */
     old_context = bdrv_get_aio_context(target_bs);
-    aio_context_release(aio_context);
     aio_context_acquire(old_context);
 
     ret = bdrv_try_set_aio_context(target_bs, aio_context, errp);
@@ -1807,7 +1800,6 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
     }
 
     aio_context_release(old_context);
-    aio_context_acquire(aio_context);
 
     if (set_backing_hd) {
         if (bdrv_set_backing_hd(target_bs, source, errp) < 0) {
@@ -1816,29 +1808,21 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp)
     }
 
     state->bs = bs;
-
+    aio_context_acquire(aio_context);
     state->job = do_backup_common(qapi_DriveBackup_base(backup),
                                   bs, target_bs, aio_context,
                                   common->block_job_txn, errp);
-
+    aio_context_release(aio_context);
 unref:
     bdrv_unref(target_bs);
-out:
-    aio_context_release(aio_context);
 }
 
 static void drive_backup_commit(BlkActionState *common)
 {
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
-    AioContext *aio_context;
-
-    aio_context = bdrv_get_aio_context(state->bs);
-    aio_context_acquire(aio_context);
 
     assert(state->job);
     job_start(&state->job->job);
-
-    aio_context_release(aio_context);
 }
 
 static void drive_backup_abort(BlkActionState *common)
@@ -1846,32 +1830,18 @@ static void drive_backup_abort(BlkActionState *common)
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
 
     if (state->job) {
-        AioContext *aio_context;
-
-        aio_context = bdrv_get_aio_context(state->bs);
-        aio_context_acquire(aio_context);
-
         job_cancel_sync(&state->job->job);
-
-        aio_context_release(aio_context);
     }
 }
 
 static void drive_backup_clean(BlkActionState *common)
 {
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
-    AioContext *aio_context;
 
-    if (!state->bs) {
-        return;
+    if (state->bs) {
+        bdrv_drained_end(state->bs);
     }
 
-    aio_context = bdrv_get_aio_context(state->bs);
-    aio_context_acquire(aio_context);
-
-    bdrv_drained_end(state->bs);
-
-    aio_context_release(aio_context);
 }
 
 typedef struct BlockdevBackupState {
@@ -1931,15 +1901,9 @@ static void blockdev_backup_prepare(BlkActionState *common, Error **errp)
 static void blockdev_backup_commit(BlkActionState *common)
 {
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
-    AioContext *aio_context;
-
-    aio_context = bdrv_get_aio_context(state->bs);
-    aio_context_acquire(aio_context);
 
     assert(state->job);
     job_start(&state->job->job);
-
-    aio_context_release(aio_context);
 }
 
 static void blockdev_backup_abort(BlkActionState *common)
@@ -1947,32 +1911,17 @@ static void blockdev_backup_abort(BlkActionState *common)
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
 
     if (state->job) {
-        AioContext *aio_context;
-
-        aio_context = bdrv_get_aio_context(state->bs);
-        aio_context_acquire(aio_context);
-
         job_cancel_sync(&state->job->job);
-
-        aio_context_release(aio_context);
     }
 }
 
 static void blockdev_backup_clean(BlkActionState *common)
 {
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
-    AioContext *aio_context;
 
-    if (!state->bs) {
-        return;
+    if (state->bs) {
+        bdrv_drained_end(state->bs);
     }
-
-    aio_context = bdrv_get_aio_context(state->bs);
-    aio_context_acquire(aio_context);
-
-    bdrv_drained_end(state->bs);
-
-    aio_context_release(aio_context);
 }
 
 typedef struct BlockDirtyBitmapState {
@@ -2486,7 +2435,6 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
     BlockDriverState *bs, *iter, *iter_end;
     BlockDriverState *base_bs = NULL;
     BlockDriverState *bottom_bs = NULL;
-    AioContext *aio_context;
     Error *local_err = NULL;
     int job_flags = JOB_DEFAULT;
 
@@ -2517,52 +2465,46 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
         return;
     }
 
-    aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
-
     if (has_base) {
         base_bs = bdrv_find_backing_image(bs, base);
         if (base_bs == NULL) {
             error_setg(errp, "Can't find '%s' in the backing chain", base);
-            goto out;
+            return;
         }
-        assert(bdrv_get_aio_context(base_bs) == aio_context);
     }
 
     if (has_base_node) {
         base_bs = bdrv_lookup_bs(NULL, base_node, errp);
         if (!base_bs) {
-            goto out;
+            return;
         }
         if (bs == base_bs || !bdrv_chain_contains(bs, base_bs)) {
             error_setg(errp, "Node '%s' is not a backing image of '%s'",
                        base_node, device);
-            goto out;
+            return;
         }
-        assert(bdrv_get_aio_context(base_bs) == aio_context);
         bdrv_refresh_filename(base_bs);
     }
 
     if (has_bottom) {
         bottom_bs = bdrv_lookup_bs(NULL, bottom, errp);
         if (!bottom_bs) {
-            goto out;
+            return;
         }
         if (!bottom_bs->drv) {
             error_setg(errp, "Node '%s' is not open", bottom);
-            goto out;
+            return;
         }
         if (bottom_bs->drv->is_filter) {
             error_setg(errp, "Node '%s' is a filter, use a non-filter node "
                        "as 'bottom'", bottom);
-            goto out;
+            return;
         }
         if (!bdrv_chain_contains(bs, bottom_bs)) {
             error_setg(errp, "Node '%s' is not in a chain starting from '%s'",
                        bottom, device);
-            goto out;
+            return;
         }
-        assert(bdrv_get_aio_context(bottom_bs) == aio_context);
     }
 
     /*
@@ -2573,7 +2515,7 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
          iter = bdrv_filter_or_cow_bs(iter))
     {
         if (bdrv_op_is_blocked(iter, BLOCK_OP_TYPE_STREAM, errp)) {
-            goto out;
+            return;
         }
     }
 
@@ -2582,7 +2524,7 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
     if (base_bs == NULL && has_backing_file) {
         error_setg(errp, "backing file specified, but streaming the "
                          "entire chain");
-        goto out;
+        return;
     }
 
     if (has_auto_finalize && !auto_finalize) {
@@ -2597,13 +2539,10 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
                  filter_node_name, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
-        goto out;
+        return;
     }
 
     trace_qmp_block_stream(bs);
-
-out:
-    aio_context_release(aio_context);
 }
 
 void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
@@ -2622,7 +2561,6 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
     BlockDriverState *bs;
     BlockDriverState *iter;
     BlockDriverState *base_bs, *top_bs;
-    AioContext *aio_context;
     Error *local_err = NULL;
     int job_flags = JOB_DEFAULT;
     uint64_t top_perm, top_shared;
@@ -2661,11 +2599,8 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
         return;
     }
 
-    aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
-
     if (bdrv_op_is_blocked(bs, BLOCK_OP_TYPE_COMMIT_SOURCE, errp)) {
-        goto out;
+        return;
     }
 
     /* default top_bs is the active layer */
@@ -2673,16 +2608,16 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
 
     if (has_top_node && has_top) {
         error_setg(errp, "'top-node' and 'top' are mutually exclusive");
-        goto out;
+        return;
     } else if (has_top_node) {
         top_bs = bdrv_lookup_bs(NULL, top_node, errp);
         if (top_bs == NULL) {
-            goto out;
+            return;
         }
         if (!bdrv_chain_contains(bs, top_bs)) {
             error_setg(errp, "'%s' is not in this backing file chain",
                        top_node);
-            goto out;
+            return;
         }
     } else if (has_top && top) {
         /* This strcmp() is just a shortcut, there is no need to
@@ -2696,52 +2631,48 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
 
     if (top_bs == NULL) {
         error_setg(errp, "Top image file %s not found", top ? top : "NULL");
-        goto out;
+        return;
     }
 
-    assert(bdrv_get_aio_context(top_bs) == aio_context);
-
     if (has_base_node && has_base) {
         error_setg(errp, "'base-node' and 'base' are mutually exclusive");
-        goto out;
+        return;
     } else if (has_base_node) {
         base_bs = bdrv_lookup_bs(NULL, base_node, errp);
         if (base_bs == NULL) {
-            goto out;
+            return;
         }
         if (!bdrv_chain_contains(top_bs, base_bs)) {
             error_setg(errp, "'%s' is not in this backing file chain",
                        base_node);
-            goto out;
+            return;
         }
     } else if (has_base && base) {
         base_bs = bdrv_find_backing_image(top_bs, base);
         if (base_bs == NULL) {
             error_setg(errp, "Can't find '%s' in the backing chain", base);
-            goto out;
+            return;
         }
     } else {
         base_bs = bdrv_find_base(top_bs);
         if (base_bs == NULL) {
             error_setg(errp, "There is no backimg image");
-            goto out;
+            return;
         }
     }
 
-    assert(bdrv_get_aio_context(base_bs) == aio_context);
-
     for (iter = top_bs; iter != bdrv_filter_or_cow_bs(base_bs);
          iter = bdrv_filter_or_cow_bs(iter))
     {
         if (bdrv_op_is_blocked(iter, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) {
-            goto out;
+            return;
         }
     }
 
     /* Do not allow attempts to commit an image into itself */
     if (top_bs == base_bs) {
         error_setg(errp, "cannot commit an image into itself");
-        goto out;
+        return;
     }
 
     /*
@@ -2764,7 +2695,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
                 error_setg(errp, "'backing-file' specified, but 'top' has a "
                                  "writer on it");
             }
-            goto out;
+            return;
         }
         if (!has_job_id) {
             /*
@@ -2780,7 +2711,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
     } else {
         BlockDriverState *overlay_bs = bdrv_find_overlay(bs, top_bs);
         if (bdrv_op_is_blocked(overlay_bs, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) {
-            goto out;
+            return;
         }
         commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, job_flags,
                      speed, on_error, has_backing_file ? backing_file : NULL,
@@ -2788,11 +2719,8 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
     }
     if (local_err != NULL) {
         error_propagate(errp, local_err);
-        goto out;
+        return;
     }
-
-out:
-    aio_context_release(aio_context);
 }
 
 /* Common QMP interface for drive-backup and blockdev-backup */
@@ -3089,7 +3017,6 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
 {
     BlockDriverState *bs;
     BlockDriverState *target_backing_bs, *target_bs;
-    AioContext *aio_context;
     AioContext *old_context;
     BlockMirrorBackingMode backing_mode;
     Error *local_err = NULL;
@@ -3110,9 +3037,6 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
         return;
     }
 
-    aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
-
     if (!arg->has_mode) {
         arg->mode = NEW_IMAGE_MODE_ABSOLUTE_PATHS;
     }
@@ -3134,14 +3058,14 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
     size = bdrv_getlength(bs);
     if (size < 0) {
         error_setg_errno(errp, -size, "bdrv_getlength failed");
-        goto out;
+        return;
     }
 
     if (arg->has_replaces) {
         if (!arg->has_node_name) {
             error_setg(errp, "a node-name must be provided when replacing a"
                              " named node of the graph");
-            goto out;
+            return;
         }
     }
 
@@ -3184,7 +3108,7 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
 
     if (local_err) {
         error_propagate(errp, local_err);
-        goto out;
+        return;
     }
 
     options = qdict_new();
@@ -3200,7 +3124,7 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
      */
     target_bs = bdrv_open(arg->target, NULL, options, flags, errp);
     if (!target_bs) {
-        goto out;
+        return;
     }
 
     zero_target = (arg->sync == MIRROR_SYNC_MODE_FULL &&
@@ -3210,10 +3134,9 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
 
     /* Honor bdrv_try_set_aio_context() context acquisition requirements. */
     old_context = bdrv_get_aio_context(target_bs);
-    aio_context_release(aio_context);
     aio_context_acquire(old_context);
 
-    ret = bdrv_try_set_aio_context(target_bs, aio_context, errp);
+    ret = bdrv_try_set_aio_context(target_bs, bdrv_get_aio_context(bs), errp);
     if (ret < 0) {
         bdrv_unref(target_bs);
         aio_context_release(old_context);
@@ -3221,7 +3144,6 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
     }
 
     aio_context_release(old_context);
-    aio_context_acquire(aio_context);
 
     blockdev_mirror_common(arg->has_job_id ? arg->job_id : NULL, bs, target_bs,
                            arg->has_replaces, arg->replaces, arg->sync,
@@ -3238,8 +3160,6 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
                            arg->has_auto_dismiss, arg->auto_dismiss,
                            errp);
     bdrv_unref(target_bs);
-out:
-    aio_context_release(aio_context);
 }
 
 void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
@@ -3262,7 +3182,6 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
 {
     BlockDriverState *bs;
     BlockDriverState *target_bs;
-    AioContext *aio_context;
     AioContext *old_context;
     BlockMirrorBackingMode backing_mode = MIRROR_LEAVE_BACKING_CHAIN;
     bool zero_target;
@@ -3282,16 +3201,14 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
 
     /* Honor bdrv_try_set_aio_context() context acquisition requirements. */
     old_context = bdrv_get_aio_context(target_bs);
-    aio_context = bdrv_get_aio_context(bs);
     aio_context_acquire(old_context);
 
-    ret = bdrv_try_set_aio_context(target_bs, aio_context, errp);
+    ret = bdrv_try_set_aio_context(target_bs, bdrv_get_aio_context(bs), errp);
 
     aio_context_release(old_context);
-    aio_context_acquire(aio_context);
 
     if (ret < 0) {
-        goto out;
+        return;
     }
 
     blockdev_mirror_common(has_job_id ? job_id : NULL, bs, target_bs,
@@ -3307,8 +3224,6 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
                            has_auto_finalize, auto_finalize,
                            has_auto_dismiss, auto_dismiss,
                            errp);
-out:
-    aio_context_release(aio_context);
 }
 
 /* Get a block job using its ID and acquire its job_lock */
@@ -3696,15 +3611,11 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp)
 
     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
         BlockJobInfo *value;
-        AioContext *aio_context;
 
         if (block_job_is_internal(job)) {
             continue;
         }
-        aio_context = blk_get_aio_context(job->blk);
-        aio_context_acquire(aio_context);
         value = block_job_query(job, errp);
-        aio_context_release(aio_context);
         if (!value) {
             qapi_free_BlockJobInfoList(head);
             return NULL;
diff --git a/blockjob.c b/blockjob.c
index e7b289089b..633abb3811 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -195,6 +195,7 @@ static const BdrvChildClass child_job = {
     .get_parent_aio_context = child_job_get_parent_aio_context,
 };
 
+/* Called with BQL held.  */
 void block_job_remove_all_bdrv(BlockJob *job)
 {
     /*
@@ -216,6 +217,7 @@ void block_job_remove_all_bdrv(BlockJob *job)
     }
 }
 
+/* Called with BQL held.  */
 bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs)
 {
     GSList *el;
@@ -230,6 +232,7 @@ bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs)
     return false;
 }
 
+/* Called with BQL held.  */
 int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                        uint64_t perm, uint64_t shared_perm, Error **errp)
 {
diff --git a/job.c b/job.c
index e2006532b5..b86fce3679 100644
--- a/job.c
+++ b/job.c
@@ -220,7 +220,6 @@ static int job_txn_apply(Job *job, int fn(Job *))
      * break AIO_WAIT_WHILE from within fn.
      */
     job_ref(job);
-    aio_context_release(job->aio_context);
 
     QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
         inner_ctx = other_job->aio_context;
@@ -232,11 +231,6 @@ static int job_txn_apply(Job *job, int fn(Job *))
         }
     }
 
-    /*
-     * Note that job->aio_context might have been changed by calling fn, so we
-     * can't use a local variable to cache it.
-     */
-    aio_context_acquire(job->aio_context);
     job_unref(job);
     return rc;
 }
@@ -515,8 +509,11 @@ void job_unref(Job *job)
         assert(!job->txn);
 
         if (job->driver->free) {
+            AioContext *ctx = job_get_aiocontext(job);
             job_unlock();
+            aio_context_acquire(ctx);
             job->driver->free(job);
+            aio_context_release(ctx);
             job_lock();
         }
 
diff --git a/qemu-img.c b/qemu-img.c
index 82debde038..10bbe88b03 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -946,7 +946,6 @@ static int img_commit(int argc, char **argv)
     Error *local_err = NULL;
     CommonBlockJobCBInfo cbi;
     bool image_opts = false;
-    AioContext *aio_context;
     int64_t rate_limit = 0;
 
     fmt = NULL;
@@ -1060,12 +1059,9 @@ static int img_commit(int argc, char **argv)
         .bs   = bs,
     };
 
-    aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
     commit_active_start("commit", bs, base_bs, JOB_DEFAULT, rate_limit,
                         BLOCKDEV_ON_ERROR_REPORT, NULL, common_block_job_cb,
                         &cbi, false, &local_err);
-    aio_context_release(aio_context);
     if (local_err) {
         goto done;
     }
-- 
2.31.1




* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-07 16:58 [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Emanuele Giuseppe Esposito
                   ` (5 preceding siblings ...)
  2021-07-07 16:58 ` [RFC PATCH 6/6] jobs: remove unnecessary AioContext acquire/release pairs Emanuele Giuseppe Esposito
@ 2021-07-08 10:36 ` Stefan Hajnoczi
  2021-07-08 11:32   ` Paolo Bonzini
  2021-07-08 13:09 ` Stefan Hajnoczi
  7 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 10:36 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:07PM +0200, Emanuele Giuseppe Esposito wrote:
> This is a continuation on the work to reduce (and possibly get rid of) the usage of AioContext lock, by introducing smaller granularity locks to keep the thread safety.
> 
> This series aims to:
> 1) remove the aiocontext lock and substitute it with the already existing
>    global job_mutex
> 2) fix what it looks like to be an oversight when moving the blockjob.c logic
>    into the more generic job.c: job_mutex was introduced especially to
>    protect job->busy flag, but it seems that it was not used in successive
>    patches, because there are multiple code sections that directly
>    access the field without any locking.
> 3) use job_mutex instead of the aiocontext_lock
> 4) extend the reach of the job_mutex to protect all shared fields
>    that the job structure has.
> 
> The reason why we propose to use the existing job_mutex and not make one for
> each job is to keep things as simple as possible for now, and because
> the jobs are not in the execution critical path, so we can affort
> some delays.
> Having a lock per job would increase overall complexity and
> increase the chances of deadlocks (one good example could be the job
> transactions, where multiple jobs are grouped together).
> Anyways, the per-job mutex can always be added in the future.
> 
> Patch 1-4 are in preparation for patch 5. They try to simplify and clarify
> the job_mutex usage. Patch 5 tries to add proper syncronization to the job
> structure, replacing the AioContext lock when necessary.
> Patch 6 just removes unnecessary AioContext locks that are now unneeded.
> 
> 
> RFC: I am not sure the way I layed out the locks is ideal.
> But their usage should not make deadlocks. I also made sure
> the series passess all qemu_iotests.
> 
> What is very clear from this patch is that it
> is strictly related to the brdv_* and lower level calls, because
> they also internally check or even use the aiocontext lock.
> Therefore, in order to make it work, I temporarly added some
> aiocontext_acquire/release pair around the function that
> still assert for them or assume they are hold and temporarly
> unlock (unlock() - lock()).

Sounds like the issue is that this patch series assumes AioContext locks
are no longer required for calling the blk_*()/bdrv_*() APIs? That is
not the case yet, so you had to then add those aio_context_lock() calls
back in elsewhere. This approach introduces unnecessary risk. I think we
should wait until blk_*()/bdrv_*() no longer requires the caller to hold
the AioContext lock before applying this series.

Stefan


* Re: [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch
  2021-07-07 16:58 ` [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch Emanuele Giuseppe Esposito
@ 2021-07-08 10:50   ` Stefan Hajnoczi
  2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 10:50 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:09PM +0200, Emanuele Giuseppe Esposito wrote:
> diff --git a/job.c b/job.c
> index 872bbebb01..96fb8e9730 100644
> --- a/job.c
> +++ b/job.c
> @@ -32,6 +32,10 @@
>  #include "trace/trace-root.h"
>  #include "qapi/qapi-events-job.h"
>  
> +/* job_mutex protects the jobs list, but also the job operations. */
> +static QemuMutex job_mutex;

It's unclear what protecting "job operations" means. I would prefer a
fine-grained per-job lock that protects the job's fields instead of a
global lock with an unclear scope.

> +
> +/* Protected by job_mutex */
>  static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
>  
>  /* Job State Transition Table */
> @@ -64,27 +68,22 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
>  /* Transactional group of jobs */
>  struct JobTxn {
>  
> -    /* Is this txn being cancelled? */
> +    /* Is this txn being cancelled? Atomic.*/
>      bool aborting;

The comment says atomic but this field is not accessed using atomic
operations (at least at this point in the patch series)?

>  
> -    /* List of jobs */
> +    /* List of jobs. Protected by job_mutex. */
>      QLIST_HEAD(, Job) jobs;
>  
> -    /* Reference count */
> +    /* Reference count. Atomic. */
>      int refcnt;

Same.


* Re: [RFC PATCH 3/6] job: minor changes to simplify locking
  2021-07-07 16:58 ` [RFC PATCH 3/6] job: minor changes to simplify locking Emanuele Giuseppe Esposito
@ 2021-07-08 10:55   ` Stefan Hajnoczi
  2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  2021-07-13 17:56   ` Eric Blake
  1 sibling, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 10:55 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:10PM +0200, Emanuele Giuseppe Esposito wrote:
> @@ -406,15 +410,18 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
>              error_setg(errp, "Invalid job ID '%s'", job_id);
>              return NULL;
>          }
> -        if (job_get(job_id)) {
> -            error_setg(errp, "Job ID '%s' already in use", job_id);
> -            return NULL;
> -        }
>      } else if (!(flags & JOB_INTERNAL)) {
>          error_setg(errp, "An explicit job ID is required");
>          return NULL;
>      }
>  
> +    job_lock();
> +    if (job_get(job_id)) {
> +        error_setg(errp, "Job ID '%s' already in use", job_id);
> +        job_unlock();
> +        return NULL;
> +    }
> +

Where is the matching job_unlock() in the success case? Please consider
lock guard macros like QEMU_LOCK_GUARD()/WITH_QEMU_LOCK_GUARD() to
prevent common errors.
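
For example, a minimal sketch of how the lookup could look with a guard
(assuming the global job_mutex from patch 2, which is in scope in job.c):

    /* requires "qemu/lockable.h" */
    WITH_QEMU_LOCK_GUARD(&job_mutex) {
        if (job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;   /* the guard releases job_mutex on scope exit */
        }
        /* ... rest of the critical section ... */
    }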


* Re: [RFC PATCH 4/6] job.h: categorize job fields
  2021-07-07 16:58 ` [RFC PATCH 4/6] job.h: categorize job fields Emanuele Giuseppe Esposito
@ 2021-07-08 11:02   ` Stefan Hajnoczi
  2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 11:02 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:11PM +0200, Emanuele Giuseppe Esposito wrote:
> -    /** AioContext to run the job coroutine in */
> +    /**
> +     * AioContext to run the job coroutine in.
> +     * Atomic.
> +     */
>      AioContext *aio_context;

This isn't accessed using atomic operations, so I'm not sure why it's
documented as atomic?


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-08 10:36 ` [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Stefan Hajnoczi
@ 2021-07-08 11:32   ` Paolo Bonzini
  2021-07-08 12:14     ` Kevin Wolf
  2021-07-08 13:04     ` Stefan Hajnoczi
  0 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2021-07-08 11:32 UTC (permalink / raw)
  To: Stefan Hajnoczi, Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block,
	Wen Congyang, Xie Changlong, qemu-devel, Markus Armbruster,
	Max Reitz, John Snow

On 08/07/21 12:36, Stefan Hajnoczi wrote:
>> What is very clear from this patch is that it
>> is strictly related to the brdv_* and lower level calls, because
>> they also internally check or even use the aiocontext lock.
>> Therefore, in order to make it work, I temporarly added some
>> aiocontext_acquire/release pair around the function that
>> still assert for them or assume they are hold and temporarly
>> unlock (unlock() - lock()).
>
> Sounds like the issue is that this patch series assumes AioContext locks
> are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> not the case yet, so you had to then add those aio_context_lock() calls
> back in elsewhere. This approach introduces unnecessary risk. I think we
> should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> the AioContext lock before applying this series.

In general I'm in favor of pushing the lock further down into smaller 
and smaller critical sections; it's a good approach to make further 
audits easier until it's "obvious" that the lock is unnecessary.  I 
haven't yet reviewed Emanuele's patches to see if this is what he's 
doing where he's adding the acquire/release calls, but that's my 
understanding of both his cover letter and your reply.

The I/O blk_*()/bdrv_*() *should* not require the caller to hold the 
AioContext lock; all drivers use their own CoMutex or QemuMutex when 
needed, and generic code should also be ready (caveat emptor).  Others, 
such as reopen, are a mess that requires a separate audit.  Restricting 
acquire/release to be only around those seems like a good starting point.
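
To sketch the driver-side convention (the driver and its metadata helper
here are made up, but the locking pattern is the one qcow2-style drivers
already follow):

    static coroutine_fn int my_driver_co_flush(BlockDriverState *bs)
    {
        BDRVMyState *s = bs->opaque;    /* hypothetical driver state */
        int ret;

        qemu_co_mutex_lock(&s->lock);   /* driver's own CoMutex, not the */
        ret = my_update_metadata(s);    /* caller's AioContext lock      */
        qemu_co_mutex_unlock(&s->lock);

        return ret;
    }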

Paolo




* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-08 11:32   ` Paolo Bonzini
@ 2021-07-08 12:14     ` Kevin Wolf
  2021-07-08 13:04     ` Stefan Hajnoczi
  1 sibling, 0 replies; 33+ messages in thread
From: Kevin Wolf @ 2021-07-08 12:14 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: Emanuele Giuseppe Esposito, Vladimir Sementsov-Ogievskiy,
	qemu-block, Wen Congyang, Xie Changlong, qemu-devel,
	Markus Armbruster, Stefan Hajnoczi, Max Reitz, John Snow

Am 08.07.2021 um 13:32 hat Paolo Bonzini geschrieben:
> On 08/07/21 12:36, Stefan Hajnoczi wrote:
> > > What is very clear from this patch is that it
> > > is strictly related to the brdv_* and lower level calls, because
> > > they also internally check or even use the aiocontext lock.
> > > Therefore, in order to make it work, I temporarly added some
> > > aiocontext_acquire/release pair around the function that
> > > still assert for them or assume they are hold and temporarly
> > > unlock (unlock() - lock()).
> > 
> > Sounds like the issue is that this patch series assumes AioContext locks
> > are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> > not the case yet, so you had to then add those aio_context_lock() calls
> > back in elsewhere. This approach introduces unnecessary risk. I think we
> > should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> > the AioContext lock before applying this series.
> 
> In general I'm in favor of pushing the lock further down into smaller and
> smaller critical sections; it's a good approach to make further audits
> easier until it's "obvious" that the lock is unnecessary.  I haven't yet
> reviewed Emanuele's patches to see if this is what he's doing where he's
> adding the acquire/release calls, but that's my understanding of both his
> cover letter and your reply.
> 
> The I/O blk_*()/bdrv_*() *should* not require the caller to hold the
> AioContext lock; all drivers use their own CoMutex or QemuMutex when needed,
> and generic code should also be ready (caveat emptor).  Others, such as
> reopen, are a mess that requires a separate audit.  Restricting
> acquire/release to be only around those seems like a good starting point.

Reopen isn't just a mess, but in fact buggy. After the following patch
goes in, the rule is simple: Don't hold any AioContext locks when
calling bdrv_reopen_multiple().

    'block: Acquire AioContexts during bdrv_reopen_multiple()'
    https://lists.gnu.org/archive/html/qemu-block/2021-07/msg00238.html

It still takes AioContext locks when it calls into other functions that
currently expect it, but that should be the same as usual then.

And once callers don't even hold the lock in the first place, we'll also
get rid of the ugly temporary lock release across reopen.

Kevin




* Re: [RFC PATCH 5/6] job: use global job_mutex to protect struct Job
  2021-07-07 16:58 ` [RFC PATCH 5/6] job: use global job_mutex to protect struct Job Emanuele Giuseppe Esposito
@ 2021-07-08 12:56   ` Stefan Hajnoczi
  2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 12:56 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:12PM +0200, Emanuele Giuseppe Esposito wrote:
> This lock is going to replace most of the AioContext locks
> in the job and blockjob, so that a Job can run in an arbitrary
> AioContext.
> 
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> ---
>  include/block/blockjob_int.h |   1 +
>  include/qemu/job.h           |   2 +
>  block/backup.c               |   4 +
>  block/mirror.c               |  11 +-
>  blockdev.c                   |  62 ++++----
>  blockjob.c                   |  67 +++++++--
>  job-qmp.c                    |  55 +++----
>  job.c                        | 284 +++++++++++++++++++++++++++--------
>  qemu-img.c                   |  15 +-
>  9 files changed, 350 insertions(+), 151 deletions(-)
> 
> diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
> index 6633d83da2..8b91126506 100644
> --- a/include/block/blockjob_int.h
> +++ b/include/block/blockjob_int.h
> @@ -53,6 +53,7 @@ struct BlockJobDriver {
>       */
>      void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
>  
> +    /* Called with job mutex *not* held. */
>      void (*set_speed)(BlockJob *job, int64_t speed);
>  };
>  
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 4421d08d93..359f4e6b3a 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -49,6 +49,8 @@ typedef struct Job {
>      /**
>       * The type of this job.
>       * Set it in job_create and just read.
> +     * All calls to the driver functions must be made without job_mutex
> +     * held, to avoid deadlocks.
>       */
>      const JobDriver *driver;
>  
> diff --git a/block/backup.c b/block/backup.c
> index bd3614ce70..80ce956299 100644
> --- a/block/backup.c
> +++ b/block/backup.c
> @@ -315,6 +315,10 @@ static void coroutine_fn backup_pause(Job *job)
>      }
>  }
>  
> +/*
> + * Called with job mutex *not* held (we don't want to call block_copy_kick
> + * with the lock held!)
> + */
>  static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
>  {
>      BackupBlockJob *s = container_of(job, BackupBlockJob, common);
> diff --git a/block/mirror.c b/block/mirror.c
> index 49aaaafffa..deefaa6a39 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -1150,9 +1150,11 @@ static void mirror_complete(Job *job, Error **errp)
>      s->should_complete = true;
>  
>      /* If the job is paused, it will be re-entered when it is resumed */
> +    job_lock();
>      if (!job_is_paused(job)) {
> -        job_enter(job);
> +        job_enter_locked(job);
>      }
> +    job_unlock();
>  }
>  
>  static void coroutine_fn mirror_pause(Job *job)
> @@ -1171,10 +1173,13 @@ static bool mirror_drained_poll(BlockJob *job)
>       * from one of our own drain sections, to avoid a deadlock waiting for
>       * ourselves.
>       */
> -    if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
> -        !s->in_drain) {
> +    job_lock();
> +    if (!job_is_paused(&s->common.job) &&
> +        !job_is_cancelled_locked(&s->common.job) && !s->in_drain) {
> +        job_unlock();
>          return true;
>      }
> +    job_unlock();
>  
>      return !!s->in_flight;
>  }
> diff --git a/blockdev.c b/blockdev.c
> index 8e2c15370e..9255aea6a2 100644
> --- a/blockdev.c
> +++ b/blockdev.c
> @@ -150,9 +150,11 @@ void blockdev_mark_auto_del(BlockBackend *blk)
>              AioContext *aio_context = job_get_aiocontext(&job->job);
>              aio_context_acquire(aio_context);
>  
> +            job_lock();
>              job_cancel(&job->job, false);
>  
>              aio_context_release(aio_context);
> +            job_unlock();

This looks strange. The way it's written suggests there is a reason why
job_unlock() has to be called after aio_context_release(). Can
job_unlock() be called immediately after job_cancel()?
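
I.e. something like this (just a sketch, assuming job_cancel() only needs
job_mutex and not the AioContext lock):

            job_lock();
            job_cancel(&job->job, false);
            job_unlock();

            aio_context_release(aio_context);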

>          }
>      }
>  
> @@ -3309,48 +3311,44 @@ out:
>      aio_context_release(aio_context);
>  }
>  
> -/* Get a block job using its ID and acquire its AioContext */
> -static BlockJob *find_block_job(const char *id, AioContext **aio_context,
> -                                Error **errp)
> +/* Get a block job using its ID and acquire its job_lock */

"its" suggests job_lock is per-Job. I suggest saying something like
"Returns with job_lock held on success" instead.


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-08 11:32   ` Paolo Bonzini
  2021-07-08 12:14     ` Kevin Wolf
@ 2021-07-08 13:04     ` Stefan Hajnoczi
  2021-07-12  8:41       ` Emanuele Giuseppe Esposito
  1 sibling, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 13:04 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: Emanuele Giuseppe Esposito, Kevin Wolf,
	Vladimir Sementsov-Ogievskiy, qemu-block, Wen Congyang,
	Xie Changlong, qemu-devel, Markus Armbruster, Max Reitz,
	John Snow


On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
> On 08/07/21 12:36, Stefan Hajnoczi wrote:
> > > What is very clear from this patch is that it
> > > is strictly related to the brdv_* and lower level calls, because
> > > they also internally check or even use the aiocontext lock.
> > > Therefore, in order to make it work, I temporarly added some
> > > aiocontext_acquire/release pair around the function that
> > > still assert for them or assume they are hold and temporarly
> > > unlock (unlock() - lock()).
> > 
> > Sounds like the issue is that this patch series assumes AioContext locks
> > are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> > not the case yet, so you had to then add those aio_context_lock() calls
> > back in elsewhere. This approach introduces unnecessary risk. I think we
> > should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> > the AioContext lock before applying this series.
> 
> In general I'm in favor of pushing the lock further down into smaller and
> smaller critical sections; it's a good approach to make further audits
> easier until it's "obvious" that the lock is unnecessary.  I haven't yet
> reviewed Emanuele's patches to see if this is what he's doing where he's
> adding the acquire/release calls, but that's my understanding of both his
> cover letter and your reply.

The problem is the unnecessary risk. We know what the goal is for
blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
block jobs help solve the final issues with blk_*()/bdrv_*()?

If yes, then it's a risk worth taking. If no, then spending time
developing interim code, reviewing those patches, and risking breakage
doesn't seem worth it. I'd rather wait for blk_*()/bdrv_*() to be fully
complete and then see patches that delete aio_context_acquire() in most
places or add locks in the remaining places where the caller was relying
on the AioContext lock.

Stefan


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-07 16:58 [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Emanuele Giuseppe Esposito
                   ` (6 preceding siblings ...)
  2021-07-08 10:36 ` [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Stefan Hajnoczi
@ 2021-07-08 13:09 ` Stefan Hajnoczi
  2021-07-12  8:42   ` Emanuele Giuseppe Esposito
  7 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-08 13:09 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Wed, Jul 07, 2021 at 06:58:07PM +0200, Emanuele Giuseppe Esposito wrote:
> This is a continuation on the work to reduce (and possibly get rid of) the usage of AioContext lock, by introducing smaller granularity locks to keep the thread safety.
> 
> This series aims to:
> 1) remove the aiocontext lock and substitute it with the already existing
>    global job_mutex
> 2) fix what it looks like to be an oversight when moving the blockjob.c logic
>    into the more generic job.c: job_mutex was introduced especially to
>    protect job->busy flag, but it seems that it was not used in successive
>    patches, because there are multiple code sections that directly
>    access the field without any locking.
> 3) use job_mutex instead of the aiocontext_lock
> 4) extend the reach of the job_mutex to protect all shared fields
>    that the job structure has.

Can you explain the big picture:

1. What are the rules for JobDrivers? Imagine you are implementing a new
   JobDriver. What do you need to know in order to write correct code?

2. What are the rules for monitor? The main pattern is looking up a job,
   invoking a job API on it, and then calling job_unlock().

Stefan


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-08 13:04     ` Stefan Hajnoczi
@ 2021-07-12  8:41       ` Emanuele Giuseppe Esposito
  2021-07-13 13:10         ` Stefan Hajnoczi
  0 siblings, 1 reply; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:41 UTC (permalink / raw)
  To: Stefan Hajnoczi, Paolo Bonzini
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block,
	Wen Congyang, Xie Changlong, qemu-devel, Markus Armbruster,
	Max Reitz, John Snow



On 08/07/2021 15:04, Stefan Hajnoczi wrote:
> On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
>> On 08/07/21 12:36, Stefan Hajnoczi wrote:
>>>> What is very clear from this patch is that it
>>>> is strictly related to the brdv_* and lower level calls, because
>>>> they also internally check or even use the aiocontext lock.
>>>> Therefore, in order to make it work, I temporarly added some
>>>> aiocontext_acquire/release pair around the function that
>>>> still assert for them or assume they are hold and temporarly
>>>> unlock (unlock() - lock()).
>>>
>>> Sounds like the issue is that this patch series assumes AioContext locks
>>> are no longer required for calling the blk_*()/bdrv_*() APIs? That is
>>> not the case yet, so you had to then add those aio_context_lock() calls
>>> back in elsewhere. This approach introduces unnecessary risk. I think we
>>> should wait until blk_*()/bdrv_*() no longer requires the caller to hold
>>> the AioContext lock before applying this series.
>>
>> In general I'm in favor of pushing the lock further down into smaller and
>> smaller critical sections; it's a good approach to make further audits
>> easier until it's "obvious" that the lock is unnecessary.  I haven't yet
>> reviewed Emanuele's patches to see if this is what he's doing where he's
>> adding the acquire/release calls, but that's my understanding of both his
>> cover letter and your reply.
> 
> The problem is the unnecessary risk. We know what the goal is for
> blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
> block jobs help solve the final issues with blk_*()/bdrv_*()?

Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*() 
operations mostly take care of building, modifying and walking the bds 
graph. So since graph nodes can have multiple AioContexts, it makes sense 
that we have a lock when modifying the graph, right?

If so, we can simply try to replace the AioContext lock with a graph 
lock, or something like that. But I am not sure of this.
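
Purely as a sketch of the idea (this graph lock API is hypothetical):

    graph_lock();    /* hypothetical global lock for graph changes */
    /* ... add, remove or replace edges of the bds graph ... */
    graph_unlock();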

Emanuele
> 
> If yes, then it's a risk worth taking. If no, then spending time
> developing interim code, reviewing those patches, and risking breakage
> doesn't seem worth it. I'd rather wait for blk_*()/bdrv_*() to be fully
> complete and then see patches that delete aio_context_acquire() in most
> places or add locks in the remaining places where the caller was relying
> on the AioContext lock.
> 
> Stefan
> 




* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-08 13:09 ` Stefan Hajnoczi
@ 2021-07-12  8:42   ` Emanuele Giuseppe Esposito
  2021-07-13 13:27     ` Stefan Hajnoczi
  0 siblings, 1 reply; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:42 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow



On 08/07/2021 15:09, Stefan Hajnoczi wrote:
> On Wed, Jul 07, 2021 at 06:58:07PM +0200, Emanuele Giuseppe Esposito wrote:
>> This is a continuation on the work to reduce (and possibly get rid of) the usage of AioContext lock, by introducing smaller granularity locks to keep the thread safety.
>>
>> This series aims to:
>> 1) remove the aiocontext lock and substitute it with the already existing
>>     global job_mutex
>> 2) fix what it looks like to be an oversight when moving the blockjob.c logic
>>     into the more generic job.c: job_mutex was introduced especially to
>>     protect job->busy flag, but it seems that it was not used in successive
>>     patches, because there are multiple code sections that directly
>>     access the field without any locking.
>> 3) use job_mutex instead of the aiocontext_lock
>> 4) extend the reach of the job_mutex to protect all shared fields
>>     that the job structure has.
> 
> Can you explain the big picture:
> 
> 1. What are the rules for JobDrivers? Imagine you are implementing a new
>     JobDriver. What do you need to know in order to write correct code?

I think that in general, the rules for JobDrivers remain the same. The 
job_mutex lock should be invisible (or almost) from the point of view of 
a JobDriver, because the job API available for it should take care of 
the necessary locking/unlocking.

> 
> 2. What are the rules for monitor? The main pattern is looking up a job,
>     invoking a job API on it, and then calling job_unlock().

The monitor instead is aware of this lock: the reason for that is 
exactly what you have described here.
Looking up + invoking a job API operation (for example calling 
find_job() and then job_pause()) must be performed with the same lock 
held the whole time, otherwise other threads could modify the job while 
the monitor runs its command.
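
Roughly like this (a sketch of the pattern, with argument lists
simplified):

    job_lock();
    job = find_job(id, errp);    /* look up the job in the jobs list */
    if (job) {
        job_pause(job);          /* still holding the same job_mutex */
    }
    job_unlock();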

Please let me know if something is not clear and/or if you have 
additional comments on this!

Emanuele

> 
> Stefan
> 




* Re: [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch
  2021-07-08 10:50   ` Stefan Hajnoczi
@ 2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  2021-07-13 13:32       ` Stefan Hajnoczi
  0 siblings, 1 reply; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:43 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow



On 08/07/2021 12:50, Stefan Hajnoczi wrote:
> On Wed, Jul 07, 2021 at 06:58:09PM +0200, Emanuele Giuseppe Esposito wrote:
>> diff --git a/job.c b/job.c
>> index 872bbebb01..96fb8e9730 100644
>> --- a/job.c
>> +++ b/job.c
>> @@ -32,6 +32,10 @@
>>   #include "trace/trace-root.h"
>>   #include "qapi/qapi-events-job.h"
>>   
>> +/* job_mutex protects the jobs list, but also the job operations. */
>> +static QemuMutex job_mutex;
> 
> It's unclear what protecting "job operations" means. I would prefer a
> fine-grained per-job lock that protects the job's fields instead of a
> global lock with an unclear scope.

As I wrote in the cover letter, I wanted to try to keep things as simple 
as possible with a global lock. It is possible to try and have a per-job 
lock, but I don't know how complex that would turn out to be.
I will try and see what I can do.

Maybe "job_mutex protexts the jobs list, but also makes the job API 
thread-safe"?

> 
>> +
>> +/* Protected by job_mutex */
>>   static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
>>   
>>   /* Job State Transition Table */
>> @@ -64,27 +68,22 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
>>   /* Transactional group of jobs */
>>   struct JobTxn {
>>   
>> -    /* Is this txn being cancelled? */
>> +    /* Is this txn being cancelled? Atomic.*/
>>       bool aborting;
> 
> The comment says atomic but this field is not accessed using atomic
> operations (at least at this point in the patch series)?

Yes, sorry, I messed up the hunks in one or two patches. These comments 
were supposed to be in patch 4 "job.h: categorize job fields". Even though 
that might also not be ideal, since that patch just introduces the 
comments without applying the locking/protection yet.
On the other hand, if I merge everything together in patch 5, it will be 
even harder to read.

Emanuele
> 
>>   
>> -    /* List of jobs */
>> +    /* List of jobs. Protected by job_mutex. */
>>       QLIST_HEAD(, Job) jobs;
>>   
>> -    /* Reference count */
>> +    /* Reference count. Atomic. */
>>       int refcnt;
> 
> Same.
> 




* Re: [RFC PATCH 3/6] job: minor changes to simplify locking
  2021-07-08 10:55   ` Stefan Hajnoczi
@ 2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 0 replies; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:43 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow



On 08/07/2021 12:55, Stefan Hajnoczi wrote:
> On Wed, Jul 07, 2021 at 06:58:10PM +0200, Emanuele Giuseppe Esposito wrote:
>> @@ -406,15 +410,18 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
>>               error_setg(errp, "Invalid job ID '%s'", job_id);
>>               return NULL;
>>           }
>> -        if (job_get(job_id)) {
>> -            error_setg(errp, "Job ID '%s' already in use", job_id);
>> -            return NULL;
>> -        }
>>       } else if (!(flags & JOB_INTERNAL)) {
>>           error_setg(errp, "An explicit job ID is required");
>>           return NULL;
>>       }
>>   
>> +    job_lock();
>> +    if (job_get(job_id)) {
>> +        error_setg(errp, "Job ID '%s' already in use", job_id);
>> +        job_unlock();
>> +        return NULL;
>> +    }
>> +
> 
> Where is the matching job_unlock() in the success case? Please consider
> lock guard macros like QEMU_LOCK_GUARD()/WITH_QEMU_LOCK_GUARD() to
> prevent common errors.
> 
Again, this is a dumb mistake: the job_lock()/job_unlock() lines should go 
in patch 5, not here. Apologies.

I did not use QEMU_LOCK_GUARD()/WITH_QEMU_LOCK_GUARD() yet because I 
added some assertions to make sure I also didn't create nested locking 
situations. The final version will certainly use them.
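
For reference, one possible shape of such an assertion (a hypothetical
debug helper, not what the series currently does):

    static __thread bool job_mutex_held;    /* per-thread tracking flag */

    void job_lock(void)
    {
        assert(!job_mutex_held);            /* catch nested job_lock() */
        qemu_mutex_lock(&job_mutex);
        job_mutex_held = true;
    }

    void job_unlock(void)
    {
        job_mutex_held = false;
        qemu_mutex_unlock(&job_mutex);
    }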

Emanuele




* Re: [RFC PATCH 4/6] job.h: categorize job fields
  2021-07-08 11:02   ` Stefan Hajnoczi
@ 2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 0 replies; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:43 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow



On 08/07/2021 13:02, Stefan Hajnoczi wrote:
> On Wed, Jul 07, 2021 at 06:58:11PM +0200, Emanuele Giuseppe Esposito wrote:
>> -    /** AioContext to run the job coroutine in */
>> +    /**
>> +     * AioContext to run the job coroutine in.
>> +     * Atomic.
>> +     */
>>       AioContext *aio_context;
> 
> This isn't accessed using atomic operations, so I'm not sure why it's
> documented as atomic?
> 
Maybe this is unnecessary, but from what I understand, right now when we 
want to change the AioContext of a child node, we need to acquire its 
AioContext lock and then try to set it. Without AioContext locks, my 
understanding is that this has to be protected somehow. Therefore I 
thought of setting this pointer atomically (the actual code that does 
this is in patch 5).
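
In other words, a sketch of what I mean (the accessor names in patch 5
may differ):

    /* reader: any thread, no AioContext lock taken */
    AioContext *ctx = qatomic_read(&job->aio_context);

    /* writer: when the job is moved to another AioContext */
    qatomic_set(&job->aio_context, new_context);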

Again, the same reasoning applies here: this patch just adds a bunch 
of comments, but merging it with the next one would just make it more 
unreadable.

Emanuele




* Re: [RFC PATCH 5/6] job: use global job_mutex to protect struct Job
  2021-07-08 12:56   ` Stefan Hajnoczi
@ 2021-07-12  8:43     ` Emanuele Giuseppe Esposito
  0 siblings, 0 replies; 33+ messages in thread
From: Emanuele Giuseppe Esposito @ 2021-07-12  8:43 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow



On 08/07/2021 14:56, Stefan Hajnoczi wrote:
> On Wed, Jul 07, 2021 at 06:58:12PM +0200, Emanuele Giuseppe Esposito wrote:
>> This lock is going to replace most of the AioContext locks
>> in the job and blockjob, so that a Job can run in an arbitrary
>> AioContext.
>>
>> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
>> ---
>>   include/block/blockjob_int.h |   1 +
>>   include/qemu/job.h           |   2 +
>>   block/backup.c               |   4 +
>>   block/mirror.c               |  11 +-
>>   blockdev.c                   |  62 ++++----
>>   blockjob.c                   |  67 +++++++--
>>   job-qmp.c                    |  55 +++----
>>   job.c                        | 284 +++++++++++++++++++++++++++--------
>>   qemu-img.c                   |  15 +-
>>   9 files changed, 350 insertions(+), 151 deletions(-)
>>
>> diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
>> index 6633d83da2..8b91126506 100644
>> --- a/include/block/blockjob_int.h
>> +++ b/include/block/blockjob_int.h
>> @@ -53,6 +53,7 @@ struct BlockJobDriver {
>>        */
>>       void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
>>   
>> +    /* Called with job mutex *not* held. */
>>       void (*set_speed)(BlockJob *job, int64_t speed);
>>   };
>>   
>> diff --git a/include/qemu/job.h b/include/qemu/job.h
>> index 4421d08d93..359f4e6b3a 100644
>> --- a/include/qemu/job.h
>> +++ b/include/qemu/job.h
>> @@ -49,6 +49,8 @@ typedef struct Job {
>>       /**
>>        * The type of this job.
>>        * Set it in job_create and just read.
>> +     * All calls to the driver functions must be made without job_mutex
>> +     * held, to avoid deadlocks.
>>        */
>>       const JobDriver *driver;
>>   
>> diff --git a/block/backup.c b/block/backup.c
>> index bd3614ce70..80ce956299 100644
>> --- a/block/backup.c
>> +++ b/block/backup.c
>> @@ -315,6 +315,10 @@ static void coroutine_fn backup_pause(Job *job)
>>       }
>>   }
>>   
>> +/*
>> + * Called with job mutex *not* held (we don't want to call block_copy_kick
>> + * with the lock held!)
>> + */
>>   static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
>>   {
>>       BackupBlockJob *s = container_of(job, BackupBlockJob, common);
>> diff --git a/block/mirror.c b/block/mirror.c
>> index 49aaaafffa..deefaa6a39 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -1150,9 +1150,11 @@ static void mirror_complete(Job *job, Error **errp)
>>       s->should_complete = true;
>>   
>>       /* If the job is paused, it will be re-entered when it is resumed */
>> +    job_lock();
>>       if (!job_is_paused(job)) {
>> -        job_enter(job);
>> +        job_enter_locked(job);
>>       }
>> +    job_unlock();
>>   }
>>   
>>   static void coroutine_fn mirror_pause(Job *job)
>> @@ -1171,10 +1173,13 @@ static bool mirror_drained_poll(BlockJob *job)
>>        * from one of our own drain sections, to avoid a deadlock waiting for
>>        * ourselves.
>>        */
>> -    if (!job_is_paused(&s->common.job) && !job_is_cancelled(&s->common.job) &&
>> -        !s->in_drain) {
>> +    job_lock();
>> +    if (!job_is_paused(&s->common.job) &&
>> +        !job_is_cancelled_locked(&s->common.job) && !s->in_drain) {
>> +        job_unlock();
>>           return true;
>>       }
>> +    job_unlock();
>>   
>>       return !!s->in_flight;
>>   }
>> diff --git a/blockdev.c b/blockdev.c
>> index 8e2c15370e..9255aea6a2 100644
>> --- a/blockdev.c
>> +++ b/blockdev.c
>> @@ -150,9 +150,11 @@ void blockdev_mark_auto_del(BlockBackend *blk)
>>               AioContext *aio_context = job_get_aiocontext(&job->job);
>>               aio_context_acquire(aio_context);
>>   
>> +            job_lock();
>>               job_cancel(&job->job, false);
>>   
>>               aio_context_release(aio_context);
>> +            job_unlock();
> 
> This looks strange. The way it's written suggests there is a reason why
> job_unlock() has to be called after aio_context_release(). Can
> job_unlock() be called immediately after job_cancel()?

Yes, another mistake I shouldn't have made.
> 
>>           }
>>       }
>>   
>> @@ -3309,48 +3311,44 @@ out:
>>       aio_context_release(aio_context);
>>   }
>>   
>> -/* Get a block job using its ID and acquire its AioContext */
>> -static BlockJob *find_block_job(const char *id, AioContext **aio_context,
>> -                                Error **errp)
>> +/* Get a block job using its ID and acquire its job_lock */
> 
> "its" suggests job_lock is per-Job. I suggest saying something like
> "Returns with job_lock held on success" instead.
> 
Agreed. I changed the same comment for find_job() in job-qmp.c as well.

Emanuele




* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-12  8:41       ` Emanuele Giuseppe Esposito
@ 2021-07-13 13:10         ` Stefan Hajnoczi
  2021-07-13 15:18           ` Vladimir Sementsov-Ogievskiy
  2021-07-16 15:23           ` Kevin Wolf
  0 siblings, 2 replies; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-13 13:10 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block,
	Wen Congyang, Xie Changlong, qemu-devel, Markus Armbruster,
	Paolo Bonzini, Max Reitz, John Snow


On Mon, Jul 12, 2021 at 10:41:46AM +0200, Emanuele Giuseppe Esposito wrote:
> 
> 
> On 08/07/2021 15:04, Stefan Hajnoczi wrote:
> > On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
> > > On 08/07/21 12:36, Stefan Hajnoczi wrote:
> > > > > What is very clear from this patch is that it
> > > > > is strictly related to the brdv_* and lower level calls, because
> > > > > they also internally check or even use the aiocontext lock.
> > > > > Therefore, in order to make it work, I temporarly added some
> > > > > aiocontext_acquire/release pair around the function that
> > > > > still assert for them or assume they are hold and temporarly
> > > > > unlock (unlock() - lock()).
> > > > 
> > > > Sounds like the issue is that this patch series assumes AioContext locks
> > > > are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> > > > not the case yet, so you had to then add those aio_context_lock() calls
> > > > back in elsewhere. This approach introduces unnecessary risk. I think we
> > > > should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> > > > the AioContext lock before applying this series.
> > > 
> > > In general I'm in favor of pushing the lock further down into smaller and
> > > smaller critical sections; it's a good approach to make further audits
> > > easier until it's "obvious" that the lock is unnecessary.  I haven't yet
> > > reviewed Emanuele's patches to see if this is what he's doing where he's
> > > adding the acquire/release calls, but that's my understanding of both his
> > > cover letter and your reply.
> > 
> > The problem is the unnecessary risk. We know what the goal is for
> > blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
> > block jobs help solve the final issues with blk_*()/bdrv_*()?
> 
> Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*()
> operations mostly take care of building, modifying and walking the bds graph.
> So since graph nodes can have multiple AioContexts, it makes sense that we
> have a lock when modifying the graph, right?
> 
> If so, we can simply try to replace the AioContext lock with a graph lock,
> or something like that. But I am not sure of this.

Block graph manipulation (all_bdrv_states and friends) requires the BQL.
It has always been this way.

This raises the question: if block graph manipulation is already under
the BQL and BlockDriver callbacks don't need the AioContext anymore, why
are aio_context_acquire() calls still needed in block jobs?

AIO_WAIT_WHILE() requires that AioContext is acquired according to its
documentation, but I'm not sure that's true anymore. Thread-safe/atomic
primitives are used by AIO_WAIT_WHILE(), so as long as the condition
being waited for is thread-safe too, it should work without the
AioContext lock.
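
For example, a wait whose condition only uses an atomic read (a sketch;
whether ctx must still be held here is exactly the open question above):

    AIO_WAIT_WHILE(ctx, qatomic_read(&job->busy));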

Back to my comment about unnecessary risk, pushing the lock down is a
strategy for exploring the problem, but I'm not sure those intermediate
commits need to be committed to qemu.git/master because of the time
required to review them and the risk of introducing (temporary) bugs.
Maybe there's a benefit to this patch series that I've missed?

Stefan


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-12  8:42   ` Emanuele Giuseppe Esposito
@ 2021-07-13 13:27     ` Stefan Hajnoczi
  0 siblings, 0 replies; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-13 13:27 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Mon, Jul 12, 2021 at 10:42:47AM +0200, Emanuele Giuseppe Esposito wrote:
> On 08/07/2021 15:09, Stefan Hajnoczi wrote:
> > On Wed, Jul 07, 2021 at 06:58:07PM +0200, Emanuele Giuseppe Esposito wrote:
> > > This is a continuation on the work to reduce (and possibly get rid of) the usage of AioContext lock, by introducing smaller granularity locks to keep the thread safety.
> > > 
> > > This series aims to:
> > > 1) remove the aiocontext lock and substitute it with the already existing
> > >     global job_mutex
> > > 2) fix what it looks like to be an oversight when moving the blockjob.c logic
> > >     into the more generic job.c: job_mutex was introduced especially to
> > >     protect job->busy flag, but it seems that it was not used in successive
> > >     patches, because there are multiple code sections that directly
> > >     access the field without any locking.
> > > 3) use job_mutex instead of the aiocontext_lock
> > > 4) extend the reach of the job_mutex to protect all shared fields
> > >     that the job structure has.
> > 
> > Can you explain the big picture:
> > 
> > 1. What are the rules for JobDrivers? Imagine you are implementing a new
> >     JobDriver. What do you need to know in order to write correct code?
> 
> I think that in general, the rules for JobDrivers remain the same. The
> job_mutex lock should be invisible (or almost) from the point of view of a
> JobDriver, because the job API available for it should take care of the
> necessary locking/unlocking.
> 
> > 
> > 2. What are the rules for monitor? The main pattern is looking up a job,
> >     invoking a job API on it, and then calling job_unlock().
> 
> The monitor instead is aware of this lock: the reason for that is exactly
> what you have described here.
> Looking up + invoking a job API operation (for example calling find_job()
> and then job_pause()) must be performed with the same lock held the whole
> time, otherwise other threads could modify the job while the monitor runs
> its command.

That helps, thanks!

Stefan


* Re: [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch
  2021-07-12  8:43     ` Emanuele Giuseppe Esposito
@ 2021-07-13 13:32       ` Stefan Hajnoczi
  0 siblings, 0 replies; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-13 13:32 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block, qemu-devel,
	Wen Congyang, Xie Changlong, Markus Armbruster, Max Reitz,
	Paolo Bonzini, John Snow


On Mon, Jul 12, 2021 at 10:43:07AM +0200, Emanuele Giuseppe Esposito wrote:
> 
> 
> On 08/07/2021 12:50, Stefan Hajnoczi wrote:
> > On Wed, Jul 07, 2021 at 06:58:09PM +0200, Emanuele Giuseppe Esposito wrote:
> > > diff --git a/job.c b/job.c
> > > index 872bbebb01..96fb8e9730 100644
> > > --- a/job.c
> > > +++ b/job.c
> > > @@ -32,6 +32,10 @@
> > >   #include "trace/trace-root.h"
> > >   #include "qapi/qapi-events-job.h"
> > > +/* job_mutex protects the jobs list, but also the job operations. */
> > > +static QemuMutex job_mutex;
> > 
> > It's unclear what protecting "job operations" means. I would prefer a
> > fine-grained per-job lock that protects the job's fields instead of a
> > global lock with an unclear scope.
> 
> As I wrote in the cover letter, I wanted to try to keep things as simple as
> possible with a global lock. It is possible to try and have a per-job lock,
> but I don't know how complex that would turn out to be.
> I will try and see what I can do.
> 
> Maybe "job_mutex protexts the jobs list, but also makes the job API
> thread-safe"?

That's clearer, thanks. I thought "job operations" meant the processing
that the actual block jobs do (commit, mirror, stream, backup).

> 
> > 
> > > +
> > > +/* Protected by job_mutex */
> > >   static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
> > >   /* Job State Transition Table */
> > > @@ -64,27 +68,22 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
> > >   /* Transactional group of jobs */
> > >   struct JobTxn {
> > > -    /* Is this txn being cancelled? */
> > > +    /* Is this txn being cancelled? Atomic.*/
> > >       bool aborting;
> > 
> > The comment says atomic but this field is not accessed using atomic
> > operations (at least at this point in the patch series)?
> 
> Yes, sorry, I messed up the hunks in one or two patches. These comments were
> supposed to be in patch 4 "job.h: categorize job fields". Even though that
> might also not be ideal, since that patch just introduces the comments
> without applying the locking/protection yet.
> On the other hand, if I merge everything together in patch 5, it will be
> even harder to read.

The commit description can describe changes that currently have no
effect but are anticipating a later patch. That helps reviewers
understand whether the change is intentional/correct.

Stefan


* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-13 13:10         ` Stefan Hajnoczi
@ 2021-07-13 15:18           ` Vladimir Sementsov-Ogievskiy
  2021-07-13 16:38             ` Stefan Hajnoczi
  2021-07-16 15:23           ` Kevin Wolf
  1 sibling, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2021-07-13 15:18 UTC (permalink / raw)
  To: Stefan Hajnoczi, Emanuele Giuseppe Esposito
  Cc: Paolo Bonzini, Kevin Wolf, qemu-block, qemu-devel, Wen Congyang,
	Xie Changlong, Markus Armbruster, Max Reitz, John Snow

13.07.2021 16:10, Stefan Hajnoczi wrote:
> On Mon, Jul 12, 2021 at 10:41:46AM +0200, Emanuele Giuseppe Esposito wrote:
>>
>>
>> On 08/07/2021 15:04, Stefan Hajnoczi wrote:
>>> On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
>>>> On 08/07/21 12:36, Stefan Hajnoczi wrote:
>>>>>> What is very clear from this patch is that it
>>>>>> is strictly related to the brdv_* and lower level calls, because
>>>>>> they also internally check or even use the aiocontext lock.
>>>>>> Therefore, in order to make it work, I temporarly added some
>>>>>> aiocontext_acquire/release pair around the function that
>>>>>> still assert for them or assume they are hold and temporarly
>>>>>> unlock (unlock() - lock()).
>>>>>
>>>>> Sounds like the issue is that this patch series assumes AioContext locks
>>>>> are no longer required for calling the blk_*()/bdrv_*() APIs? That is
>>>>> not the case yet, so you had to then add those aio_context_lock() calls
>>>>> back in elsewhere. This approach introduces unnecessary risk. I think we
>>>>> should wait until blk_*()/bdrv_*() no longer requires the caller to hold
>>>>> the AioContext lock before applying this series.
>>>>
>>>> In general I'm in favor of pushing the lock further down into smaller and
>>>> smaller critical sections; it's a good approach to make further audits
>>>> easier until it's "obvious" that the lock is unnecessary.  I haven't yet
>>>> reviewed Emanuele's patches to see if this is what he's doing where he's
>>>> adding the acquire/release calls, but that's my understanding of both his
>>>> cover letter and your reply.
>>>
>>> The problem is the unnecessary risk. We know what the goal is for
>>> blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
>>> block jobs help solve the final issues with blk_*()/bdrv_*()?
>>
>> Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*()
>> operations mostly take care of building, modifying and walking the bds graph.
>> So since graph nodes can have multiple AioContexts, it makes sense that we
>> have a lock when modifying the graph, right?
>>
>> If so, we can simply try to replace the AioContext lock with a graph lock,
>> or something like that. But I am not sure of this.
> 
> Block graph manipulation (all_bdrv_states and friends) requires the BQL.
> It has always been this way.
> 
> This raises the question: if block graph manipulation is already under
> the BQL and BlockDriver callbacks don't need the AioContext anymore, why

I don't believe that block drivers are thread-safe now. They have some mutexes, but who has made an audit of thread-safety? I work mostly with the nbd and qcow2 drivers, and they never seemed thread-safe to me. For example, the qcow2 driver has plenty of operations that are done from a non-coroutine context, where the qcow2 CoMutex is simply not taken.

> are aio_context_acquire() calls still needed in block jobs?
> 
> AIO_WAIT_WHILE() requires that AioContext is acquired according to its
> documentation, but I'm not sure that's true anymore. Thread-safe/atomic
> primitives are used by AIO_WAIT_WHILE(), so as long as the condition
> being waited for is thread-safe too, it should work without the
> AioContext lock.
> 
> Back to my comment about unnecessary risk, pushing the lock down is a
> strategy for exploring the problem, but I'm not sure those intermediate
> commits need to be committed to qemu.git/master because of the time
> required to review them and the risk of introducing (temporary) bugs.

I agree. Let me add my bit of criticism:

What I dislike about the whole thread-safety update you are doing:

1. There is no proof of concept - some good example of multiqueue, or something that uses multiple threads and shows good performance. Something that works, and shows where we are going and why it is good. That may be draft patches with a lot of "FIXME" and "TODO", but working. For now I feel that I've spent my time reviewing and proving to myself the thread-safety of the two previous thread-safety series, but I have no hope of seeing a benefit from it in the near future.

1.1 If we have a proof of concept, that also gives us a kind of plan: a list of subsystems (patch series) to do step by step until we arrive at what we want. Do you have a plan (for the whole feature) now?

2. There are no tests: something that doesn't work before the series and starts to work after it. Why this is important:

All these thread-safety primitives are complicated enough. They are hard to review and to prove correct, and very easy to break with new code. Tests run by CI prove that we don't break subsystems that are already thread-safe. For example, you've recently updated block-copy and several related things, but we have no tests. So assume that a year from now you finish the work of updating all the other subsystems to be thread-safe: you'll have no guarantee that block-copy is still thread-safe, and you'll have to start from the beginning.

3. As I said, I really doubt that block drivers are already thread-safe. An audit and/or tests are needed at the very least.


-- 
Best regards,
Vladimir



* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-13 15:18           ` Vladimir Sementsov-Ogievskiy
@ 2021-07-13 16:38             ` Stefan Hajnoczi
  2021-07-15 12:35               ` Vladimir Sementsov-Ogievskiy
  0 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-13 16:38 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy
  Cc: Emanuele Giuseppe Esposito, Kevin Wolf, qemu-block, Wen Congyang,
	Xie Changlong, qemu-devel, Markus Armbruster, Paolo Bonzini,
	Max Reitz, John Snow


On Tue, Jul 13, 2021 at 06:18:39PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> 13.07.2021 16:10, Stefan Hajnoczi wrote:
> > On Mon, Jul 12, 2021 at 10:41:46AM +0200, Emanuele Giuseppe Esposito wrote:
> > > 
> > > 
> > > On 08/07/2021 15:04, Stefan Hajnoczi wrote:
> > > > On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
> > > > > On 08/07/21 12:36, Stefan Hajnoczi wrote:
> > > > > > > What is very clear from this patch is that it
> > > > > > > is strictly related to the brdv_* and lower level calls, because
> > > > > > > they also internally check or even use the aiocontext lock.
> > > > > > > Therefore, in order to make it work, I temporarly added some
> > > > > > > aiocontext_acquire/release pair around the function that
> > > > > > > still assert for them or assume they are hold and temporarly
> > > > > > > unlock (unlock() - lock()).
> > > > > > 
> > > > > > Sounds like the issue is that this patch series assumes AioContext locks
> > > > > > are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> > > > > > not the case yet, so you had to then add those aio_context_lock() calls
> > > > > > back in elsewhere. This approach introduces unnecessary risk. I think we
> > > > > > should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> > > > > > the AioContext lock before applying this series.
> > > > > 
> > > > > In general I'm in favor of pushing the lock further down into smaller and
> > > > > smaller critical sections; it's a good approach to make further audits
> > > > > easier until it's "obvious" that the lock is unnecessary.  I haven't yet
> > > > > reviewed Emanuele's patches to see if this is what he's doing where he's
> > > > > adding the acquire/release calls, but that's my understanding of both his
> > > > > cover letter and your reply.
> > > > 
> > > > The problem is the unnecessary risk. We know what the goal is for
> > > > blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
> > > > block jobs help solve the final issues with blk_*()/bdrv_*()?
> > > 
> > > Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*()
> > > operations mostly take care of building, modifying and walking the BDS graph.
> > > So since graph nodes can have multiple AioContexts, it makes sense that we
> > > have a lock when modifying the graph, right?
> > >
> > > If so, we could simply try to replace the AioContext lock with a graph lock,
> > > or something like that. But I am not sure about this.
> > 
> > Block graph manipulation (all_bdrv_states and friends) requires the BQL.
> > It has always been this way.
> > 
> > This raises the question: if block graph manipulation is already under
> > the BQL and BlockDriver callbacks don't need the AioContext anymore, why
> 
> I don't believe that the block drivers are thread-safe now. They have some mutexes... but who has made an audit of their thread-safety?

Emanuele :)

FWIW I took a look at the stream, mirror, and backup jobs today and
couldn't find anything that's unsafe after this series. I was expecting
to find issues but I think Emanuele and Paolo have taken care of them.

> > are aio_context_acquire() calls still needed in block jobs?
> > 
> > AIO_WAIT_WHILE() requires that AioContext is acquired according to its
> > documentation, but I'm not sure that's true anymore. Thread-safe/atomic
> > primitives are used by AIO_WAIT_WHILE(), so as long as the condition
> > being waited for is thread-safe too it should work without the
> > AioContext lock.
> > 
> > Back to my comment about unnecessary risk, pushing the lock down is a
> > strategy for exploring the problem, but I'm not sure those intermediate
> > commits need to be committed to qemu.git/master because of the time
> > required to review them and the risk of introducing (temporary) bugs.
> 
> I agree. Let me add my bit of criticism:
>
> What I dislike about the whole thread-safety update you are doing:
>
> 1. There is no proof of concept - some good example of multiqueue, or something that uses multiple threads and shows good performance. Something that works and shows where we are going and why it is good. That may be draft patches with a lot of "FIXME" and "TODO", but working. For now I feel that I've spent my time reviewing and proving to myself the thread-safety of two previous thread-safe series, but I have no hope of seeing a benefit from it in the near future.

The multi-queue block layer should improve performance in cases where
the bottleneck is a single IOThread. It will allow users to assign more
than one IOThread.

But I think the bigger impact of this work will be addressing
long-standing problems with the block layer's programming model. We
continue to have IOThread bugs because there are many undocumented
assumptions. With the multi-queue block layer the code switches to a
better-understood multi-threaded programming model, and hopefully
fewer issues will arise because there is no problematic AioContext lock
to worry about.

Stefan



* Re: [RFC PATCH 3/6] job: minor changes to simplify locking
  2021-07-07 16:58 ` [RFC PATCH 3/6] job: minor changes to simplify locking Emanuele Giuseppe Esposito
  2021-07-08 10:55   ` Stefan Hajnoczi
@ 2021-07-13 17:56   ` Eric Blake
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2021-07-13 17:56 UTC (permalink / raw)
  To: Emanuele Giuseppe Esposito
  Cc: Kevin Wolf, Vladimir Sementsov-Ogievskiy, qemu-block,
	Wen Congyang, Xie Changlong, qemu-devel, Markus Armbruster,
	Stefan Hajnoczi, Paolo Bonzini, Max Reitz, John Snow

On Wed, Jul 07, 2021 at 06:58:10PM +0200, Emanuele Giuseppe Esposito wrote:
> Check for NULL id to job_get, so that in the next patch we can
> move job_get inside a single critical section of job_create.
> 
> Also add missing notifier_list_init for the on_idle NotifierList,
> which seems to have been forgot.

forgotten

> 
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> ---
>  job.c | 16 ++++++++++++----
>  1 file changed, 12 insertions(+), 4 deletions(-)

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org




* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-13 16:38             ` Stefan Hajnoczi
@ 2021-07-15 12:35               ` Vladimir Sementsov-Ogievskiy
  2021-07-15 13:29                 ` Stefan Hajnoczi
  0 siblings, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2021-07-15 12:35 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Emanuele Giuseppe Esposito, Paolo Bonzini, Kevin Wolf,
	qemu-block, qemu-devel, Wen Congyang, Xie Changlong,
	Markus Armbruster, Max Reitz, John Snow

13.07.2021 19:38, Stefan Hajnoczi wrote:
> On Tue, Jul 13, 2021 at 06:18:39PM +0300, Vladimir Sementsov-Ogievskiy wrote:
>> 13.07.2021 16:10, Stefan Hajnoczi wrote:
>>> On Mon, Jul 12, 2021 at 10:41:46AM +0200, Emanuele Giuseppe Esposito wrote:
>>>>
>>>>
>>>> On 08/07/2021 15:04, Stefan Hajnoczi wrote:
>>>>> On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
>>>>>> On 08/07/21 12:36, Stefan Hajnoczi wrote:
>>>>>>>> What is very clear from this patch is that it
>>>>>>>> is strictly related to the brdv_* and lower level calls, because
>>>>>>>> they also internally check or even use the aiocontext lock.
>>>>>>>> Therefore, in order to make it work, I temporarly added some
>>>>>>>> aiocontext_acquire/release pair around the function that
>>>>>>>> still assert for them or assume they are hold and temporarly
>>>>>>>> unlock (unlock() - lock()).
>>>>>>>
>>>>>>> Sounds like the issue is that this patch series assumes AioContext locks
>>>>>>> are no longer required for calling the blk_*()/bdrv_*() APIs? That is
>>>>>>> not the case yet, so you had to then add those aio_context_lock() calls
>>>>>>> back in elsewhere. This approach introduces unnecessary risk. I think we
>>>>>>> should wait until blk_*()/bdrv_*() no longer requires the caller to hold
>>>>>>> the AioContext lock before applying this series.
>>>>>>
>>>>>> In general I'm in favor of pushing the lock further down into smaller and
>>>>>> smaller critical sections; it's a good approach to make further audits
>>>>>> easier until it's "obvious" that the lock is unnecessary.  I haven't yet
>>>>>> reviewed Emanuele's patches to see if this is what he's doing where he's
>>>>>> adding the acquire/release calls, but that's my understanding of both his
>>>>>> cover letter and your reply.
>>>>>
>>>>> The problem is the unnecessary risk. We know what the goal is for
>>>>> blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
>>>>> block jobs help solve the final issues with blk_*()/bdrv_*()?
>>>>
>>>> Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*()
>>>> operations mostly take care of building, modifying and walking the BDS graph.
>>>> So since graph nodes can have multiple AioContexts, it makes sense that we
>>>> have a lock when modifying the graph, right?
>>>>
>>>> If so, we could simply try to replace the AioContext lock with a graph lock,
>>>> or something like that. But I am not sure about this.
>>>
>>> Block graph manipulation (all_bdrv_states and friends) requires the BQL.
>>> It has always been this way.
>>>
>>> This raises the question: if block graph manipulation is already under
>>> the BQL and BlockDriver callbacks don't need the AioContext anymore, why
>>
>> I don't believe that the block drivers are thread-safe now. They have some mutexes... but who has made an audit of their thread-safety?
> 
> Emanuele :)
> 
> FWIW I took a look at the stream, mirror, and backup jobs today and
> couldn't find anything that's unsafe after this series. I was expecting
> to find issues but I think Emanuele and Paolo have taken care of them.


Hmm, do you mean that all jobs are thread-safe?

Looking at the mirror, what protects s->ops_in_flight, for example? It's accessed both from the job coroutines and from the mirror_top filter's write operations.

> 
>>> are aio_context_acquire() calls still needed in block jobs?
>>>
>>> AIO_WAIT_WHILE() requires that AioContext is acquired according to its
>>> documentation, but I'm not sure that's true anymore. Thread-safe/atomic
>>> primitives are used by AIO_WAIT_WHILE(), so as long as the condition
>>> being waited for is thread-safe too it should work without the
>>> AioContext lock.
>>>
>>> Back to my comment about unnecessary risk, pushing the lock down is a
>>> strategy for exploring the problem, but I'm not sure those intermediate
>>> commits need to be committed to qemu.git/master because of the time
>>> required to review them and the risk of introducing (temporary) bugs.
>>
>> I agree. Let me add my bit of criticism:
>>
>> What I dislike about the whole thread-safety update you are doing:
>>
>> 1. There is no proof of concept - some good example of multiqueue, or something that uses multiple threads and shows good performance. Something that works and shows where we are going and why it is good. That may be draft patches with a lot of "FIXME" and "TODO", but working. For now I feel that I've spent my time reviewing and proving to myself the thread-safety of two previous thread-safe series, but I have no hope of seeing a benefit from it in the near future.
> 
> The multi-queue block layer should improve performance in cases where
> the bottleneck is a single IOThread. It will allow users to assign more
> than one IOThread.
> 
> But I think the bigger impact of this work will be addressing
> long-standing problems with the block layer's programming model. We
> continue to have IOThread bugs because there are many undocumented
> assumptions. With the multi-queue block layer the code switches to a
> better-understood multi-threaded programming model, and hopefully
> fewer issues will arise because there is no problematic AioContext lock
> to worry about.
> 
> Stefan
> 


-- 
Best regards,
Vladimir



* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-15 12:35               ` Vladimir Sementsov-Ogievskiy
@ 2021-07-15 13:29                 ` Stefan Hajnoczi
  0 siblings, 0 replies; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-15 13:29 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy
  Cc: Emanuele Giuseppe Esposito, Kevin Wolf, qemu-block, Wen Congyang,
	Xie Changlong, qemu-devel, Markus Armbruster, Paolo Bonzini,
	Max Reitz, John Snow


On Thu, Jul 15, 2021 at 03:35:37PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> 13.07.2021 19:38, Stefan Hajnoczi wrote:
> > On Tue, Jul 13, 2021 at 06:18:39PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> > > 13.07.2021 16:10, Stefan Hajnoczi wrote:
> > > > On Mon, Jul 12, 2021 at 10:41:46AM +0200, Emanuele Giuseppe Esposito wrote:
> > > > > 
> > > > > 
> > > > > On 08/07/2021 15:04, Stefan Hajnoczi wrote:
> > > > > > On Thu, Jul 08, 2021 at 01:32:12PM +0200, Paolo Bonzini wrote:
> > > > > > > On 08/07/21 12:36, Stefan Hajnoczi wrote:
> > > > > > > > > What is very clear from this patch is that it
> > > > > > > > > is strictly related to the brdv_* and lower level calls, because
> > > > > > > > > they also internally check or even use the aiocontext lock.
> > > > > > > > > Therefore, in order to make it work, I temporarly added some
> > > > > > > > > aiocontext_acquire/release pair around the function that
> > > > > > > > > still assert for them or assume they are hold and temporarly
> > > > > > > > > unlock (unlock() - lock()).
> > > > > > > > 
> > > > > > > > Sounds like the issue is that this patch series assumes AioContext locks
> > > > > > > > are no longer required for calling the blk_*()/bdrv_*() APIs? That is
> > > > > > > > not the case yet, so you had to then add those aio_context_lock() calls
> > > > > > > > back in elsewhere. This approach introduces unnecessary risk. I think we
> > > > > > > > should wait until blk_*()/bdrv_*() no longer requires the caller to hold
> > > > > > > > the AioContext lock before applying this series.
> > > > > > > 
> > > > > > > In general I'm in favor of pushing the lock further down into smaller and
> > > > > > > smaller critical sections; it's a good approach to make further audits
> > > > > > > easier until it's "obvious" that the lock is unnecessary.  I haven't yet
> > > > > > > reviewed Emanuele's patches to see if this is what he's doing where he's
> > > > > > > adding the acquire/release calls, but that's my understanding of both his
> > > > > > > cover letter and your reply.
> > > > > > 
> > > > > > The problem is the unnecessary risk. We know what the goal is for
> > > > > > blk_*()/bdrv_*() but it's not quite there yet. Does making changes in
> > > > > > block jobs help solve the final issues with blk_*()/bdrv_*()?
> > > > > 
> > > > > Correct me if I am wrong, but it seems to me that the bdrv_*()/blk_*()
> > > > > operations mostly take care of building, modifying and walking the BDS graph.
> > > > > So since graph nodes can have multiple AioContexts, it makes sense that we
> > > > > have a lock when modifying the graph, right?
> > > > >
> > > > > If so, we could simply try to replace the AioContext lock with a graph lock,
> > > > > or something like that. But I am not sure about this.
> > > > 
> > > > Block graph manipulation (all_bdrv_states and friends) requires the BQL.
> > > > It has always been this way.
> > > > 
> > > > This raises the question: if block graph manipulation is already under
> > > > the BQL and BlockDriver callbacks don't need the AioContext anymore, why
> > > 
> > > I don't believe that the block drivers are thread-safe now. They have some mutexes... but who has made an audit of their thread-safety?
> > 
> > Emanuele :)
> > 
> > FWIW I took a look at the stream, mirror, and backup jobs today and
> > couldn't find anything that's unsafe after this series. I was expecting
> > to find issues but I think Emanuele and Paolo have taken care of them.
> 
> 
> Hmm, do you mean that all jobs are thread-safe?
> 
> Looking at the mirror, what protects s->ops_in_flight, for example? It's accessed both from the job coroutines and from the mirror_top filter's write operations.

You're right. I missed the bdrv_mirror_top BlockDriver:

.pwrite_zeroes -> bdrv_mirror_top_pwrite_zeroes -> active_write_prepare -> QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next)

This is not thread-safe. A CoMutex is needed here to protect the
MirrorBDSOpaque fields.
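
A rough sketch of what such a fix could look like. This is only an
illustration: the ops_lock field is hypothetical, and it is placed in
MirrorBlockJob because that is where ops_in_flight lives
(MirrorBDSOpaque only holds a pointer to the job); the other names
follow the existing mirror code:

    /* Hypothetical: serialize access to the in-flight op list, which is
     * currently touched both by the job coroutines and by the
     * bdrv_mirror_top_*() write path, with a CoMutex. */
    typedef struct MirrorBlockJob {
        BlockJob common;
        CoMutex ops_lock;                       /* new, hypothetical */
        QTAILQ_HEAD(, MirrorOp) ops_in_flight;
        /* ... */
    } MirrorBlockJob;

    static MirrorOp *active_write_prepare(MirrorBlockJob *s,
                                          uint64_t offset, uint64_t bytes)
    {
        MirrorOp *op = g_new0(MirrorOp, 1);

        /* ... fill in op as the current code does ... */
        qemu_co_mutex_lock(&s->ops_lock);
        QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
        qemu_co_mutex_unlock(&s->ops_lock);
        return op;
    }

Every other traversal or removal on ops_in_flight (active_write_settle(),
the conflict checks in the read/write paths, ...) would have to take the
same CoMutex, and qemu_co_mutex_init(&s->ops_lock) would be called when
the job is created.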

Stefan



* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-13 13:10         ` Stefan Hajnoczi
  2021-07-13 15:18           ` Vladimir Sementsov-Ogievskiy
@ 2021-07-16 15:23           ` Kevin Wolf
  2021-07-19  9:29             ` Stefan Hajnoczi
  1 sibling, 1 reply; 33+ messages in thread
From: Kevin Wolf @ 2021-07-16 15:23 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Emanuele Giuseppe Esposito, Vladimir Sementsov-Ogievskiy,
	qemu-block, Wen Congyang, Xie Changlong, qemu-devel,
	Markus Armbruster, Paolo Bonzini, Max Reitz, John Snow


On 13.07.2021 at 15:10, Stefan Hajnoczi wrote:
> AIO_WAIT_WHILE() requires that AioContext is acquired according to its
> documentation, but I'm not sure that's true anymore. Thread-safe/atomic
> primitives are used by AIO_WAIT_WHILE(), so as long as the condition
> being waited for is thread-safe too it should work without the
> AioContext lock.

Polling something in a different AioContext from the main thread still
temporarily drops the lock, which crashes if it isn't locked. I'm not
sure whether the documentation claims that the lock is needed in more
cases; I guess you could interpret it either way.
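
For reference, this is roughly what the main-thread branch of
AIO_WAIT_WHILE() does (a simplified paraphrase of
include/block/aio-wait.h; the real macro also maintains a num_waiters
counter for aio_wait_kick()):

    /* When the main thread polls a condition belonging to another
     * AioContext, the macro releases the lock around aio_poll() so that
     * the IOThread can make progress, and reacquires it before
     * re-evaluating the condition.  aio_context_release() aborts if the
     * caller does not actually hold the lock. */
    while ((cond)) {
        if (ctx_) {
            aio_context_release(ctx_);
        }
        aio_poll(qemu_get_aio_context(), true);
        if (ctx_) {
            aio_context_acquire(ctx_);
        }
        waited_ = true;
    }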

Kevin



* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-16 15:23           ` Kevin Wolf
@ 2021-07-19  9:29             ` Stefan Hajnoczi
  2021-07-19 14:54               ` Kevin Wolf
  0 siblings, 1 reply; 33+ messages in thread
From: Stefan Hajnoczi @ 2021-07-19  9:29 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: Emanuele Giuseppe Esposito, Vladimir Sementsov-Ogievskiy,
	qemu-block, Wen Congyang, Xie Changlong, qemu-devel,
	Markus Armbruster, Paolo Bonzini, Max Reitz, John Snow


On Fri, Jul 16, 2021 at 05:23:50PM +0200, Kevin Wolf wrote:
> On 13.07.2021 at 15:10, Stefan Hajnoczi wrote:
> > AIO_WAIT_WHILE() requires that AioContext is acquired according to its
> > documentation, but I'm not sure that's true anymore. Thread-safe/atomic
> > primitives are used by AIO_WAIT_WHILE(), so as long as the condition
> > being waited for is thread-safe too it should work without the
> > AioContext lock.
> 
> Polling something in a different AioContext from the main thread still
> temporarily drops the lock, which crashes if it isn't locked. I'm not
> sure whether the documentation claims that the lock is needed in more
> cases; I guess you could interpret it either way.

I'm claiming that the lock doesn't need to be dropped in that case
anymore - as long as the condition we're polling is thread-safe. :)

Have I missed something that still needs locking?

We could temporarily introduce an AIO_WAIT_WHILE_UNLOCKED() macro so
that callers can be converted individually.
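
A minimal sketch of what that macro could look like, assuming the
current AIO_WAIT_WHILE() internals (this is hypothetical; a real
version would also need the num_waiters/aio_wait_kick() handshake and
the in-home-thread branch of AIO_WAIT_WHILE()):

    /* Hypothetical: the same polling loop as AIO_WAIT_WHILE(), but it
     * never touches the AioContext lock, so the condition must be
     * protected by the caller's own thread-safe/atomic primitives. */
    #define AIO_WAIT_WHILE_UNLOCKED(ctx, cond) ({               \
        bool waited_ = false;                                   \
        (void)(ctx);  /* kept only for signature parity */      \
        assert(qemu_get_current_aio_context() ==                \
               qemu_get_aio_context());                         \
        while ((cond)) {                                        \
            aio_poll(qemu_get_aio_context(), true);             \
            waited_ = true;                                     \
        }                                                       \
        waited_; })

Callers whose condition is already known to be thread-safe could then be
converted one by one, and the locked variant dropped at the end.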

Stefan



* Re: [RFC PATCH 0/6] job: replace AioContext lock with job_mutex
  2021-07-19  9:29             ` Stefan Hajnoczi
@ 2021-07-19 14:54               ` Kevin Wolf
  0 siblings, 0 replies; 33+ messages in thread
From: Kevin Wolf @ 2021-07-19 14:54 UTC (permalink / raw)
  To: Stefan Hajnoczi
  Cc: Emanuele Giuseppe Esposito, Vladimir Sementsov-Ogievskiy,
	qemu-block, Wen Congyang, Xie Changlong, qemu-devel,
	Markus Armbruster, Paolo Bonzini, Max Reitz, John Snow


On 19.07.2021 at 11:29, Stefan Hajnoczi wrote:
> On Fri, Jul 16, 2021 at 05:23:50PM +0200, Kevin Wolf wrote:
> > On 13.07.2021 at 15:10, Stefan Hajnoczi wrote:
> > > AIO_WAIT_WHILE() requires that AioContext is acquired according to its
> > > documentation, but I'm not sure that's true anymore. Thread-safe/atomic
> > > primitives are used by AIO_WAIT_WHILE(), so as long as the condition
> > > being waited for is thread-safe too it should work without the
> > > AioContext lock.
> > 
> > Polling something in a different AioContext from the main thread still
> > temporarily drops the lock, which crashes if it isn't locked. I'm not
> > sure whether the documentation claims that the lock is needed in more
> > cases; I guess you could interpret it either way.
> 
> I'm claiming that the lock doesn't need to be dropped in that case
> anymore - as long as the condition we're polling is thread-safe. :)
> 
> Have I missed something that still needs locking?

I'm not sure if AIO_WAIT_WHILE() actually ever needed the locking. I
think it's more a convenience thing since the callers would already hold
the lock, so dropping it temporarily in AIO_WAIT_WHILE() means that the
callers don't have to duplicate the temporary unlock everywhere.

> We could temporarily introduce an AIO_WAIT_WHILE_UNLOCKED() macro so
> that callers can be converted individually.

Yes, this makes sense to me.

Kevin



end of thread

Thread overview: 33+ messages
2021-07-07 16:58 [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Emanuele Giuseppe Esposito
2021-07-07 16:58 ` [RFC PATCH 1/6] job: use getter/setters instead of accessing the Job fields directly Emanuele Giuseppe Esposito
2021-07-07 16:58 ` [RFC PATCH 2/6] job: _locked functions and public job_lock/unlock for next patch Emanuele Giuseppe Esposito
2021-07-08 10:50   ` Stefan Hajnoczi
2021-07-12  8:43     ` Emanuele Giuseppe Esposito
2021-07-13 13:32       ` Stefan Hajnoczi
2021-07-07 16:58 ` [RFC PATCH 3/6] job: minor changes to simplify locking Emanuele Giuseppe Esposito
2021-07-08 10:55   ` Stefan Hajnoczi
2021-07-12  8:43     ` Emanuele Giuseppe Esposito
2021-07-13 17:56   ` Eric Blake
2021-07-07 16:58 ` [RFC PATCH 4/6] job.h: categorize job fields Emanuele Giuseppe Esposito
2021-07-08 11:02   ` Stefan Hajnoczi
2021-07-12  8:43     ` Emanuele Giuseppe Esposito
2021-07-07 16:58 ` [RFC PATCH 5/6] job: use global job_mutex to protect struct Job Emanuele Giuseppe Esposito
2021-07-08 12:56   ` Stefan Hajnoczi
2021-07-12  8:43     ` Emanuele Giuseppe Esposito
2021-07-07 16:58 ` [RFC PATCH 6/6] jobs: remove unnecessary AioContext aquire/release pairs Emanuele Giuseppe Esposito
2021-07-08 10:36 ` [RFC PATCH 0/6] job: replace AioContext lock with job_mutex Stefan Hajnoczi
2021-07-08 11:32   ` Paolo Bonzini
2021-07-08 12:14     ` Kevin Wolf
2021-07-08 13:04     ` Stefan Hajnoczi
2021-07-12  8:41       ` Emanuele Giuseppe Esposito
2021-07-13 13:10         ` Stefan Hajnoczi
2021-07-13 15:18           ` Vladimir Sementsov-Ogievskiy
2021-07-13 16:38             ` Stefan Hajnoczi
2021-07-15 12:35               ` Vladimir Sementsov-Ogievskiy
2021-07-15 13:29                 ` Stefan Hajnoczi
2021-07-16 15:23           ` Kevin Wolf
2021-07-19  9:29             ` Stefan Hajnoczi
2021-07-19 14:54               ` Kevin Wolf
2021-07-08 13:09 ` Stefan Hajnoczi
2021-07-12  8:42   ` Emanuele Giuseppe Esposito
2021-07-13 13:27     ` Stefan Hajnoczi
