All of lore.kernel.org
 help / color / mirror / Atom feed
From: Kevin Wolf <kwolf@redhat.com>
To: qemu-block@nongnu.org
Cc: kwolf@redhat.com, mreitz@redhat.com, jsnow@redhat.com,
	eblake@redhat.com, jcody@redhat.com, armbru@redhat.com,
	qemu-devel@nongnu.org
Subject: [Qemu-devel] [PATCH v2 26/40] job: Move transactions to Job
Date: Fri, 18 May 2018 15:21:00 +0200	[thread overview]
Message-ID: <20180518132114.4070-27-kwolf@redhat.com> (raw)
In-Reply-To: <20180518132114.4070-1-kwolf@redhat.com>

This moves the logic that implements job transactions from BlockJob to
Job.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 include/block/blockjob.h     |  54 ----------
 include/block/blockjob_int.h |  10 --
 include/qemu/job.h           |  71 +++++++++++--
 blockdev.c                   |   6 +-
 blockjob.c                   | 238 +------------------------------------------
 job.c                        | 234 ++++++++++++++++++++++++++++++++++++++++--
 tests/test-blockjob-txn.c    |  12 +--
 tests/test-blockjob.c        |   2 +-
 8 files changed, 303 insertions(+), 324 deletions(-)

diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 44df025bd0..09e6bb42bf 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -33,7 +33,6 @@
 #define BLOCK_JOB_SLICE_TIME 100000000ULL /* ns */
 
 typedef struct BlockJobDriver BlockJobDriver;
-typedef struct JobTxn JobTxn;
 
 /**
  * BlockJob:
@@ -84,8 +83,6 @@ typedef struct BlockJob {
 
     /** BlockDriverStates that are involved in this block job */
     GSList *nodes;
-
-    JobTxn *txn;
 } BlockJob;
 
 /**
@@ -153,22 +150,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 void block_job_cancel(BlockJob *job, bool force);
 
 /**
- * block_job_finalize:
- * @job: The job to fully commit and finish.
- * @errp: Error object.
- *
- * For jobs that have finished their work and are pending
- * awaiting explicit acknowledgement to commit their work,
- * This will commit that work.
- *
- * FIXME: Make the below statement universally true:
- * For jobs that support the manual workflow mode, all graph
- * changes that occur as a result will occur after this command
- * and before a successful reply.
- */
-void block_job_finalize(BlockJob *job, Error **errp);
-
-/**
  * block_job_dismiss:
  * @job: The job to be dismissed.
  * @errp: Error object.
@@ -260,41 +241,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp);
 void block_job_iostatus_reset(BlockJob *job);
 
 /**
- * block_job_txn_new:
- *
- * Allocate and return a new block job transaction.  Jobs can be added to the
- * transaction using block_job_txn_add_job().
- *
- * The transaction is automatically freed when the last job completes or is
- * cancelled.
- *
- * All jobs in the transaction either complete successfully or fail/cancel as a
- * group.  Jobs wait for each other before completing.  Cancelling one job
- * cancels all jobs in the transaction.
- */
-JobTxn *block_job_txn_new(void);
-
-/**
- * block_job_txn_unref:
- *
- * Release a reference that was previously acquired with block_job_txn_add_job
- * or block_job_txn_new. If it's the last reference to the object, it will be
- * freed.
- */
-void block_job_txn_unref(JobTxn *txn);
-
-/**
- * block_job_txn_add_job:
- * @txn: The transaction (may be NULL)
- * @job: Job to add to the transaction
- *
- * Add @job to the transaction.  The @job must not already be in a transaction.
- * The caller must call either block_job_txn_unref() or block_job_completed()
- * to release the reference that is automatically grabbed here.
- */
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job);
-
-/**
  * block_job_is_internal:
  * @job: The job to determine if it is user-visible or not.
  *
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index ce66a9b51c..29a28020ac 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -38,16 +38,6 @@ struct BlockJobDriver {
     /** Generic JobDriver callbacks and settings */
     JobDriver job_driver;
 
