Skip to content

Commit

Permalink
Merge pull request #494 from bigcapitalhq/BIG-192
Browse files Browse the repository at this point in the history
fix: Concurrency controlling multiple processes in Bigcapital CLI commands
  • Loading branch information
abouolia authored Jun 9, 2024
2 parents 858f347 + f7fcfef commit 4d4ef54
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 28 deletions.
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"@casl/ability": "^5.4.3",
"@hapi/boom": "^7.4.3",
"@lemonsqueezy/lemonsqueezy.js": "^2.2.0",
"@supercharge/promise-pool": "^3.2.0",
"@types/express": "^4.17.21",
"@types/i18n": "^0.8.7",
"@types/knex": "^0.16.1",
Expand Down
67 changes: 39 additions & 28 deletions packages/server/src/commands/bigcapital.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import color from 'colorette';
import argv from 'getopts';
import Knex from 'knex';
import { knexSnakeCaseMappers } from 'objection';
import { PromisePool } from '@supercharge/promise-pool';
import '../before';
import config from '../config';

Expand All @@ -28,7 +29,7 @@ function initSystemKnex() {
});
}

function initTenantKnex(organizationId) {
function initTenantKnex(organizationId: string = '') {
return Knex({
client: config.tenant.db_client,
connection: {
Expand Down Expand Up @@ -71,10 +72,12 @@ function getAllSystemTenants(knex) {
return knex('tenants');
}

function getAllInitializedSystemTenants(knex) {
function getAllInitializedTenants(knex) {
return knex('tenants').whereNotNull('initializedAt');
}

const MIGRATION_CONCURRENCY = 10;

// module.exports = {
// log,
// success,
Expand All @@ -91,6 +94,7 @@ function getAllInitializedSystemTenants(knex) {
// - bigcapital tenants:migrate:make
// - bigcapital system:migrate:make
// - bigcapital tenants:list
// - bigcapital tenants:list --all

commander
.command('system:migrate:rollback')
Expand Down Expand Up @@ -149,10 +153,13 @@ commander
commander
.command('tenants:list')
.description('Retrieve a list of all system tenants databases.')
.option('-a, --all', 'All tenants even are not initialized.')
.action(async (cmd) => {
try {
const sysKnex = await initSystemKnex();
const tenants = await getAllSystemTenants(sysKnex);
const tenants = cmd?.all
? await getAllSystemTenants(sysKnex)
: await getAllInitializedTenants(sysKnex);

tenants.forEach((tenant) => {
const dbName = `${config.tenant.db_name_prefix}${tenant.organizationId}`;
Expand Down Expand Up @@ -183,18 +190,20 @@ commander
commander
.command('tenants:migrate:latest')
.description('Migrate all tenants or the given tenant id.')
.option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.')
.option(
'-t, --tenant_id [tenant_id]',
'Which organization id do you migrate.'
)
.action(async (cmd) => {
try {
const sysKnex = await initSystemKnex();
const tenants = await getAllInitializedSystemTenants(sysKnex);
const tenants = await getAllInitializedTenants(sysKnex);
const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId);

if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) {
exit(`The given tenant id ${cmd.tenant_id} is not exists.`);
}
// Validate the tenant id exist first of all.
const migrateOpers = [];
const migrateTenant = async (organizationId) => {
try {
const tenantKnex = await initTenantKnex(organizationId);
Expand All @@ -216,17 +225,17 @@ commander
}
};
if (!cmd.tenant_id) {
tenants.forEach((tenant) => {
const oper = migrateTenant(tenant.organizationId);
migrateOpers.push(oper);
});
await PromisePool.withConcurrency(MIGRATION_CONCURRENCY)
.for(tenants)
.process((tenant, index, pool) => {
return migrateTenant(tenant.organizationId);
})
.then(() => {
success('All tenants are migrated.');
});
} else {
const oper = migrateTenant(cmd.tenant_id);
migrateOpers.push(oper);
await migrateTenant(cmd.tenant_id);
}
Promise.all(migrateOpers).then(() => {
success('All tenants are migrated.');
});
} catch (error) {
exit(error);
}
Expand All @@ -235,19 +244,21 @@ commander
commander
.command('tenants:migrate:rollback')
.description('Rollback the last batch of tenants migrations.')
.option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.')
.option(
'-t, --tenant_id [tenant_id]',
'Which organization id do you migrate.'
)
.action(async (cmd) => {
try {
const sysKnex = await initSystemKnex();
const tenants = await getAllSystemTenants(sysKnex);
const tenants = await getAllInitializedTenants(sysKnex);
const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId);

if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) {
exit(`The given tenant id ${cmd.tenant_id} is not exists.`);
}

const migrateOpers = [];
const migrateTenant = async (organizationId) => {
const migrateTenant = async (organizationId: string) => {
try {
const tenantKnex = await initTenantKnex(organizationId);
const [batchNo, _log] = await tenantKnex.migrate.rollback();
Expand All @@ -268,17 +279,17 @@ commander
};

if (!cmd.tenant_id) {
tenants.forEach((tenant) => {
const oper = migrateTenant(tenant.organizationId);
migrateOpers.push(oper);
});
await PromisePool.withConcurrency(MIGRATION_CONCURRENCY)
.for(tenants)
.process((tenant, index, pool) => {
return migrateTenant(tenant.organizationId);
})
.then(() => {
success('All tenants are rollbacked.');
});
} else {
const oper = migrateTenant(cmd.tenant_id);
migrateOpers.push(oper);
await migrateTenant(cmd.tenant_id);
}
Promise.all(migrateOpers).then(() => {
success('All tenants are rollbacked.');
});
} catch (error) {
exit(error);
}
Expand Down
10 changes: 10 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4d4ef54

Please sign in to comment.