Skip to content

Commit

Permalink
block: Add block job transactions
Browse files Browse the repository at this point in the history
Sometimes block jobs must execute as a transaction group.  Finishing
jobs wait until all other jobs are ready to complete successfully.
Failure or cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi <[email protected]>
Reviewed-by: Max Reitz <[email protected]>
Signed-off-by: Fam Zheng <[email protected]>
Signed-off-by: John Snow <[email protected]>
Message-id: [email protected]
[Rewrite the implementation which is now contained in block_job_completed.
--Fam]
Signed-off-by: Fam Zheng <[email protected]>
Reviewed-by: Max Reitz <[email protected]>

Signed-off-by: John Snow <[email protected]>
Signed-off-by: Stefan Hajnoczi <[email protected]>
Signed-off-by: Kevin Wolf <[email protected]>
  • Loading branch information
Fam Zheng authored and kevmw committed Nov 12, 2015
1 parent 94db6d2 commit c55a832
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 2 deletions.
135 changes: 133 additions & 2 deletions blockjob.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@
#include "qemu/timer.h"
#include "qapi-event.h"

/* Transactional group of block jobs */
struct BlockJobTxn {

/* Is this txn being cancelled? */
bool aborting;

/* List of jobs */
QLIST_HEAD(, BlockJob) jobs;

/* Reference count */
int refcnt;
};

void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
int64_t speed, BlockCompletionFunc *cb,
void *opaque, Error **errp)
Expand Down Expand Up @@ -94,6 +107,86 @@ void block_job_unref(BlockJob *job)
}
}

static void block_job_completed_single(BlockJob *job)
{
if (!job->ret) {
if (job->driver->commit) {
job->driver->commit(job);
}
} else {
if (job->driver->abort) {
job->driver->abort(job);
}
}
job->cb(job->opaque, job->ret);
if (job->txn) {
block_job_txn_unref(job->txn);
}
block_job_unref(job);
}

static void block_job_completed_txn_abort(BlockJob *job)
{
AioContext *ctx;
BlockJobTxn *txn = job->txn;
BlockJob *other_job, *next;

if (txn->aborting) {
/*
* We are cancelled by another job, which will handle everything.
*/
return;
}
txn->aborting = true;
/* We are the first failed job. Cancel other jobs. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ctx = bdrv_get_aio_context(other_job->bs);
aio_context_acquire(ctx);
}
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job == job || other_job->completed) {
/* Other jobs are "effectively" cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
if (other_job != job) {
other_job->cancelled = true;
}
continue;
}
block_job_cancel_sync(other_job);
assert(other_job->completed);
}
QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
ctx = bdrv_get_aio_context(other_job->bs);
block_job_completed_single(other_job);
aio_context_release(ctx);
}
}

static void block_job_completed_txn_success(BlockJob *job)
{
AioContext *ctx;
BlockJobTxn *txn = job->txn;
BlockJob *other_job, *next;
/*
* Successful completion, see if there are other running jobs in this
* txn.
*/
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (!other_job->completed) {
return;
}
}
/* We are the last completed job, commit the transaction. */
QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
ctx = bdrv_get_aio_context(other_job->bs);
aio_context_acquire(ctx);
assert(other_job->ret == 0);
block_job_completed_single(other_job);
aio_context_release(ctx);
}
}

void block_job_completed(BlockJob *job, int ret)
{
BlockDriverState *bs = job->bs;
Expand All @@ -102,8 +195,13 @@ void block_job_completed(BlockJob *job, int ret)
assert(!job->completed);
job->completed = true;
job->ret = ret;
job->cb(job->opaque, ret);
block_job_unref(job);
if (!job->txn) {
block_job_completed_single(job);
} else if (ret < 0 || block_job_is_cancelled(job)) {
block_job_completed_txn_abort(job);
} else {
block_job_completed_txn_success(job);
}
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
Expand Down Expand Up @@ -402,3 +500,36 @@ void block_job_defer_to_main_loop(BlockJob *job,

qemu_bh_schedule(data->bh);
}

BlockJobTxn *block_job_txn_new(void)
{
BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
QLIST_INIT(&txn->jobs);
txn->refcnt = 1;
return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
if (txn && --txn->refcnt == 0) {
g_free(txn);
}
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
if (!txn) {
return;
}

assert(!job->txn);
job->txn = txn;

QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
block_job_txn_ref(txn);
}
1 change: 1 addition & 0 deletions include/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ typedef struct BlockDriver BlockDriver;
typedef struct BlockJob BlockJob;
typedef struct BdrvChild BdrvChild;
typedef struct BdrvChildRole BdrvChildRole;
typedef struct BlockJobTxn BlockJobTxn;

typedef struct BlockDriverInfo {
/* in bytes, 0 if irrelevant */
Expand Down
38 changes: 38 additions & 0 deletions include/block/blockjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ struct BlockJob {
*/
int ret;

/** Non-NULL if this job is part of a transaction */
BlockJobTxn *txn;
QLIST_ENTRY(BlockJob) txn_list;
};

/**
Expand Down Expand Up @@ -405,4 +408,39 @@ void block_job_defer_to_main_loop(BlockJob *job,
BlockJobDeferToMainLoopFn *fn,
void *opaque);

/**
* block_job_txn_new:
*
* Allocate and return a new block job transaction. Jobs can be added to the
* transaction using block_job_txn_add_job().
*
* The transaction is automatically freed when the last job completes or is
* cancelled.
*
* All jobs in the transaction either complete successfully or fail/cancel as a
* group. Jobs wait for each other before completing. Cancelling one job
* cancels all jobs in the transaction.
*/
BlockJobTxn *block_job_txn_new(void);

/**
* block_job_txn_unref:
*
* Release a reference that was previously acquired with block_job_txn_add_job
* or block_job_txn_new. If it's the last reference to the object, it will be
* freed.
*/
void block_job_txn_unref(BlockJobTxn *txn);

/**
* block_job_txn_add_job:
* @txn: The transaction (may be NULL)
* @job: Job to add to the transaction
*
* Add @job to the transaction. The @job must not already be in a transaction.
* The caller must call either block_job_txn_unref() or block_job_completed()
* to release the reference that is automatically grabbed here.
*/
void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);

#endif

0 comments on commit c55a832

Please sign in to comment.