-    /**
-     * If the callback is not NULL, prepare will be invoked when all the jobs
-     * belonging to the same transaction complete; or upon this job's completion
-     * if it is not in a transaction.
-     *
-     * This callback will not be invoked if the job has already failed.
-     * If it fails, abort and then clean will be called.
-     */
-    int (*prepare)(BlockJob *job);
-
     /*
      * If the callback is not NULL, it will be invoked before the job is
      * resumed in a new AioContext.  This is the place to move any resources
diff --git a/include/qemu/job.h b/include/qemu/job.h
index d4aa7fa981..39d74abdec 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -32,6 +32,8 @@
 #include "block/aio.h"
 
 typedef struct JobDriver JobDriver;
+typedef struct JobTxn JobTxn;
+
 
 /**
  * Long-running operation.
@@ -133,6 +135,9 @@ typedef struct Job {
     /** Element of the list of jobs */
     QLIST_ENTRY(Job) job_list;
 
+    /** Transaction this job is part of */
+    JobTxn *txn;
+
     /** Element of the list of jobs in a job transaction */
     QLIST_ENTRY(Job) txn_list;
 } Job;
@@ -184,6 +189,16 @@ struct JobDriver {
     void (*drain)(Job *job);
 
     /**
+     * If the callback is not NULL, prepare will be invoked when all the jobs
+     * belonging to the same transaction complete; or upon this job's completion
+     * if it is not in a transaction.
+     *
+     * This callback will not be invoked if the job has already failed.
+     * If it fails, abort and then clean will be called.
+     */
+    int (*prepare)(Job *job);
+
+    /**
      * If the callback is not NULL, it will be invoked when all the jobs
      * belonging to the same transaction complete; or upon this job's
      * completion if it is not in a transaction. Skipped if NULL.
@@ -227,20 +242,52 @@ typedef enum JobCreateFlags {
     JOB_MANUAL_DISMISS = 0x04,
 } JobCreateFlags;
 
+/**
+ * Allocate and return a new job transaction. Jobs can be added to the
+ * transaction using job_txn_add_job().
+ *
+ * The transaction is automatically freed when the last job completes or is
+ * cancelled.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+JobTxn *job_txn_new(void);
+
+/**
+ * Release a reference that was previously acquired with job_txn_add_job or
+ * job_txn_new. If it's the last reference to the object, it will be freed.
+ */
+void job_txn_unref(JobTxn *txn);
+
+/**
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The caller must call either job_txn_unref() or block_job_completed() to
+ * release the reference that is automatically grabbed here.
+ *
+ * If @txn is NULL, the function does nothing.
+ */
+void job_txn_add_job(JobTxn *txn, Job *job);
 
 /**
  * Create a new long-running job and return it.
  *
  * @job_id: The id of the newly-created job, or %NULL for internal jobs
  * @driver: The class object for the newly-created job.
+ * @txn: The transaction this job belongs to, if any. %NULL otherwise.
  * @ctx: The AioContext to run the job coroutine in.
  * @flags: Creation flags for the job. See @JobCreateFlags.
  * @cb: Completion function for the job.
  * @opaque: Opaque pointer value passed to @cb.
  * @errp: Error object.
  */
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp);
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
+                 void *opaque, Error **errp);
 
 /**
  * Add a reference to Job refcnt, it will be decreased with job_unref, and then
@@ -260,9 +307,6 @@ void job_event_cancelled(Job *job);
 /** To be called when a successfully completed job is finalised. */
 void job_event_completed(Job *job);
 
-/** To be called when the job transitions to PENDING */
-void job_event_pending(Job *job);
-
 /**
  * Conditionally enter the job coroutine if the job is ready to run, not
  * already busy and fn() returns true. fn() is called while under the job_lock
@@ -375,6 +419,16 @@ void job_early_fail(Job *job);
 /** Asynchronously complete the specified @job. */
 void job_complete(Job *job, Error **errp);;
 
