Skip to content

Commit

Permalink
for AP rebalance, store partition version and create/drop partition t…
Browse files Browse the repository at this point in the history
…ree together under partition lock.

(We'll now use the same rebalance "group lock" method as we do for SC.)
  • Loading branch information
gooding470 committed Aug 17, 2018
1 parent ee5446b commit c81c659
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 97 deletions.
6 changes: 6 additions & 0 deletions as/include/fabric/partition_balance.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ typedef struct inter_hash_s {

extern const as_partition_version ZERO_VERSION;

#define REBALANCE_FLUSH_SIZE 4096
#define PMETA_SIZE 16 // sizeof(ssd_common_pmeta) without including drv_ssd.h

#define PIDS_PER_GROUP (REBALANCE_FLUSH_SIZE / PMETA_SIZE) // 256
#define NUM_PID_GROUPS (AS_PARTITIONS / PIDS_PER_GROUP) // 16


//------------------------------------------------
// Globals.
Expand Down
207 changes: 110 additions & 97 deletions as/src/fabric/partition_balance.c
Original file line number Diff line number Diff line change
Expand Up @@ -745,143 +745,156 @@ balance_namespace_ap(as_namespace* ns, cf_queue* mq)

uint32_t ns_fresh_partitions = 0;

for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
as_partition* p = &ns->partitions[pid];
for (uint32_t pid_group = 0; pid_group < NUM_PID_GROUPS; pid_group++) {
uint32_t start_pid = pid_group * PIDS_PER_GROUP;
uint32_t end_pid = start_pid + PIDS_PER_GROUP;

cf_node* full_node_seq = &FULL_NODE_SEQ(pid, 0);
sl_ix_t* full_sl_ix = &FULL_SL_IX(pid, 0);
for (uint32_t pid = start_pid; pid < end_pid; pid++) {
as_partition* p = &ns->partitions[pid];

// Usually a namespace can simply use the global tables...
cf_node* ns_node_seq = full_node_seq;
sl_ix_t* ns_sl_ix = full_sl_ix;
cf_node* full_node_seq = &FULL_NODE_SEQ(pid, 0);
sl_ix_t* full_sl_ix = &FULL_SL_IX(pid, 0);

cf_node stack_node_seq[ns_not_equal_global ? ns->cluster_size : 0];
sl_ix_t stack_sl_ix[ns_not_equal_global ? ns->cluster_size : 0];
// Usually a namespace can simply use the global tables...
cf_node* ns_node_seq = full_node_seq;
sl_ix_t* ns_sl_ix = full_sl_ix;

// ... but sometimes a namespace is different.
if (ns_not_equal_global) {
ns_node_seq = stack_node_seq;
ns_sl_ix = stack_sl_ix;
cf_node stack_node_seq[ns_not_equal_global ? ns->cluster_size : 0];
sl_ix_t stack_sl_ix[ns_not_equal_global ? ns->cluster_size : 0];

fill_namespace_rows(full_node_seq, full_sl_ix, ns_node_seq,
ns_sl_ix, ns, translation);
// ... but sometimes a namespace is different.
if (ns_not_equal_global) {
ns_node_seq = stack_node_seq;
ns_sl_ix = stack_sl_ix;

if (ns->prefer_uniform_balance) {
uniform_adjust_row(ns_node_seq, ns->cluster_size, ns_sl_ix,
ns->replication_factor, claims, target_claims,
ns->rack_ids, n_racks);
}
else if (n_racks != 1) {
rack_aware_adjust_row(ns_node_seq, ns_sl_ix,
ns->replication_factor, ns->rack_ids, ns->cluster_size,
n_racks, 1);
fill_namespace_rows(full_node_seq, full_sl_ix, ns_node_seq,
ns_sl_ix, ns, translation);

if (ns->prefer_uniform_balance) {
uniform_adjust_row(ns_node_seq, ns->cluster_size, ns_sl_ix,
ns->replication_factor, claims, target_claims,
ns->rack_ids, n_racks);
}
else if (n_racks != 1) {
rack_aware_adjust_row(ns_node_seq, ns_sl_ix,
ns->replication_factor, ns->rack_ids,
ns->cluster_size, n_racks, 1);
}
}
}

cf_mutex_lock(&p->lock);
cf_mutex_lock(&p->lock);

p->working_master = (cf_node)0;
p->working_master = (cf_node)0;

p->n_replicas = ns->replication_factor;
memcpy(p->replicas, ns_node_seq, p->n_replicas * sizeof(cf_node));
p->n_replicas = ns->replication_factor;
memcpy(p->replicas, ns_node_seq, p->n_replicas * sizeof(cf_node));

p->n_dupl = 0;
p->n_dupl = 0;

p->pending_emigrations = 0;
p->pending_immigrations = 0;

p->pending_emigrations = 0;
p->pending_immigrations = 0;
p->n_witnesses = 0;

p->n_witnesses = 0;
uint32_t self_n = find_self(ns_node_seq, ns);

uint32_t self_n = find_self(ns_node_seq, ns);
as_partition_version final_version = {
.ckey = as_exchange_cluster_key(),
.master = self_n == 0 ? 1 : 0
};

as_partition_version final_version = {
.ckey = as_exchange_cluster_key(),
.master = self_n == 0 ? 1 : 0
};
p->final_version = final_version;

p->final_version = final_version;
int working_master_n = find_working_master_ap(p, ns_sl_ix, ns);

int working_master_n = find_working_master_ap(p, ns_sl_ix, ns);
uint32_t n_dupl = 0;
cf_node dupls[ns->cluster_size];

uint32_t n_dupl = 0;
cf_node dupls[ns->cluster_size];
as_partition_version orig_version = p->version;

as_partition_version orig_version = p->version;
// TEMPORARY debugging.
uint32_t debug_n_immigrators = 0;

// TEMPORARY debugging.
uint32_t debug_n_immigrators = 0;
if (working_master_n == -1) {
// No existing versions - assign fresh version to replicas.
working_master_n = 0;

if (working_master_n == -1) {
// No existing versions - assign fresh version to replicas.
working_master_n = 0;
if (self_n < p->n_replicas) {
p->version = p->final_version;
}

if (self_n < p->n_replicas) {
p->version = p->final_version;
ns_fresh_partitions++;
}
else {
n_dupl = find_duplicates_ap(p, ns_node_seq, ns_sl_ix, ns,
(uint32_t)working_master_n, dupls);

ns_fresh_partitions++;
}
else {
n_dupl = find_duplicates_ap(p, ns_node_seq, ns_sl_ix, ns,
(uint32_t)working_master_n, dupls);
uint32_t n_immigrators = fill_immigrators(p, ns_sl_ix, ns,
(uint32_t)working_master_n, n_dupl);

uint32_t n_immigrators = fill_immigrators(p, ns_sl_ix, ns,
(uint32_t)working_master_n, n_dupl);
// TEMPORARY debugging.
debug_n_immigrators = n_immigrators;

// TEMPORARY debugging.
debug_n_immigrators = n_immigrators;
if (n_immigrators != 0) {
// Migrations required - advance versions for next
// rebalance, queue migrations for this rebalance.

if (n_immigrators != 0) {
// Migrations required - advance versions for next rebalance,
// queue migrations for this rebalance.
advance_version_ap(p, ns_sl_ix, ns, self_n,
(uint32_t)working_master_n, n_dupl, dupls);

advance_version_ap(p, ns_sl_ix, ns, self_n,
(uint32_t)working_master_n, n_dupl, dupls);
queue_namespace_migrations(p, ns, self_n,
ns_node_seq[working_master_n], n_dupl, dupls, mq);

queue_namespace_migrations(p, ns, self_n,
ns_node_seq[working_master_n], n_dupl, dupls, mq);

if (self_n == 0) {
fill_witnesses(p, ns_node_seq, ns_sl_ix, ns);
ns_pending_signals += p->n_witnesses;
if (self_n == 0) {
fill_witnesses(p, ns_node_seq, ns_sl_ix, ns);
ns_pending_signals += p->n_witnesses;
}
}
else if (self_n < p->n_replicas) {
// No migrations required - refresh replicas' versions (only
// truly necessary if replication factor decreased).
p->version = p->final_version;
}
else {
// No migrations required - drop superfluous non-replica
// partitions immediately.
p->version = ZERO_VERSION;
}
}
else if (self_n < p->n_replicas) {
// No migrations required - refresh replicas' versions (only
// truly necessary if replication factor decreased).
p->version = p->final_version;
}
else {
// No migrations required - drop superfluous non-replica
// partitions immediately.
p->version = ZERO_VERSION;

if (self_n == 0 || self_n == working_master_n) {
p->working_master = ns_node_seq[working_master_n];
}
}

if (self_n == 0 || self_n == working_master_n) {
p->working_master = ns_node_seq[working_master_n];
}
handle_version_change(p, ns, &orig_version);

handle_version_change(p, ns, &orig_version);
ns_pending_immigrations += (uint32_t)p->pending_immigrations;
ns_pending_emigrations += (uint32_t)p->pending_emigrations;

ns_pending_immigrations += (uint32_t)p->pending_immigrations;
ns_pending_emigrations += (uint32_t)p->pending_emigrations;
// TEMPORARY debugging.
if (pid < 20) {
cf_debug(AS_PARTITION, "ck%012lX %02u (%d %d) %s -> %s - self_n %u wm_n %d repls %u dupls %u immigrators %u",
as_exchange_cluster_key(), pid, p->pending_emigrations,
p->pending_immigrations,
VERSION_AS_STRING(&orig_version),
VERSION_AS_STRING(&p->version), self_n,
working_master_n, p->n_replicas, n_dupl,
debug_n_immigrators);
}

// TEMPORARY debugging.
if (pid < 20) {
cf_debug(AS_PARTITION, "ck%012lX %02u (%d %d) %s -> %s - self_n %u wm_n %d repls %u dupls %u immigrators %u",
as_exchange_cluster_key(), pid, p->pending_emigrations,
p->pending_immigrations, VERSION_AS_STRING(&orig_version),
VERSION_AS_STRING(&p->version), self_n, working_master_n,
p->n_replicas, n_dupl, debug_n_immigrators);
client_replica_maps_update(ns, pid);
}

client_replica_maps_update(ns, pid);
// Flush partition metadata for this group of partitions ...
as_storage_flush_pmeta(ns, start_pid, PIDS_PER_GROUP);

cf_mutex_unlock(&p->lock);
}
// ... and unlock the group.
for (uint32_t pid = start_pid; pid < end_pid; pid++) {
as_partition* p = &ns->partitions[pid];

as_storage_flush_pmeta(ns, 0, AS_PARTITIONS);
cf_mutex_unlock(&p->lock);
}
}

cf_info(AS_PARTITION, "{%s} rebalanced: expected-migrations (%u,%u) expected-signals %u fresh-partitions %u",
ns->name, ns_pending_emigrations, ns_pending_immigrations,
Expand Down

0 comments on commit c81c659

Please sign in to comment.