Skip to content

Commit

Permalink
[backend] Enhance logging of migrations (#9732)
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelHassine authored Jan 27, 2025
1 parent 1948fa5 commit 6434431
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 21 deletions.
10 changes: 10 additions & 0 deletions opencti-platform/opencti-graphql/src/config/conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ if (appLogConsoleTransport) {
appLogTransports.push(new winston.transports.Console());
}

const migrationLogger = winston.createLogger({
level: 'debug',
format: format.combine(timestamp(), format.errors({ stack: true }), format.json()),
transports: appLogTransports,
});

const appLogger = winston.createLogger({
level: appLogLevel,
format: format.combine(timestamp(), format.errors({ stack: true }), format.json()),
Expand Down Expand Up @@ -265,6 +271,10 @@ export const logS3Debug = {
},
};

export const logMigration = {
info: (message) => migrationLogger.log('info', message),
};

export const logApp = {
_log: (level, message, meta = {}) => {
if (appLogTransports.length > 0 && appLogger.isLevelEnabled(level)) {
Expand Down
18 changes: 11 additions & 7 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import {
waitInSec,
WRITE_PLATFORM_INDICES
} from './utils';
import conf, { booleanConf, extendedErrors, loadCert, logApp } from '../config/conf';
import conf, { booleanConf, extendedErrors, loadCert, logApp, logMigration } from '../config/conf';
import { ComplexSearchError, ConfigurationError, DatabaseError, EngineShardsError, FunctionalError, UnsupportedError } from '../config/errors';
import {
isStixRefRelationship,
Expand Down Expand Up @@ -236,11 +236,15 @@ export const UNIMPACTED_ENTITIES_ROLE = [
const LOCATED_AT_CLEANED = [ENTITY_TYPE_LOCATION_REGION, ENTITY_TYPE_LOCATION_COUNTRY];
const UNSUPPORTED_LOCATED_AT = [ENTITY_IPV4_ADDR, ENTITY_IPV6_ADDR, ENTITY_TYPE_LOCATION_CITY];
export const isSpecialNonImpactedCases = (relationshipType, fromType, toType, side) => {
// Rel on the "to" side with related-to from observable
// The relationship is a related-to from an observable to "something" (generally, it is an intrusion set, a malware, etc.)
// This is to avoid for instance Emotet having 200K related-to.
// As a consequence, no entities view on the observable side.
if (side === ROLE_TO && relationshipType === RELATION_RELATED_TO && isStixCyberObservable(fromType)) {
return true;
}
// Rel on the "to" side with located-at from IP / cities to region / country
// This relationship is a located-at from IPv4 / IPv6 / City to a country or a region
// This is to avoid having too big region entities
// As a consequence, no entities view in city / knowledge / regions,
if (side === ROLE_TO && relationshipType === RELATION_LOCATED_AT && UNSUPPORTED_LOCATED_AT.includes(fromType) && LOCATED_AT_CLEANED.includes(toType)) {
return true;
}
Expand Down Expand Up @@ -415,7 +419,7 @@ const elOperationForMigration = (operation) => {
const elGetTask = (taskId) => engine.tasks.get({ task_id: taskId }).then((r) => oebp(r));

return async (message, index, body) => {
logApp.info(`${message} > started`);
logMigration.info(`${message} > started`);
// Execute the update by query in async mode
const queryAsync = await operation({
...(index ? { index } : {}),
Expand All @@ -425,19 +429,19 @@ const elOperationForMigration = (operation) => {
}).catch((err) => {
throw DatabaseError('Async engine bulk migration fail', { migration: message, cause: err });
});
logApp.info(`${message} > elastic running task ${queryAsync.task}`);
logMigration.info(`${message} > elastic running task ${queryAsync.task}`);
// Wait 10 seconds for task to initialize
await waitInSec(10);
// Monitor the task until completion
let taskStatus = await elGetTask(queryAsync.task);
while (!taskStatus.completed) {
const { total, updated } = taskStatus.task.status;
logApp.info(`${message} > in progress - ${updated}/${total}`);
logMigration.info(`${message} > in progress - ${updated}/${total}`);
await waitInSec(5);
taskStatus = await elGetTask(queryAsync.task);
}
const timeSec = Math.round(taskStatus.task.running_time_in_nanos / 1e9);
logApp.info(`${message} > done in ${timeSec} seconds`);
logMigration.info(`${message} > done in ${timeSec} seconds`);
};
};

Expand Down
14 changes: 7 additions & 7 deletions opencti-platform/opencti-graphql/src/database/migration.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as R from 'ramda';
import { MigrationSet } from 'migrate';
import Migration from 'migrate/lib/migration';
import { logApp, PLATFORM_VERSION } from '../config/conf';
import { logApp, logMigration, PLATFORM_VERSION } from '../config/conf';
import { DatabaseError } from '../config/errors';
import { RELATION_MIGRATES } from '../schema/internalRelationship';
import { ENTITY_TYPE_MIGRATION_REFERENCE, ENTITY_TYPE_MIGRATION_STATUS } from '../schema/internalObject';
Expand Down Expand Up @@ -50,7 +50,7 @@ const migrationStorage = {
RELATION_MIGRATES,
ENTITY_TYPE_MIGRATION_REFERENCE
);
logApp.info(`[MIGRATION] Read ${dbMigrations.length} migrations from the database`);
logMigration.info(`[MIGRATION] Read ${dbMigrations.length} migrations from the database`);
const migrationStatus = {
lastRun: migration.lastRun,
internal_id: migration.internal_id,
Expand Down Expand Up @@ -80,7 +80,7 @@ const migrationStorage = {
// Attach the reference to the migration status.
const migrationRel = { fromId: migrationStatus.id, toId: migrationRef.id, relationship_type: RELATION_MIGRATES };
await createRelation(context, SYSTEM_USER, migrationRel);
logApp.info(`[MIGRATION] Saving current configuration, ${mig.title}`);
logMigration.info(`[MIGRATION] Saving current configuration, ${mig.title}`);
return fn();
} catch (err) {
logApp.error('Error handling migration', { cause: err });
Expand Down Expand Up @@ -108,16 +108,16 @@ export const applyMigration = (context) => {
/** Match the files migrations to the database migrations.
Plays migrations that does not have matching name / timestamp */
if (migrationToApply.length > 0) {
logApp.info(`[MIGRATION] ${migrationToApply.length} migrations will be executed`);
logMigration.info(`[MIGRATION] ${migrationToApply.length} migrations will be executed`);
} else {
logApp.info('[MIGRATION] Platform already up to date, nothing to migrate');
logMigration.info('[MIGRATION] Platform already up to date, nothing to migrate');
}
for (let index = 0; index < migrationToApply.length; index += 1) {
const migSet = migrationToApply[index];
const migration = new Migration(migSet.title, migSet.up, migSet.down);
const stateMigration = alreadyAppliedMigrations.get(migration.title);
if (stateMigration) {
logApp.info(`[MIGRATION] Replaying migration ${migration.title}`);
logMigration.info(`[MIGRATION] Replaying migration ${migration.title}`);
}
set.addMigration(migration);
}
Expand All @@ -128,7 +128,7 @@ export const applyMigration = (context) => {
reject(migrationError);
return;
}
logApp.info('[MIGRATION] Migration process completed');
logMigration.info('[MIGRATION] Migration process completed');
resolve(state);
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as R from 'ramda';
import { Promise } from 'bluebird';
import { logApp } from '../config/conf';
import { logMigration } from '../config/conf';
import { BULK_TIMEOUT, elBulk, elFindByIds, elList, elUpdateByQueryForMigration, ES_MAX_CONCURRENCY, MAX_BULK_OPERATIONS } from '../database/engine';
import { READ_INDEX_STIX_DOMAIN_OBJECTS } from '../database/utils';
import { executionContext, SYSTEM_USER } from '../utils/access';
Expand All @@ -21,16 +21,19 @@ import { ENTITY_IPV4_ADDR, ENTITY_IPV6_ADDR, isStixCyberObservable } from '../sc
const message = '[MIGRATION] Cleaning deprecated rels';

export const up = async (next) => {
logApp.info(`${message} > started`);
logMigration.info(`${message} > started`);
const context = executionContext('migration');

// 1st pass
// Cleaning all threats from any re_related-to related to observables
logMigration.info('[OPENCTI] Cleaning deprecated rels for observable related-to...');
const relKeyRelatedTo = 'rel_related-to.internal_id';
const bulkOperationsRelatedTo = [];
const startRelatedTo = new Date().getTime();
const clearRelatedToObservableRels = async (threats) => {
let currentProcessingThreats = 0;
for (let i = 0; i < threats.length; i += 1) {
logMigration.info(`[OPENCTI] Cleaning deprecated rels for related-to ${currentProcessingThreats} / ${threats.length}`);
const threat = threats[i];
const relatedToIds = threat[relKeyRelatedTo] ?? [];
const newIds = [];
Expand All @@ -54,6 +57,7 @@ export const up = async (next) => {
];
bulkOperationsRelatedTo.push(...updateQuery);
}
currentProcessingThreats += 1;
}
};
const threatTypes = [
Expand All @@ -72,19 +76,22 @@ export const up = async (next) => {
const concurrentUpdateRelatedTo = async (bulk) => {
await elBulk({ refresh: true, timeout: BULK_TIMEOUT, body: bulk });
currentProcessingRelatedTo += bulk.length;
logApp.info(`[OPENCTI] Cleaning deprecated rels for observable related-to ${currentProcessingRelatedTo} / ${bulkOperationsRelatedTo.length}`);
logMigration.info(`[OPENCTI] Cleaning deprecated rels for observable related-to ${currentProcessingRelatedTo} / ${bulkOperationsRelatedTo.length}`);
};
await Promise.map(groupsOfOperationsRelatedTo, concurrentUpdateRelatedTo, { concurrency: ES_MAX_CONCURRENCY });
logApp.info(`[MIGRATION] Cleaning deprecated rels for observable related-to done in ${new Date() - startRelatedTo} ms`);
logMigration.info(`[MIGRATION] Cleaning deprecated rels for observable related-to done in ${new Date() - startRelatedTo} ms`);

// 2nd pass
// Cleaning located-at when pointing a country or a region
logMigration.info('[OPENCTI] Cleaning deprecated rels for located-at...');
const relKeyLocatedAt = 'rel_located-at.internal_id';
const bulkOperationsLocatedAt = [];
const startLocatedAt = new Date().getTime();
const cleanTypes = [ENTITY_IPV4_ADDR, ENTITY_IPV6_ADDR, ENTITY_TYPE_LOCATION_CITY];
const clearLocatedAtRegionAndCountryRels = async (locations) => {
let currentProcessingLocations = 0;
for (let i = 0; i < locations.length; i += 1) {
logMigration.info(`[OPENCTI] Cleaning deprecated rels for located-at ${currentProcessingLocations} / ${locations.length}`);
const location = locations[i];
const locatedAtIds = location[relKeyLocatedAt] ?? [];
const newIds = [];
Expand All @@ -108,6 +115,7 @@ export const up = async (next) => {
];
bulkOperationsLocatedAt.push(...updateQuery);
}
currentProcessingLocations += 1;
}
};
const opts = { types: [ENTITY_TYPE_LOCATION_REGION, ENTITY_TYPE_LOCATION_COUNTRY], callback: clearLocatedAtRegionAndCountryRels };
Expand All @@ -118,13 +126,14 @@ export const up = async (next) => {
const concurrentUpdate = async (bulk) => {
await elBulk({ refresh: true, timeout: BULK_TIMEOUT, body: bulk });
currentProcessing += bulk.length;
logApp.info(`[OPENCTI] Cleaning deprecated rels for located-at to country / region ${currentProcessing} / ${bulkOperationsLocatedAt.length}`);
logMigration.info(`[OPENCTI] Cleaning deprecated rels for located-at to country / region ${currentProcessing} / ${bulkOperationsLocatedAt.length}`);
};
await Promise.map(groupsOfOperations, concurrentUpdate, { concurrency: ES_MAX_CONCURRENCY });
logApp.info(`[MIGRATION] Cleaning deprecated rels for located-at to country / region in ${new Date() - startLocatedAt} ms`);
logMigration.info(`[MIGRATION] Cleaning deprecated rels for located-at to country / region in ${new Date() - startLocatedAt} ms`);

// 3rd pass
// Cleaning all targets to countries, regions and sectors
logMigration.info('[OPENCTI] Cleaning deprecated rels for targets...');
const updateQueryForTargets = {
script: {
params: { fieldToRemove: 'rel_targets' },
Expand Down Expand Up @@ -159,7 +168,7 @@ export const up = async (next) => {
updateQueryForTargets
);

logApp.info(`${message} > done`);
logMigration.info(`${message} > done`);
next();
};

Expand Down

0 comments on commit 6434431

Please sign in to comment.