+/**
+ * For a @job that has finished its work and is pending awaiting explicit
+ * acknowledgement to commit its work, this will commit that work.
+ *
+ * FIXME: Make the below statement universally true:
+ * For jobs that support the manual workflow mode, all graph changes that occur
+ * as a result will occur after this command and before a successful reply.
+ */
+void job_finalize(Job *job, Error **errp);
+
 typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
 
 /**
@@ -407,10 +461,9 @@ void coroutine_fn job_do_yield(Job *job, uint64_t ns);
 bool job_should_pause(Job *job);
 bool job_started(Job *job);
 void job_do_dismiss(Job *job);
-int job_finalize_single(Job *job);
 void job_update_rc(Job *job);
-
-typedef struct BlockJob BlockJob;
-void block_job_txn_del_job(BlockJob *job);
+void job_cancel_async(Job *job, bool force);
+void job_completed_txn_abort(Job *job);
+void job_completed_txn_success(Job *job);
 
 #endif
diff --git a/blockdev.c b/blockdev.c
index 817c3848c0..87a23bed6f 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -2255,7 +2255,7 @@ void qmp_transaction(TransactionActionList *dev_list,
      */
     props = get_transaction_properties(props);
     if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
-        block_job_txn = block_job_txn_new();
+        block_job_txn = job_txn_new();
     }
 
     /* drain all i/o before any operations */
@@ -2314,7 +2314,7 @@ exit:
     if (!has_props) {
         qapi_free_TransactionProperties(props);
     }
-    block_job_txn_unref(block_job_txn);
+    job_txn_unref(block_job_txn);
 }
 
 void qmp_eject(bool has_device, const char *device,
@@ -3908,7 +3908,7 @@ void qmp_block_job_finalize(const char *id, Error **errp)
     }
 
     trace_qmp_block_job_finalize(job);
-    block_job_finalize(job, errp);
+    job_finalize(&job->job, errp);
     aio_context_release(aio_context);
 }
 
diff --git a/blockjob.c b/blockjob.c
index bd35c4fa7a..14b21c86b7 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -36,19 +36,6 @@
 #include "qemu/coroutine.h"
 #include "qemu/timer.h"
 
