From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:52832) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1fGRwh-0005yN-IS for qemu-devel@nongnu.org; Wed, 09 May 2018 12:28:06 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1fGRwe-0002pi-Gx for qemu-devel@nongnu.org; Wed, 09 May 2018 12:28:03 -0400 From: Kevin Wolf Date: Wed, 9 May 2018 18:26:26 +0200 Message-Id: <20180509162637.15575-32-kwolf@redhat.com> In-Reply-To: <20180509162637.15575-1-kwolf@redhat.com> References: <20180509162637.15575-1-kwolf@redhat.com> Subject: [Qemu-devel] [PATCH 31/42] job: Move transactions to Job List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-block@nongnu.org Cc: kwolf@redhat.com, mreitz@redhat.com, eblake@redhat.com, jsnow@redhat.com, armbru@redhat.com, jcody@redhat.com, qemu-devel@nongnu.org This moves the logic that implements job transactions from BlockJob to Job. 
Signed-off-by: Kevin Wolf --- include/block/blockjob.h | 54 ---------- include/block/blockjob_int.h | 10 -- include/qemu/job.h | 71 +++++++++++-- blockdev.c | 6 +- blockjob.c | 238 +------------------------------------------ job.c | 235 ++++++++++++++++++++++++++++++++++++++++-- tests/test-blockjob-txn.c | 12 +-- tests/test-blockjob.c | 2 +- 8 files changed, 304 insertions(+), 324 deletions(-) diff --git a/include/block/blockjob.h b/include/block/blockjob.h index fbb8f54dc6..e8e9e5f370 100644 --- a/include/block/blockjob.h +++ b/include/block/blockjob.h @@ -33,7 +33,6 @@ #define SLICE_TIME 100000000ULL /* ns */ typedef struct BlockJobDriver BlockJobDriver; -typedef struct JobTxn JobTxn; /** * BlockJob: @@ -84,8 +83,6 @@ typedef struct BlockJob { /** BlockDriverStates that are involved in this block job */ GSList *nodes; - - JobTxn *txn; } BlockJob; /** @@ -153,22 +150,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp); void block_job_cancel(BlockJob *job, bool force); /** - * block_job_finalize: - * @job: The job to fully commit and finish. - * @errp: Error object. - * - * For jobs that have finished their work and are pending - * awaiting explicit acknowledgement to commit their work, - * This will commit that work. - * - * FIXME: Make the below statement universally true: - * For jobs that support the manual workflow mode, all graph - * changes that occur as a result will occur after this command - * and before a successful reply. - */ -void block_job_finalize(BlockJob *job, Error **errp); - -/** * block_job_dismiss: * @job: The job to be dismissed. * @errp: Error object. @@ -260,41 +241,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp); void block_job_iostatus_reset(BlockJob *job); /** - * block_job_txn_new: - * - * Allocate and return a new block job transaction. Jobs can be added to the - * transaction using block_job_txn_add_job(). 
- * - * The transaction is automatically freed when the last job completes or is - * cancelled. - * - * All jobs in the transaction either complete successfully or fail/cancel as a - * group. Jobs wait for each other before completing. Cancelling one job - * cancels all jobs in the transaction. - */ -JobTxn *block_job_txn_new(void); - -/** - * block_job_txn_unref: - * - * Release a reference that was previously acquired with block_job_txn_add_job - * or block_job_txn_new. If it's the last reference to the object, it will be - * freed. - */ -void block_job_txn_unref(JobTxn *txn); - -/** - * block_job_txn_add_job: - * @txn: The transaction (may be NULL) - * @job: Job to add to the transaction - * - * Add @job to the transaction. The @job must not already be in a transaction. - * The caller must call either block_job_txn_unref() or block_job_completed() - * to release the reference that is automatically grabbed here. - */ -void block_job_txn_add_job(JobTxn *txn, BlockJob *job); - -/** * block_job_is_internal: * @job: The job to determine if it is user-visible or not. * diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h index ce66a9b51c..29a28020ac 100644 --- a/include/block/blockjob_int.h +++ b/include/block/blockjob_int.h @@ -38,16 +38,6 @@ struct BlockJobDriver { /** Generic JobDriver callbacks and settings */ JobDriver job_driver; - /** - * If the callback is not NULL, prepare will be invoked when all the jobs - * belonging to the same transaction complete; or upon this job's completion - * if it is not in a transaction. - * - * This callback will not be invoked if the job has already failed. - * If it fails, abort and then clean will be called. - */ - int (*prepare)(BlockJob *job); - /* * If the callback is not NULL, it will be invoked before the job is * resumed in a new AioContext. 
This is the place to move any resources diff --git a/include/qemu/job.h b/include/qemu/job.h index 614a2dea92..84a9eb7980 100644 --- a/include/qemu/job.h +++ b/include/qemu/job.h @@ -32,6 +32,8 @@ #include "block/aio.h" typedef struct JobDriver JobDriver; +typedef struct JobTxn JobTxn; + /** * Long-running operation. @@ -133,6 +135,9 @@ typedef struct Job { /** Element of the list of jobs */ QLIST_ENTRY(Job) job_list; + /** Transaction this job is part of */ + JobTxn *txn; + /** Element of the list of jobs in a job transaction */ QLIST_ENTRY(Job) txn_list; } Job; @@ -184,6 +189,16 @@ struct JobDriver { void (*drain)(Job *job); /** + * If the callback is not NULL, prepare will be invoked when all the jobs + * belonging to the same transaction complete; or upon this job's completion + * if it is not in a transaction. + * + * This callback will not be invoked if the job has already failed. + * If it fails, abort and then clean will be called. + */ + int (*prepare)(Job *job); + + /** * If the callback is not NULL, it will be invoked when all the jobs * belonging to the same transaction complete; or upon this job's * completion if it is not in a transaction. Skipped if NULL. @@ -227,20 +242,52 @@ typedef enum JobCreateFlags { JOB_MANUAL_DISMISS = 0x04, } JobCreateFlags; +/** + * Allocate and return a new job transaction. Jobs can be added to the + * transaction using job_txn_add_job(). + * + * The transaction is automatically freed when the last job completes or is + * cancelled. + * + * All jobs in the transaction either complete successfully or fail/cancel as a + * group. Jobs wait for each other before completing. Cancelling one job + * cancels all jobs in the transaction. + */ +JobTxn *job_txn_new(void); + +/** + * Release a reference that was previously acquired with job_txn_add_job or + * job_txn_new. If it's the last reference to the object, it will be freed. 
+ */ +void job_txn_unref(JobTxn *txn); + +/** + * @txn: The transaction (may be NULL) + * @job: Job to add to the transaction + * + * Add @job to the transaction. The @job must not already be in a transaction. + * The caller must call either job_txn_unref() or block_job_completed() + * to release the reference that is automatically grabbed here. + * + * If @txn is NULL, the function does nothing. + */ +void job_txn_add_job(JobTxn *txn, Job *job); /** * Create a new long-running job and return it. * * @job_id: The id of the newly-created job, or %NULL for internal jobs * @driver: The class object for the newly-created job. + * @txn: The transaction this job belongs to, if any. %NULL otherwise. * @ctx: The AioContext to run the job coroutine in. * @flags: Creation flags for the job. See @JobCreateFlags. * @cb: Completion function for the job. * @opaque: Opaque pointer value passed to @cb. * @errp: Error object. */ -void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, - int flags, BlockCompletionFunc *cb, void *opaque, Error **errp); +void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn, + AioContext *ctx, int flags, BlockCompletionFunc *cb, + void *opaque, Error **errp); /** * Add a reference to Job refcnt, it will be decreased with job_unref, and then @@ -260,9 +307,6 @@ void job_event_cancelled(Job *job); /** To be called when a successfully completed job is finalised. */ void job_event_completed(Job *job); -/** To be called when the job transitions to PENDING */ -void job_event_pending(Job *job); - /** * Conditionally enter the job coroutine if the job is ready to run, not * already busy and fn() returns true. fn() is called while under the job_lock @@ -375,6 +419,16 @@ void job_early_fail(Job *job); /** Asynchronously complete the specified @job.
*/ void job_complete(Job *job, Error **errp);; +/** + * For a @job that has finished its work and is pending awaiting explicit + * acknowledgement to commit its work, this will commit that work. + * + * FIXME: Make the below statement universally true: + * For jobs that support the manual workflow mode, all graph changes that occur + * as a result will occur after this command and before a successful reply. + */ +void job_finalize(Job *job, Error **errp); + typedef void JobDeferToMainLoopFn(Job *job, void *opaque); /** @@ -407,10 +461,9 @@ void coroutine_fn job_do_yield(Job *job, uint64_t ns); bool job_should_pause(Job *job); bool job_started(Job *job); void job_do_dismiss(Job *job); -int job_finalize_single(Job *job); void job_update_rc(Job *job); - -typedef struct BlockJob BlockJob; -void block_job_txn_del_job(BlockJob *job); +void job_cancel_async(Job *job, bool force); +void job_completed_txn_abort(Job *job); +void job_completed_txn_success(Job *job); #endif diff --git a/blockdev.c b/blockdev.c index 817c3848c0..87a23bed6f 100644 --- a/blockdev.c +++ b/blockdev.c @@ -2255,7 +2255,7 @@ void qmp_transaction(TransactionActionList *dev_list, */ props = get_transaction_properties(props); if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) { - block_job_txn = block_job_txn_new(); + block_job_txn = job_txn_new(); } /* drain all i/o before any operations */ @@ -2314,7 +2314,7 @@ exit: if (!has_props) { qapi_free_TransactionProperties(props); } - block_job_txn_unref(block_job_txn); + job_txn_unref(block_job_txn); } void qmp_eject(bool has_device, const char *device, @@ -3908,7 +3908,7 @@ void qmp_block_job_finalize(const char *id, Error **errp) } trace_qmp_block_job_finalize(job); - block_job_finalize(job, errp); + job_finalize(&job->job, errp); aio_context_release(aio_context); } diff --git a/blockjob.c b/blockjob.c index cc8b98dee0..fd096e0f0c 100644 --- a/blockjob.c +++ b/blockjob.c @@ -36,19 +36,6 @@ #include "qemu/coroutine.h" #include "qemu/timer.h" 
-/* Transactional group of block jobs */ -struct JobTxn { - - /* Is this txn being cancelled? */ - bool aborting; - - /* List of jobs */ - QLIST_HEAD(, Job) jobs; - - /* Reference count */ - int refcnt; -}; - /* * The block job API is composed of two categories of functions. * @@ -95,48 +82,6 @@ BlockJob *block_job_get(const char *id) } } -JobTxn *block_job_txn_new(void) -{ - JobTxn *txn = g_new0(JobTxn, 1); - QLIST_INIT(&txn->jobs); - txn->refcnt = 1; - return txn; -} - -static void block_job_txn_ref(JobTxn *txn) -{ - txn->refcnt++; -} - -void block_job_txn_unref(JobTxn *txn) -{ - if (txn && --txn->refcnt == 0) { - g_free(txn); - } -} - -void block_job_txn_add_job(JobTxn *txn, BlockJob *job) -{ - if (!txn) { - return; - } - - assert(!job->txn); - job->txn = txn; - - QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list); - block_job_txn_ref(txn); -} - -void block_job_txn_del_job(BlockJob *job) -{ - if (job->txn) { - QLIST_REMOVE(&job->job, txn_list); - block_job_txn_unref(job->txn); - job->txn = NULL; - } -} - static void block_job_attached_aio_context(AioContext *new_context, void *opaque); static void block_job_detach_aio_context(void *opaque); @@ -146,8 +91,6 @@ void block_job_free(Job *job) BlockJob *bjob = container_of(job, BlockJob, job); BlockDriverState *bs = blk_bs(bjob->blk); - assert(!bjob->txn); - bs->job = NULL; block_job_remove_all_bdrv(bjob); blk_remove_aio_context_notifier(bjob->blk, @@ -262,158 +205,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job) return job->driver; } -static int block_job_prepare(BlockJob *job) -{ - if (job->job.ret == 0 && job->driver->prepare) { - job->job.ret = job->driver->prepare(job); - } - return job->job.ret; -} - -static void job_cancel_async(Job *job, bool force) -{ - if (job->user_paused) { - /* Do not call job_enter here, the caller will handle it. 
*/ - job->user_paused = false; - if (job->driver->user_resume) { - job->driver->user_resume(job); - } - assert(job->pause_count > 0); - job->pause_count--; - } - job->cancelled = true; - /* To prevent 'force == false' overriding a previous 'force == true' */ - job->force_cancel |= force; -} - -static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock) -{ - AioContext *ctx; - Job *job, *next; - BlockJob *bjob; - int rc = 0; - - QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { - assert(is_block_job(job)); - bjob = container_of(job, BlockJob, job); - - if (lock) { - ctx = job->aio_context; - aio_context_acquire(ctx); - } - rc = fn(bjob); - if (lock) { - aio_context_release(ctx); - } - if (rc) { - break; - } - } - return rc; -} - -static void block_job_completed_txn_abort(BlockJob *job) -{ - AioContext *ctx; - JobTxn *txn = job->txn; - Job *other_job; - - if (txn->aborting) { - /* - * We are cancelled by another job, which will handle everything. - */ - return; - } - txn->aborting = true; - block_job_txn_ref(txn); - - /* We are the first failed job. Cancel other jobs. */ - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { - ctx = other_job->aio_context; - aio_context_acquire(ctx); - } - - /* Other jobs are effectively cancelled by us, set the status for - * them; this job, however, may or may not be cancelled, depending - * on the caller, so leave it. 
*/ - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { - if (other_job != &job->job) { - job_cancel_async(other_job, false); - } - } - while (!QLIST_EMPTY(&txn->jobs)) { - other_job = QLIST_FIRST(&txn->jobs); - ctx = other_job->aio_context; - if (!job_is_completed(other_job)) { - assert(job_is_cancelled(other_job)); - job_finish_sync(other_job, NULL, NULL); - } - job_finalize_single(other_job); - aio_context_release(ctx); - } - - block_job_txn_unref(txn); -} - -static int block_job_needs_finalize(BlockJob *job) -{ - return !job->job.auto_finalize; -} - -static int block_job_finalize_single(BlockJob *job) -{ - return job_finalize_single(&job->job); -} - -static void block_job_do_finalize(BlockJob *job) -{ - int rc; - assert(job && job->txn); - - /* prepare the transaction to complete */ - rc = block_job_txn_apply(job->txn, block_job_prepare, true); - if (rc) { - block_job_completed_txn_abort(job); - } else { - block_job_txn_apply(job->txn, block_job_finalize_single, true); - } -} - -static int block_job_transition_to_pending(BlockJob *job) -{ - job_state_transition(&job->job, JOB_STATUS_PENDING); - if (!job->job.auto_finalize) { - job_event_pending(&job->job); - } - return 0; -} - -static void block_job_completed_txn_success(BlockJob *job) -{ - JobTxn *txn = job->txn; - Job *other_job; - - job_state_transition(&job->job, JOB_STATUS_WAITING); - - /* - * Successful completion, see if there are other running jobs in this - * txn. 
- */ - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { - if (!job_is_completed(other_job)) { - return; - } - assert(other_job->ret == 0); - } - - block_job_txn_apply(txn, block_job_transition_to_pending, false); - - /* If no jobs need manual finalization, automatically do so */ - if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) { - block_job_do_finalize(job); - } -} - /* Assumes the block_job_mutex is held */ static bool job_timer_pending(Job *job) { @@ -452,15 +243,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) return ratelimit_calculate_delay(&job->limit, n); } -void block_job_finalize(BlockJob *job, Error **errp) -{ - assert(job && job->job.id); - if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) { - return; - } - block_job_do_finalize(job); -} - void block_job_dismiss(BlockJob **jobptr, Error **errp) { BlockJob *job = *jobptr; @@ -484,7 +266,7 @@ void block_job_cancel(BlockJob *job, bool force) if (!job_started(&job->job)) { block_job_completed(job, -ECANCELED); } else if (job->job.deferred_to_main_loop) { - block_job_completed_txn_abort(job); + job_completed_txn_abort(&job->job); } else { block_job_enter(job); } @@ -657,7 +439,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, return NULL; } - job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk), + job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk), flags, cb, opaque, errp); if (job == NULL) { blk_unref(blk); @@ -705,30 +487,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, } } - /* Single jobs are modeled as single-job transactions for sake of - * consolidating the job management logic */ - if (!txn) { - txn = block_job_txn_new(); - block_job_txn_add_job(txn, job); - block_job_txn_unref(txn); - } else { - block_job_txn_add_job(txn, job); - } - return job; } void block_job_completed(BlockJob *job, int ret) { - assert(job && job->txn && 
!job_is_completed(&job->job)); + assert(job && job->job.txn && !job_is_completed(&job->job)); assert(blk_bs(job->blk)->job == job); job->job.ret = ret; job_update_rc(&job->job); trace_block_job_completed(job, ret, job->job.ret); if (job->job.ret) { - block_job_completed_txn_abort(job); + job_completed_txn_abort(&job->job); } else { - block_job_completed_txn_success(job); + job_completed_txn_success(&job->job); } } diff --git a/job.c b/job.c index 49dce57c9e..2d782859ac 100644 --- a/job.c +++ b/job.c @@ -60,6 +60,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = { [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0}, }; +/* Transactional group of jobs */ +struct JobTxn { + + /* Is this txn being cancelled? */ + bool aborting; + + /* List of jobs */ + QLIST_HEAD(, Job) jobs; + + /* Reference count */ + int refcnt; +}; + /* Right now, this mutex is only needed to synchronize accesses to job->busy * and job->sleep_timer, such as concurrent calls to job_do_yield and * job_enter. */ @@ -80,6 +93,71 @@ static void __attribute__((__constructor__)) job_init(void) qemu_mutex_init(&job_mutex); } +JobTxn *job_txn_new(void) +{ + JobTxn *txn = g_new0(JobTxn, 1); + QLIST_INIT(&txn->jobs); + txn->refcnt = 1; + return txn; +} + +static void job_txn_ref(JobTxn *txn) +{ + txn->refcnt++; +} + +void job_txn_unref(JobTxn *txn) +{ + if (txn && --txn->refcnt == 0) { + g_free(txn); + } +} + +void job_txn_add_job(JobTxn *txn, Job *job) +{ + if (!txn) { + return; + } + + assert(!job->txn); + job->txn = txn; + + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list); + job_txn_ref(txn); +} + +static void job_txn_del_job(Job *job) +{ + if (job->txn) { + QLIST_REMOVE(job, txn_list); + job_txn_unref(job->txn); + job->txn = NULL; + } +} + +static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock) +{ + AioContext *ctx; + Job *job, *next; + int rc = 0; + + QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { + if (lock) { + ctx = job->aio_context; + aio_context_acquire(ctx); + } + rc = 
fn(job); + if (lock) { + aio_context_release(ctx); + } + if (rc) { + break; + } + } + return rc; +} + + /* TODO Make static once the whole state machine is in job.c */ void job_state_transition(Job *job, JobStatus s1) { @@ -181,8 +259,9 @@ static void job_sleep_timer_cb(void *opaque) job_enter(job); } -void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, - int flags, BlockCompletionFunc *cb, void *opaque, Error **errp) +void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn, + AioContext *ctx, int flags, BlockCompletionFunc *cb, + void *opaque, Error **errp) { Job *job; @@ -228,6 +307,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, QLIST_INSERT_HEAD(&jobs, job, job_list); + /* Single jobs are modeled as single-job transactions for sake of + * consolidating the job management logic */ + if (!txn) { + txn = job_txn_new(); + job_txn_add_job(txn, job); + job_txn_unref(txn); + } else { + job_txn_add_job(txn, job); + } + return job; } @@ -241,6 +330,7 @@ void job_unref(Job *job) if (--job->refcnt == 0) { assert(job->status == JOB_STATUS_NULL); assert(!timer_pending(&job->sleep_timer)); + assert(!job->txn); if (job->driver->free) { job->driver->free(job); @@ -263,9 +353,10 @@ void job_event_completed(Job *job) notifier_list_notify(&job->on_finalize_completed, job); } -void job_event_pending(Job *job) +static int job_event_pending(Job *job) { notifier_list_notify(&job->on_pending, job); + return 0; } void job_enter_cond(Job *job, bool(*fn)(Job *job)) @@ -461,8 +552,7 @@ void job_do_dismiss(Job *job) job->paused = false; job->deferred_to_main_loop = true; - /* TODO Don't assume it's a BlockJob */ - block_job_txn_del_job((BlockJob*) job); + job_txn_del_job(job); job_state_transition(job, JOB_STATUS_NULL); job_unref(job); @@ -515,7 +605,7 @@ static void job_clean(Job *job) } } -int job_finalize_single(Job *job) +static int job_finalize_single(Job *job) { assert(job_is_completed(job)); @@ -542,12 
+632,141 @@ int job_finalize_single(Job *job) } } - /* TODO Don't assume it's a BlockJob */ - block_job_txn_del_job((BlockJob*) job); + job_txn_del_job(job); job_conclude(job); return 0; } +void job_cancel_async(Job *job, bool force) +{ + if (job->user_paused) { + /* Do not call job_enter here, the caller will handle it. */ + job->user_paused = false; + if (job->driver->user_resume) { + job->driver->user_resume(job); + } + assert(job->pause_count > 0); + job->pause_count--; + } + job->cancelled = true; + /* To prevent 'force == false' overriding a previous 'force == true' */ + job->force_cancel |= force; +} + +void job_completed_txn_abort(Job *job) +{ + AioContext *ctx; + JobTxn *txn = job->txn; + Job *other_job; + + if (txn->aborting) { + /* + * We are cancelled by another job, which will handle everything. + */ + return; + } + txn->aborting = true; + job_txn_ref(txn); + + /* We are the first failed job. Cancel other jobs. */ + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { + ctx = other_job->aio_context; + aio_context_acquire(ctx); + } + + /* Other jobs are effectively cancelled by us, set the status for + * them; this job, however, may or may not be cancelled, depending + * on the caller, so leave it. 
*/ + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { + if (other_job != job) { + job_cancel_async(other_job, false); + } + } + while (!QLIST_EMPTY(&txn->jobs)) { + other_job = QLIST_FIRST(&txn->jobs); + ctx = other_job->aio_context; + if (!job_is_completed(other_job)) { + assert(job_is_cancelled(other_job)); + job_finish_sync(other_job, NULL, NULL); + } + job_finalize_single(other_job); + aio_context_release(ctx); + } + + job_txn_unref(txn); +} + +static int job_prepare(Job *job) +{ + if (job->ret == 0 && job->driver->prepare) { + job->ret = job->driver->prepare(job); + } + return job->ret; +} + +static int job_needs_finalize(Job *job) +{ + return !job->auto_finalize; +} + +static void job_do_finalize(Job *job) +{ + int rc; + assert(job && job->txn); + + /* prepare the transaction to complete */ + rc = job_txn_apply(job->txn, job_prepare, true); + if (rc) { + job_completed_txn_abort(job); + } else { + job_txn_apply(job->txn, job_finalize_single, true); + } +} + +void job_finalize(Job *job, Error **errp) +{ + assert(job && job->id); + if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) { + return; + } + job_do_finalize(job); +} + +static int job_transition_to_pending(Job *job) +{ + job_state_transition(job, JOB_STATUS_PENDING); + if (!job->auto_finalize) { + job_event_pending(job); + } + return 0; +} + +void job_completed_txn_success(Job *job) +{ + JobTxn *txn = job->txn; + Job *other_job; + + job_state_transition(job, JOB_STATUS_WAITING); + + /* + * Successful completion, see if there are other running jobs in this + * txn. 
+ */ + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { + if (!job_is_completed(other_job)) { + return; + } + assert(other_job->ret == 0); + } + + job_txn_apply(txn, job_transition_to_pending, false); + + /* If no jobs need manual finalization, automatically do so */ + if (job_txn_apply(txn, job_needs_finalize, false) == 0) { + job_do_finalize(job); + } +} + void job_complete(Job *job, Error **errp) { /* Should not be reachable via external interface for internal jobs */ diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c index ec5d592b68..6ee31d59ad 100644 --- a/tests/test-blockjob-txn.c +++ b/tests/test-blockjob-txn.c @@ -125,7 +125,7 @@ static void test_single_job(int expected) JobTxn *txn; int result = -EINPROGRESS; - txn = block_job_txn_new(); + txn = job_txn_new(); job = test_block_job_start(1, true, expected, &result, txn); job_start(&job->job); @@ -138,7 +138,7 @@ static void test_single_job(int expected) } g_assert_cmpint(result, ==, expected); - block_job_txn_unref(txn); + job_txn_unref(txn); } static void test_single_job_success(void) @@ -164,7 +164,7 @@ static void test_pair_jobs(int expected1, int expected2) int result1 = -EINPROGRESS; int result2 = -EINPROGRESS; - txn = block_job_txn_new(); + txn = job_txn_new(); job1 = test_block_job_start(1, true, expected1, &result1, txn); job2 = test_block_job_start(2, true, expected2, &result2, txn); job_start(&job1->job); @@ -173,7 +173,7 @@ static void test_pair_jobs(int expected1, int expected2) /* Release our reference now to trigger as many nice * use-after-free bugs as possible. 
*/ - block_job_txn_unref(txn); + job_txn_unref(txn); if (expected1 == -ECANCELED) { block_job_cancel(job1, false); @@ -226,7 +226,7 @@ static void test_pair_jobs_fail_cancel_race(void) int result1 = -EINPROGRESS; int result2 = -EINPROGRESS; - txn = block_job_txn_new(); + txn = job_txn_new(); job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn); job2 = test_block_job_start(2, false, 0, &result2, txn); job_start(&job1->job); @@ -247,7 +247,7 @@ static void test_pair_jobs_fail_cancel_race(void) g_assert_cmpint(result1, ==, -ECANCELED); g_assert_cmpint(result2, ==, -ECANCELED); - block_job_txn_unref(txn); + job_txn_unref(txn); } int main(int argc, char **argv) diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c index e44c608327..1e052c2e9c 100644 --- a/tests/test-blockjob.c +++ b/tests/test-blockjob.c @@ -364,7 +364,7 @@ static void test_cancel_concluded(void) } assert(job->job.status == JOB_STATUS_PENDING); - block_job_finalize(job, &error_abort); + job_finalize(&job->job, &error_abort); assert(job->job.status == JOB_STATUS_CONCLUDED); cancel_common(s); -- 2.13.6