-/* Transactional group of block jobs */
-struct JobTxn {
-
-    /* Is this txn being cancelled? */
-    bool aborting;
-
-    /* List of jobs */
-    QLIST_HEAD(, Job) jobs;
-
-    /* Reference count */
-    int refcnt;
-};
-
 /*
  * The block job API is composed of two categories of functions.
  *
@@ -94,48 +81,6 @@ BlockJob *block_job_get(const char *id)
     }
 }
 
-JobTxn *block_job_txn_new(void)
-{
-    JobTxn *txn = g_new0(JobTxn, 1);
-    QLIST_INIT(&txn->jobs);
-    txn->refcnt = 1;
-    return txn;
-}
-
-static void block_job_txn_ref(JobTxn *txn)
-{
-    txn->refcnt++;
-}
-
-void block_job_txn_unref(JobTxn *txn)
-{
-    if (txn && --txn->refcnt == 0) {
-        g_free(txn);
-    }
-}
-
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job)
-{
-    if (!txn) {
-        return;
-    }
-
-    assert(!job->txn);
-    job->txn = txn;
-
-    QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list);
-    block_job_txn_ref(txn);
-}
-
-void block_job_txn_del_job(BlockJob *job)
-{
-    if (job->txn) {
-        QLIST_REMOVE(&job->job, txn_list);
-        block_job_txn_unref(job->txn);
-        job->txn = NULL;
-    }
-}
-
 static void block_job_attached_aio_context(AioContext *new_context,
                                            void *opaque);
 static void block_job_detach_aio_context(void *opaque);
@@ -145,8 +90,6 @@ void block_job_free(Job *job)
     BlockJob *bjob = container_of(job, BlockJob, job);
     BlockDriverState *bs = blk_bs(bjob->blk);
 
-    assert(!bjob->txn);
-
     bs->job = NULL;
     block_job_remove_all_bdrv(bjob);
     blk_remove_aio_context_notifier(bjob->blk,
@@ -261,158 +204,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job)
     return job->driver;
 }
 
-static int block_job_prepare(BlockJob *job)
-{
-    if (job->job.ret == 0 && job->driver->prepare) {
-        job->job.ret = job->driver->prepare(job);
-    }
-    return job->job.ret;
-}
-
-static void job_cancel_async(Job *job, bool force)
-{
-    if (job->user_paused) {
-        /* Do not call job_enter here, the caller will handle it.  */
-        job->user_paused = false;
-        if (job->driver->user_resume) {
-            job->driver->user_resume(job);
-        }
-        assert(job->pause_count > 0);
-        job->pause_count--;
-    }
-    job->cancelled = true;
-    /* To prevent 'force == false' overriding a previous 'force == true' */
-    job->force_cancel |= force;
-}
-
-static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock)
-{
-    AioContext *ctx;
-    Job *job, *next;
-    BlockJob *bjob;
-    int rc = 0;
-
-    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
-        assert(is_block_job(job));
-        bjob = container_of(job, BlockJob, job);
-
-        if (lock) {
-            ctx = job->aio_context;
-            aio_context_acquire(ctx);
-        }
-        rc = fn(bjob);
-        if (lock) {
-            aio_context_release(ctx);
-        }
-        if (rc) {
-            break;
-        }
-    }
-    return rc;
-}
-
-static void block_job_completed_txn_abort(BlockJob *job)
-{
-    AioContext *ctx;
-    JobTxn *txn = job->txn;
-    Job *other_job;
-
-    if (txn->aborting) {
-        /*
-         * We are cancelled by another job, which will handle everything.
-         */
-        return;
-    }
-    txn->aborting = true;
-    block_job_txn_ref(txn);
-
-    /* We are the first failed job. Cancel other jobs. */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        ctx = other_job->aio_context;
-        aio_context_acquire(ctx);
-    }
-
-    /* Other jobs are effectively cancelled by us, set the status for
-     * them; this job, however, may or may not be cancelled, depending
-     * on the caller, so leave it. */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (other_job != &job->job) {
-            job_cancel_async(other_job, false);
-        }
-    }
-    while (!QLIST_EMPTY(&txn->jobs)) {
-        other_job = QLIST_FIRST(&txn->jobs);
-        ctx = other_job->aio_context;
-        if (!job_is_completed(other_job)) {
-            assert(job_is_cancelled(other_job));
-            job_finish_sync(other_job, NULL, NULL);
-        }
-        job_finalize_single(other_job);
-        aio_context_release(ctx);
-    }
-
-    block_job_txn_unref(txn);
-}
-
-static int block_job_needs_finalize(BlockJob *job)
-{
-    return !job->job.auto_finalize;
-}
-
-static int block_job_finalize_single(BlockJob *job)
-{
-    return job_finalize_single(&job->job);
-}
-
-static void block_job_do_finalize(BlockJob *job)
-{
-    int rc;
-    assert(job && job->txn);
-
-    /* prepare the transaction to complete */
-    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
-    if (rc) {
-        block_job_completed_txn_abort(job);
-    } else {
-        block_job_txn_apply(job->txn, block_job_finalize_single, true);
-    }
-}
-
-static int block_job_transition_to_pending(BlockJob *job)
-{
-    job_state_transition(&job->job, JOB_STATUS_PENDING);
-    if (!job->job.auto_finalize) {
-        job_event_pending(&job->job);
-    }
-    return 0;
-}
-
-static void block_job_completed_txn_success(BlockJob *job)
-{
-    JobTxn *txn = job->txn;
-    Job *other_job;
-
-    job_state_transition(&job->job, JOB_STATUS_WAITING);
-
-    /*
-     * Successful completion, see if there are other running jobs in this
-     * txn.
-     */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (!job_is_completed(other_job)) {
-            return;
-        }
-        assert(other_job->ret == 0);
-    }
-
-    block_job_txn_apply(txn, block_job_transition_to_pending, false);
-
-    /* If no jobs need manual finalization, automatically do so */
-    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
-        block_job_do_finalize(job);
-    }
-}
-
 /* Assumes the job_mutex is held */
 static bool job_timer_pending(Job *job)
 {
@@ -451,15 +242,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
-void block_job_finalize(BlockJob *job, Error **errp)
-{
-    assert(job && job->job.id);
-    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
-        return;
-    }
-    block_job_do_finalize(job);
-}
-
 void block_job_dismiss(BlockJob **jobptr, Error **errp)
 {
     BlockJob *job = *jobptr;
@@ -483,7 +265,7 @@ void block_job_cancel(BlockJob *job, bool force)
     if (!job_started(&job->job)) {
         block_job_completed(job, -ECANCELED);
     } else if (job->job.deferred_to_main_loop) {
-        block_job_completed_txn_abort(job);
+        job_completed_txn_abort(&job->job);
     } else {
         block_job_enter(job);
     }
@@ -656,7 +438,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
         return NULL;
     }
 
-    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
+    job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk),
                      flags, cb, opaque, errp);
     if (job == NULL) {
         blk_unref(blk);
@@ -703,30 +485,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
         }
     }
 
-    /* Single jobs are modeled as single-job transactions for sake of
-     * consolidating the job management logic */
-    if (!txn) {
-        txn = block_job_txn_new();
-        block_job_txn_add_job(txn, job);
-        block_job_txn_unref(txn);
-    } else {
-        block_job_txn_add_job(txn, job);
-    }
-
     return job;
 }
 
 void block_job_completed(BlockJob *job, int ret)
 {
-    assert(job && job->txn && !job_is_completed(&job->job));
+    assert(job && job->job.txn && !job_is_completed(&job->job));
     assert(blk_bs(job->blk)->job == job);
     job->job.ret = ret;
     job_update_rc(&job->job);
     trace_block_job_completed(job, ret, job->job.ret);
     if (job->job.ret) {
-        block_job_completed_txn_abort(job);
+        job_completed_txn_abort(&job->job);
     } else {
-        block_job_completed_txn_success(job);
+        job_completed_txn_success(&job->job);
     }
 }
 
diff --git a/job.c b/job.c
index 7c9a3a7b47..ee0bf42c5d 100644
--- a/job.c
+++ b/job.c
@@ -60,6 +60,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
     [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
 };
 
+/* Transactional group of jobs */
+struct JobTxn {
+
+    /* Is this txn being cancelled? */
+    bool aborting;
+
+    /* List of jobs */
+    QLIST_HEAD(, Job) jobs;
+
+    /* Reference count */
+    int refcnt;
+};
+
 /* Right now, this mutex is only needed to synchronize accesses to job->busy
  * and job->sleep_timer, such as concurrent calls to job_do_yield and
  * job_enter. */
@@ -80,6 +93,71 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
+JobTxn *job_txn_new(void)
+{
+    JobTxn *txn = g_new0(JobTxn, 1);
+    QLIST_INIT(&txn->jobs);
+    txn->refcnt = 1;
+    return txn;
+}
+
+static void job_txn_ref(JobTxn *txn)
+{
+    txn->refcnt++;
+}
+
+void job_txn_unref(JobTxn *txn)
+{
+    if (txn && --txn->refcnt == 0) {
+        g_free(txn);
+    }
+}
+
+void job_txn_add_job(JobTxn *txn, Job *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    job_txn_ref(txn);
+}
+
+static void job_txn_del_job(Job *job)
+{
+    if (job->txn) {
+        QLIST_REMOVE(job, txn_list);
+        job_txn_unref(job->txn);
+        job->txn = NULL;
+    }
+}
+
+static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
+{
+    AioContext *ctx;
+    Job *job, *next;
+    int rc = 0;
+
+    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
+        if (lock) {
+            ctx = job->aio_context;
+            aio_context_acquire(ctx);
+        }
+        rc = fn(job);
+        if (lock) {
+            aio_context_release(ctx);
+        }
+        if (rc) {
+            break;
+        }
+    }
+    return rc;
+}
+
+
 /* TODO Make static once the whole state machine is in job.c */
 void job_state_transition(Job *job, JobStatus s1)
 {
@@ -181,8 +259,9 @@ static void job_sleep_timer_cb(void *opaque)
     job_enter(job);
 }
 
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp)
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
+                 void *opaque, Error **errp)
 {
     Job *job;
 
@@ -228,6 +307,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
 
     QLIST_INSERT_HEAD(&jobs, job, job_list);
 
+    /* Single jobs are modeled as single-job transactions for sake of
+     * consolidating the job management logic */
+    if (!txn) {
+        txn = job_txn_new();
+        job_txn_add_job(txn, job);
+        job_txn_unref(txn);
+    } else {
+        job_txn_add_job(txn, job);
+    }
+
     return job;
 }
 
@@ -241,6 +330,7 @@ void job_unref(Job *job)
     if (--job->refcnt == 0) {
         assert(job->status == JOB_STATUS_NULL);
         assert(!timer_pending(&job->sleep_timer));
+        assert(!job->txn);
 
         if (job->driver->free) {
             job->driver->free(job);
@@ -263,7 +353,7 @@ void job_event_completed(Job *job)
     notifier_list_notify(&job->on_finalize_completed, job);
 }
 
-void job_event_pending(Job *job)
+static void job_event_pending(Job *job)
 {
     notifier_list_notify(&job->on_pending, job);
 }
@@ -469,8 +559,7 @@ void job_do_dismiss(Job *job)
     job->paused = false;
     job->deferred_to_main_loop = true;
 
-    /* TODO Don't assume it's a BlockJob */
-    block_job_txn_del_job((BlockJob*) job);
+    job_txn_del_job(job);
 
     job_state_transition(job, JOB_STATUS_NULL);
     job_unref(job);
@@ -523,7 +612,7 @@ static void job_clean(Job *job)
     }
 }
 
-int job_finalize_single(Job *job)
+static int job_finalize_single(Job *job)
 {
     assert(job_is_completed(job));
 
@@ -550,12 +639,141 @@ int job_finalize_single(Job *job)
         }
     }
 
-    /* TODO Don't assume it's a BlockJob */
-    block_job_txn_del_job((BlockJob*) job);
+    job_txn_del_job(job);
     job_conclude(job);
     return 0;
 }
 
+void job_cancel_async(Job *job, bool force)
+{
+    if (job->user_paused) {
+        /* Do not call job_enter here, the caller will handle it.  */
+        job->user_paused = false;
+        if (job->driver->user_resume) {
+            job->driver->user_resume(job);
+        }
+        assert(job->pause_count > 0);
+        job->pause_count--;
+    }
+    job->cancelled = true;
+    /* To prevent 'force == false' overriding a previous 'force == true' */
+    job->force_cancel |= force;
+}
+
+void job_completed_txn_abort(Job *job)
+{
+    AioContext *ctx;
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    if (txn->aborting) {
+        /*
+         * We are cancelled by another job, which will handle everything.
+         */
+        return;
+    }
+    txn->aborting = true;
+    job_txn_ref(txn);
+
+    /* We are the first failed job. Cancel other jobs. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        ctx = other_job->aio_context;
+        aio_context_acquire(ctx);
+    }
+
+    /* Other jobs are effectively cancelled by us, set the status for
+     * them; this job, however, may or may not be cancelled, depending
+     * on the caller, so leave it. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (other_job != job) {
+            job_cancel_async(other_job, false);
+        }
+    }
+    while (!QLIST_EMPTY(&txn->jobs)) {
+        other_job = QLIST_FIRST(&txn->jobs);
+        ctx = other_job->aio_context;
+        if (!job_is_completed(other_job)) {
+            assert(job_is_cancelled(other_job));
+            job_finish_sync(other_job, NULL, NULL);
+        }
+        job_finalize_single(other_job);
+        aio_context_release(ctx);
+    }
+
+    job_txn_unref(txn);
+}
+
+static int job_prepare(Job *job)
+{
+    if (job->ret == 0 && job->driver->prepare) {
+        job->ret = job->driver->prepare(job);
+    }
+    return job->ret;
+}
+
+static int job_needs_finalize(Job *job)
+{
+    return !job->auto_finalize;
+}
+
+static void job_do_finalize(Job *job)
+{
+    int rc;
+    assert(job && job->txn);
+
+    /* prepare the transaction to complete */
+    rc = job_txn_apply(job->txn, job_prepare, true);
+    if (rc) {
+        job_completed_txn_abort(job);
+    } else {
+        job_txn_apply(job->txn, job_finalize_single, true);
+    }
+}
+
+void job_finalize(Job *job, Error **errp)
+{
+    assert(job && job->id);
+    if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+        return;
+    }
+    job_do_finalize(job);
+}
+
+static int job_transition_to_pending(Job *job)
+{
+    job_state_transition(job, JOB_STATUS_PENDING);
+    if (!job->auto_finalize) {
+        job_event_pending(job);
+    }
+    return 0;
+}
+
+void job_completed_txn_success(Job *job)
+{
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    job_state_transition(job, JOB_STATUS_WAITING);
+
+    /*
+     * Successful completion, see if there are other running jobs in this
+     * txn.
+     */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (!job_is_completed(other_job)) {
+            return;
+        }
+        assert(other_job->ret == 0);
+    }
+
+    job_txn_apply(txn, job_transition_to_pending, false);
+
+    /* If no jobs need manual finalization, automatically do so */
+    if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
+        job_do_finalize(job);
+    }
+}
+
 void job_complete(Job *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index ec5d592b68..6ee31d59ad 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -125,7 +125,7 @@ static void test_single_job(int expected)
     JobTxn *txn;
     int result = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job = test_block_job_start(1, true, expected, &result, txn);
     job_start(&job->job);
 
@@ -138,7 +138,7 @@ static void test_single_job(int expected)
     }
     g_assert_cmpint(result, ==, expected);
 
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 }
 
 static void test_single_job_success(void)
@@ -164,7 +164,7 @@ static void test_pair_jobs(int expected1, int expected2)
     int result1 = -EINPROGRESS;
     int result2 = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job1 = test_block_job_start(1, true, expected1, &result1, txn);
     job2 = test_block_job_start(2, true, expected2, &result2, txn);
     job_start(&job1->job);
@@ -173,7 +173,7 @@ static void test_pair_jobs(int expected1, int expected2)
     /* Release our reference now to trigger as many nice
      * use-after-free bugs as possible.
      */
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 
     if (expected1 == -ECANCELED) {
         block_job_cancel(job1, false);
@@ -226,7 +226,7 @@ static void test_pair_jobs_fail_cancel_race(void)
     int result1 = -EINPROGRESS;
     int result2 = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn);
     job2 = test_block_job_start(2, false, 0, &result2, txn);
     job_start(&job1->job);
@@ -247,7 +247,7 @@ static void test_pair_jobs_fail_cancel_race(void)
     g_assert_cmpint(result1, ==, -ECANCELED);
     g_assert_cmpint(result2, ==, -ECANCELED);
 
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 }
 
 int main(int argc, char **argv)
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index e44c608327..1e052c2e9c 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -364,7 +364,7 @@ static void test_cancel_concluded(void)
     }
     assert(job->job.status == JOB_STATUS_PENDING);
 
-    block_job_finalize(job, &error_abort);
+    job_finalize(&job->job, &error_abort);
     assert(job->job.status == JOB_STATUS_CONCLUDED);
 
     cancel_common(s);
-- 
2.13.6

  parent reply	other threads:[~2018-05-18 13:22 UTC|newest]

Thread overview: 86+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-05-18 13:20 [Qemu-devel] [PATCH v2 00/40] Generic background jobs Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 01/40] blockjob: Update block-job-pause/resume documentation Kevin Wolf
2018-05-18 14:20   ` Eric Blake
2018-05-18 17:12   ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 02/40] blockjob: Improve BlockJobInfo.offset/len documentation Kevin Wolf
2018-05-18 14:25   ` Eric Blake
2018-05-18 17:47   ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 03/40] job: Create Job, JobDriver and job_create() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 04/40] job: Rename BlockJobType into JobType Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 05/40] job: Add JobDriver.job_type Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 06/40] job: Add job_delete() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 07/40] job: Maintain a list of all jobs Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 08/40] job: Move state transitions to Job Kevin Wolf
2018-05-18 14:36   ` Eric Blake
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 09/40] job: Add reference counting Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 10/40] job: Move cancelled to Job Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 11/40] job: Add Job.aio_context Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 12/40] job: Move defer_to_main_loop to Job Kevin Wolf
2018-05-18 17:56   ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 13/40] job: Move coroutine and related code " Kevin Wolf
2018-05-18 18:43   ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 14/40] job: Add job_sleep_ns() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 15/40] job: Move pause/resume functions to Job Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 16/40] job: Replace BlockJob.completed with job_is_completed() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 17/40] job: Move BlockJobCreateFlags to Job Kevin Wolf
2018-05-23 22:24   ` John Snow
2018-05-24  8:17     ` Kevin Wolf
2018-05-24 17:46       ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 18/40] blockjob: Split block_job_event_pending() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 19/40] job: Add job_event_*() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 20/40] job: Move single job finalisation to Job Kevin Wolf
2018-05-18 18:00   ` Eric Blake
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 21/40] job: Convert block_job_cancel_async() " Kevin Wolf
2018-05-23 23:18   ` John Snow
2018-05-24  8:24     ` Kevin Wolf
2018-05-24 17:42       ` John Snow
2018-05-25  8:00         ` Kevin Wolf
2018-05-25 17:43           ` John Snow
2018-05-29 11:59           ` [Qemu-devel] [Qemu-block] " Kashyap Chamarthy
2018-05-29 12:30             ` Max Reitz
2018-05-29 13:10               ` Kashyap Chamarthy
2018-05-29 13:22                 ` Kashyap Chamarthy
2018-05-30 20:33               ` John Snow
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 22/40] job: Add job_drain() Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 23/40] job: Move .complete callback to Job Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 24/40] job: Move job_finish_sync() " Kevin Wolf
2018-05-18 13:20 ` [Qemu-devel] [PATCH v2 25/40] job: Switch transactions to JobTxn Kevin Wolf
2018-05-18 13:21 ` Kevin Wolf [this message]
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 27/40] job: Move completion and cancellation to Job Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 28/40] block: Cancel job in bdrv_close_all() callers Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 29/40] job: Add job_yield() Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 30/40] job: Add job_dismiss() Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 31/40] job: Add job_is_ready() Kevin Wolf
2018-05-23 23:42   ` John Snow
2018-05-24  8:30     ` Kevin Wolf
2018-05-24 17:25       ` John Snow
2018-05-25  8:06         ` Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 32/40] job: Add job_transition_to_ready() Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 33/40] job: Move progress fields to Job Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 34/40] job: Introduce qapi/job.json Kevin Wolf
2018-05-18 15:59   ` Eric Blake
2018-05-31 21:21   ` Eric Blake
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 35/40] job: Add JOB_STATUS_CHANGE QMP event Kevin Wolf
2018-05-18 17:55   ` Eric Blake
2018-05-24  0:02   ` John Snow
2018-05-24  8:36     ` Kevin Wolf
2018-05-24 17:36       ` John Snow
2018-05-24 18:22         ` Eric Blake
2018-05-24 18:32           ` John Snow
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 36/40] job: Add lifecycle QMP commands Kevin Wolf
2018-05-18 18:12   ` Eric Blake
2018-05-22 10:40     ` Kevin Wolf
2018-05-23 23:56   ` John Snow
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 37/40] job: Add query-jobs QMP command Kevin Wolf
2018-05-18 18:14   ` Eric Blake
2018-05-18 18:22   ` Eric Blake
2018-05-22 10:44     ` Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 38/40] blockjob: Remove BlockJob.driver Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 39/40] iotests: Move qmp_to_opts() to VM Kevin Wolf
2018-05-18 13:21 ` [Qemu-devel] [PATCH v2 40/40] qemu-iotests: Test job-* with block jobs Kevin Wolf
2018-05-18 14:05 ` [Qemu-devel] [PATCH v2 00/40] Generic background jobs no-reply
2018-05-18 18:41 ` Dr. David Alan Gilbert
2018-05-22 11:01   ` Kevin Wolf
2018-05-22 17:15     ` Marc-André Lureau
2018-05-29 17:16       ` Dr. David Alan Gilbert
2018-05-23 12:31 ` Kevin Wolf

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180518132114.4070-27-kwolf@redhat.com \
    --to=kwolf@redhat.com \
    --cc=armbru@redhat.com \
    --cc=eblake@redhat.com \
    --cc=jcody@redhat.com \
    --cc=jsnow@redhat.com \
    --cc=mreitz@redhat.com \
    --cc=qemu-block@nongnu.org \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.