Commit

Add fix operation (#257) (#260)
* Add fix operation (#257)

* Add FIX operation that runs a fix (available default fixTypes: UNDEL and DELET) on a record in the datastore (sketched after the commit summary below)

* Add aut prod webhooks (#243, #247); dependency update (#250)

* Update aut-names prod webhook from main (#243)

* Add aut production from main webhooks (#247)

* Bump the development-dependencies group with 2 updates (#249)

Bumps the development-dependencies group with 2 updates: [@babel/plugin-transform-runtime](https://github.com/babel/babel/tree/HEAD/packages/babel-plugin-transform-runtime) and [@babel/preset-env](https://github.com/babel/babel/tree/HEAD/packages/babel-preset-env).
* Bump the production-dependencies group with 2 updates (#248)

Bumps the production-dependencies group with 2 updates: [@babel/runtime](https://github.com/babel/babel/tree/HEAD/packages/babel-runtime) and [@natlibfi/melinda-commons](https://github.com/natlibfi/melinda-commons-js).

* 3.1.0-alpha.4
ammsalme authored Sep 26, 2024
1 parent 2705f4f commit 06de43b
Showing 16 changed files with 548 additions and 230 deletions.
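
For orientation, here is a minimal sketch of the kind of Mongo queue item the importer polls for when OPERATION is FIX, based on the fields src/app.js reads in the diff below (correlationId, operation, operationSettings, importJobState, queueItemState). The placement of fixType under operationSettings is an assumption; the commit only states that UNDEL and DELET are the default fixTypes.

```js
// Illustrative only: shape inferred from the fields read in src/app.js.
// The fixType location is an assumption, not confirmed by this diff.
const exampleFixQueueItem = {
  correlationId: '0f1e2d3c-example',                    // hypothetical id
  operation: 'FIX',                                     // OPERATIONS.FIX
  operationSettings: {noop: false, fixType: 'UNDEL'},   // fixType placement assumed
  queueItemState: 'IMPORTER.IN_QUEUE',                  // QUEUE_ITEM_STATE.IMPORTER.IN_QUEUE
  importJobState: {FIX: 'IN_QUEUE'}                     // IMPORT_JOB_STATE.IN_QUEUE for this job
};

console.log(JSON.stringify(exampleFixQueueItem, null, 2));
```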
12 changes: 12 additions & 0 deletions .env.localhost.example
@@ -0,0 +1,12 @@
TZ=Europe/Helsinki
#OPERATION='CREATE'
#OPERATION='UPDATE'
OPERATION='FIX'
RECORD_LOAD_URL=http://localhost:8090/fix
RECORD_LOAD_LIBRARY=fin01
POLL_WAIT_TIME=1000
LOG_LEVEL=debug
RECORD_LOAD_API_KEY=foobar
#DEBUG="@natlibfi/*"


12 changes: 12 additions & 0 deletions .env2
@@ -0,0 +1,12 @@
TZ=Europe/Helsinki
#OPERATION='CREATE'
OPERATION='UPDATE'
RECORD_LOAD_URL=http://localhost:8090
RECORD_LOAD_FIX_URL=http://localhost:8090/fix
RECORD_LOAD_API_KEY=foobar
RECORD_LOAD_LIBRARY=fin01
POLL_WAIT_TIME=1000
LOG_LEVEL=debug
#DEBUG="@natlibfi/*"


3 changes: 3 additions & 0 deletions .gitignore
@@ -25,5 +25,8 @@ node_modules/

# dotenv environment variables file
.env
.env.fix
.env.create
.env.update


2 changes: 1 addition & 1 deletion README.md
@@ -11,7 +11,7 @@ While service is in use it polls Mongo for jobs is state `'IMPORTER.IN_QUEUE'` f
| Name | Mandatory | Description |
|---------------------|-----------|--------------------------------------------------------------------------------------------------------------------|
| AMQP_URL | Yes | A serialized object of AMQP connection config |
| OPERATION | Yes | A string state of passing operations. Enum: `'CREATE'` or `'UPDATE'` |
| OPERATION | Yes | A string state of passing operations. Enum: `'CREATE'`, `'UPDATE'` or `'FIX'` |
| RECORD_LOAD_API_KEY | Yes | A string key authorized to use the API |
| RECORD_LOAD_LIBRARY | Yes | A string |
| RECORD_LOAD_URL | Yes | A serialized URL address of Melinda-record-load-api |
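
As a quick illustration of the extended enum documented above, a standalone sketch (not code from this repository, which reads configuration via @natlibfi/melinda-backend-commons) that validates the OPERATION environment variable:

```js
// Standalone illustration of the OPERATION enum from the README table above.
const SUPPORTED_OPERATIONS = ['CREATE', 'UPDATE', 'FIX'];

function parseOperation(value) {
  if (!SUPPORTED_OPERATIONS.includes(value)) {
    throw new Error(`Unsupported OPERATION: ${value}`);
  }
  return value;
}

console.log(parseOperation(process.env.OPERATION || 'FIX')); // e.g. 'FIX'
```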
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
@@ -14,7 +14,7 @@
"url": "[email protected]:NatLibFi/melinda-rest-api-importer.git"
},
"license": "MIT",
"version": "3.0.21",
"version": "3.1.0-alpha.5",
"main": "./dist/index.js",
"engines": {
"node": ">=18"
@@ -35,7 +35,7 @@
"@natlibfi/marc-record-serializers": "^10.1.2",
"@natlibfi/melinda-backend-commons": "^2.3.1",
"@natlibfi/melinda-commons": "^13.0.17",
"@natlibfi/melinda-rest-api-commons": "^4.1.11",
"@natlibfi/melinda-rest-api-commons": "^4.2.0",
"http-status": "^1.7.4",
"moment": "^2.30.1",
"node-fetch": "^2.7.0",
110 changes: 82 additions & 28 deletions src/app.js
@@ -1,5 +1,6 @@
import {createLogger, logWait} from '@natlibfi/melinda-backend-commons';
import recordLoadFactory from './interfaces/loadStarter';
import recordFixFactory from './interfaces/fixLoadStarter';
import {amqpFactory, mongoFactory, QUEUE_ITEM_STATE, IMPORT_JOB_STATE, OPERATIONS, createImportJobState} from '@natlibfi/melinda-rest-api-commons';
import {inspect, promisify} from 'util';
import {createItemImportingHandler} from './handleItemImporting';
@@ -8,25 +9,44 @@ import prettyPrint from 'pretty-print-ms';

export default async function ({
amqpUrl, operation, pollWaitTime, error503WaitTime, mongoUri,
recordLoadApiKey, recordLoadLibrary, recordLoadUrl, fixPrio, fixBulk,
recordLoadApiKey, recordLoadLibrary, recordLoadUrl, recordLoadFixPath, recordLoadLoadPath, fixPrio, fixBulk,
keepLoadProcessReports
}) {

const setTimeoutPromise = promisify(setTimeout);
const logger = createLogger();

const recordLoadUrlWithPath = getUrlWithPath({operation, recordLoadUrl, recordLoadFixPath, recordLoadLoadPath});

// second parameter for running amqpHealthCheck
const amqpOperator = await amqpFactory(amqpUrl, true);
const mongoOperatorPrio = await mongoFactory(mongoUri, 'prio');
const mongoOperatorBulk = await mongoFactory(mongoUri, 'bulk');
const processOperator = await checkProcess({amqpOperator, recordLoadApiKey, recordLoadUrl, pollWaitTime, error503WaitTime, operation, keepLoadProcessReports, mongoUri});
const recordLoadOperator = recordLoadFactory({recordLoadApiKey, recordLoadLibrary, recordLoadUrl, fixPrio, fixBulk});
const prioItemImportingHandler = createItemImportingHandler(amqpOperator, mongoOperatorPrio, recordLoadOperator, {prio: true, error503WaitTime, recordLoadLibrary});
const bulkItemImportingHandler = createItemImportingHandler(amqpOperator, mongoOperatorBulk, recordLoadOperator, {prio: false, error503WaitTime, recordLoadLibrary});

const mongoOperators = {
prio: await mongoFactory(mongoUri, 'prio'),
bulk: await mongoFactory(mongoUri, 'bulk')
};

const processOperator = await checkProcess({amqpOperator, recordLoadApiKey, recordLoadUrl: recordLoadUrlWithPath, pollWaitTime, error503WaitTime, operation, keepLoadProcessReports, mongoUri});

const recordLoadOperator =
[OPERATIONS.CREATE, OPERATIONS.UPDATE].includes(operation)
? recordLoadFactory({recordLoadApiKey, recordLoadLibrary, recordLoadUrl: recordLoadUrlWithPath, fixPrio, fixBulk})
: recordFixFactory({recordLoadApiKey, recordLoadLibrary, recordLoadUrl: recordLoadUrlWithPath});

const itemImportingHandler = createItemImportingHandler(amqpOperator, mongoOperators, recordLoadOperator, {error503WaitTime, recordLoadLibrary});

logger.info(`Started Melinda-rest-api-importer with operation ${operation}`);

const server = await startCheck({});
return server;

function getUrlWithPath({operation, recordLoadUrl, recordLoadLoadPath, recordLoadFixPath}) {
logger.silly(`URL: ${recordLoadUrl}, paths: fix ${recordLoadFixPath}, load: ${recordLoadLoadPath}`);
const recordLoadUrlWithPath = operation === OPERATIONS.FIX ? `${recordLoadUrl}${recordLoadFixPath}` : `${recordLoadUrl}${recordLoadLoadPath}`;
logger.debug(`Using URL ${recordLoadUrlWithPath} for operation ${operation}`);
return recordLoadUrlWithPath;
}

async function startCheck({checkInProcessItems = true, wait = false, waitSinceLastOp = 0}) {
if (wait) {
await setTimeoutPromise(wait);
@@ -43,7 +63,7 @@ export default async function ({
}

async function checkInProcess({prio = true, waitSinceLastOp}) {
const mongoOperator = prio ? mongoOperatorPrio : mongoOperatorBulk;
const mongoOperator = prio ? mongoOperators.prio : mongoOperators.bulk;
// Items in aleph-record-load-api

const queueItemInProcess = await mongoOperator.getOne({queueItemState: QUEUE_ITEM_STATE.IMPORTER.IMPORTING, importJobState: createImportJobState(operation, IMPORT_JOB_STATE.PROCESSING, true)});
@@ -52,8 +72,10 @@
if (queueItemInProcess) {
// Do not spam logs
logger.silly(`Found item in process ${queueItemInProcess.correlationId}`);

// processOperator return false if process is still ongoing (or it errored) and true if the process is done
const result = await processOperator.checkProcessQueueStart({correlationId: queueItemInProcess.correlationId, operation, mongoOperator, prio});

if (result) {
logger.debug(`Process done with ${prettyPrint(waitSinceLastOp)} of waiting`);
return startCheck({checkInProcessItems: true});
@@ -73,8 +95,8 @@

// eslint-disable-next-line max-statements
async function checkItemImportingAndInQueue({prio = true, waitSinceLastOp}) {
const mongoOperator = prio ? mongoOperatorPrio : mongoOperatorBulk;
const itemImportingHandler = prio ? prioItemImportingHandler : bulkItemImportingHandler;
const mongoOperator = prio ? mongoOperators.prio : mongoOperators.bulk;

// Items in importer to be send to aleph-record-load-api
// ImportJobStates: EMPTY, QUEUING, IN_QUEUE, PROCESSING, DONE, ERROR, ABORT
// get here {<OPERATION>: IN_QUEUE}
@@ -113,33 +135,41 @@ export default async function ({
const itemImportingImporting = await mongoOperator.getOne({queueItemState: QUEUE_ITEM_STATE.IMPORTER.IMPORTING, importJobState: createImportJobState(operation, IMPORT_JOB_STATE.IMPORTING, true)});

if (itemImportingImporting) {

logger.debug(`Found item in importing ${itemImportingImporting.correlationId}, ImportJobState: {${operation}: IMPORTING} ${waitTimePrint(waitSinceLastOp)}`);
await itemImportingHandler({item: itemImportingImporting, operation});
await itemImportingHandler({item: itemImportingImporting, operation, prio});
return true;
}

return false;
}

async function checkImportJobStateDONE({waitSinceLastOp}) {
// We get queueItem, whose import job state for this importer is DONE
const queueItem = await mongoOperator.getOne({queueItemState: QUEUE_ITEM_STATE.IMPORTER.IMPORTING, importJobState: createImportJobState(operation, IMPORT_JOB_STATE.DONE, true)});
if (queueItem) {

logger.debug(`Found item in importing ${queueItem.correlationId}, ImportJobState: {${operation}: DONE} ${waitTimePrint(waitSinceLastOp)}`);
logger.silly(inspect(queueItem));

const otherOperationImportJobState = operation === OPERATIONS.CREATE ? OPERATIONS.UPDATE : OPERATIONS.CREATE;
const otherOperationImportJobStateResult = queueItem.importJobState[otherOperationImportJobState];
logger.debug(`Checking importerJobState for other operation: ${otherOperationImportJobState}: ${otherOperationImportJobStateResult}`);

if ([IMPORT_JOB_STATE.EMPTY, IMPORT_JOB_STATE.DONE, IMPORT_JOB_STATE.ERROR, IMPORT_JOB_STATE.ABORT].includes(otherOperationImportJobStateResult)) {
logger.debug(`Other importJob in not ongoing/pending, importing done`);
// FIXes are DONE when ImportJobState.FIX is DONE
if ([OPERATIONS.FIX].includes(operation)) {
await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.DONE});
return true;
}
}

// Only CREATEs and UPDATEs can exist in the same queueItem, they are not DONE if the other job is not also done
if ([OPERATIONS.CREATE, OPERATIONS.UPDATE].includes(operation)) {
const otherOperationImportJobState = operation === OPERATIONS.CREATE ? OPERATIONS.UPDATE : OPERATIONS.CREATE;
const otherOperationImportJobStateResult = queueItem.importJobState[otherOperationImportJobState];
logger.debug(`Checking importerJobState for other operation: ${otherOperationImportJobState}: ${otherOperationImportJobStateResult}`);

if ([IMPORT_JOB_STATE.EMPTY, IMPORT_JOB_STATE.DONE, IMPORT_JOB_STATE.ERROR, IMPORT_JOB_STATE.ABORT].includes(otherOperationImportJobStateResult)) {
logger.debug(`Other importJob in not ongoing/pending, importing done`);
await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.DONE});
return true;
}
return false;
}
logger.debug(`WARNING! unknown operation ${operation}`);
}
return false;
}

@@ -165,6 +195,7 @@ export default async function ({
// eslint-disable-next-line max-statements
async function checkQueueItemStateINQUEUE({waitSinceLastOp}) {
const queueItem = await mongoOperator.getOne({queueItemState: QUEUE_ITEM_STATE.IMPORTER.IN_QUEUE});
logger.silly(`checkQueueItemStateINQUEUE: ${JSON.stringify(queueItem)}`);

if (queueItem && queueItem.operationSettings.noop === true) {
logger.verbose(`QueueItem ${queueItem.correlationId} has operationSettings.noop ${queueItem.operationSettings.noop} - not running importer for this job`);
@@ -182,20 +213,43 @@ export default async function ({
if (queueItem && queueItem.importJobState[operation] === IMPORT_JOB_STATE.EMPTY) {
logger.debug(`Found item in queue to be imported ${queueItem.correlationId} ${waitTimePrint(waitSinceLastOp)} but importJobState for ${operation} is ${queueItem.importJobState[operation]}`);
logger.silly(JSON.stringify(queueItem.importJobState));
logger.debug(`QueueItem has operation: ${queueItem.operation}`);

// check whether also the other queue is EMPTY or a final state
const otherOperationImportJobState = operation === OPERATIONS.CREATE ? OPERATIONS.UPDATE : OPERATIONS.CREATE;
const otherOperationImportJobStateResult = queueItem.importJobState[otherOperationImportJobState];
logger.debug(`Checking importerJobState for other operation: ${otherOperationImportJobState}: ${otherOperationImportJobStateResult}`);

if ([IMPORT_JOB_STATE.EMPTY, IMPORT_JOB_STATE.DONE, IMPORT_JOB_STATE.ERROR, IMPORT_JOB_STATE.ABORT].includes(otherOperationImportJobStateResult)) {
logger.debug(`Other importJob (${otherOperationImportJobState}) in not ongoing/pending, importing done`);
if ([OPERATIONS.FIX].includes(operation) && [OPERATIONS.FIX].includes(queueItem.operation)) {
logger.debug(`Fix process has just FIX importer, we shouldn't have empty import job state!`);
// Should we error this?
await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.DONE});
return true;
}

// Only CREATEs and UPDATEs can exist in the same queueItem
if ([OPERATIONS.CREATE, OPERATIONS.UPDATE].includes(operation) && [OPERATIONS.CREATE, OPERATIONS.UPDATE].includes(queueItem.operation)) {
// check whether also the other queues are EMPTY or a final state
const otherOperationImportJobState = operation === OPERATIONS.CREATE ? OPERATIONS.UPDATE : OPERATIONS.CREATE;
const otherOperationImportJobStateResult = queueItem.importJobState[otherOperationImportJobState];
logger.debug(`Checking importerJobState for other operation: ${otherOperationImportJobState}: ${otherOperationImportJobStateResult}`);

if ([IMPORT_JOB_STATE.EMPTY, IMPORT_JOB_STATE.DONE, IMPORT_JOB_STATE.ERROR, IMPORT_JOB_STATE.ABORT].includes(otherOperationImportJobStateResult)) {
logger.debug(`Other importJob (${otherOperationImportJobState}) in not ongoing/pending, importing done`);
await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.DONE});
return true;
}
// WHY THIS? Shouldn't other LOAD-IMPORTER CATCH this?
//logger.debug(`Found item in queue to be imported ${queueItem.correlationId} ${waitTimePrint(waitSinceLastOp)}`);
//await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.IMPORTER.IMPORTING});
return false;
}
return false;
}
if (queueItem) {
logger.debug(`Found item in queue to be imported ${queueItem.correlationId} ${waitTimePrint(waitSinceLastOp)}`);
await mongoOperator.setState({correlationId: queueItem.correlationId, state: QUEUE_ITEM_STATE.IMPORTER.IMPORTING});

// Why we have this here?
if (operation === OPERATIONS.FIX && queueItem.importJobState[OPERATIONS.FIX] === undefined) {
await mongoOperator.setImportJobState({correlationId: queueItem.correlationId, operation, importJobState: IMPORT_JOB_STATE.IMPORTING});
return true;
}
return true;
}
return false;
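
To summarise the branching that src/app.js introduces above: CREATE and UPDATE jobs keep using the load-starter factory, while FIX jobs go through the new fix factory. A minimal standalone sketch of that selection follows; the factory stand-ins are placeholders, not the real src/interfaces/loadStarter and src/interfaces/fixLoadStarter modules.

```js
// Placeholder factories; the real ones live in src/interfaces/loadStarter
// and src/interfaces/fixLoadStarter and talk to the record-load-api.
const recordLoadFactory = (config) => ({kind: 'load', config});
const recordFixFactory = (config) => ({kind: 'fix', config});

function createRecordOperator(operation, config) {
  // Mirrors app.js: CREATE/UPDATE -> load starter, FIX -> fix starter
  return ['CREATE', 'UPDATE'].includes(operation)
    ? recordLoadFactory(config)
    : recordFixFactory(config);
}

console.log(createRecordOperator('FIX', {recordLoadUrl: 'http://localhost:8090/fix'}).kind); // 'fix'
```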
3 changes: 3 additions & 0 deletions src/config.js
@@ -2,6 +2,8 @@ import {readEnvironmentVariable} from '@natlibfi/melinda-backend-commons/';

// Record-load-api to save data
export const recordLoadUrl = readEnvironmentVariable('RECORD_LOAD_URL');
export const recordLoadFixPath = readEnvironmentVariable('RECORD_LOAD_FIX_PATH', {defaultValue: '/fix'});
export const recordLoadLoadPath = readEnvironmentVariable('RECORD_LOAD_LOAD_PATH', {defaultValue: ''});
export const recordLoadApiKey = readEnvironmentVariable('RECORD_LOAD_API_KEY');
export const recordLoadLibrary = readEnvironmentVariable('RECORD_LOAD_LIBRARY');
export const fixPrio = readEnvironmentVariable('FIX_PRIO', {defaultValue: 'API'});
@@ -16,6 +18,7 @@ export const mongoUri = readEnvironmentVariable('MONGO_URI', {defaultValue: 'mon
// Operation variables
export const pollWaitTime = readEnvironmentVariable('POLL_WAIT_TIME', {defaultValue: 1000});
export const error503WaitTime = readEnvironmentVariable('ERROR_503_WAIT_TIME', {defaultValue: 10000});

export const operation = readEnvironmentVariable('OPERATION');

// Reporting variables
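
The new RECORD_LOAD_FIX_PATH and RECORD_LOAD_LOAD_PATH variables feed getUrlWithPath in src/app.js. Below is a standalone sketch of the resulting URL selection with the defaults above ('/fix' and ''); it is an illustration of the concatenation, not the module's own export.

```js
// Illustration of how the defaults above combine with RECORD_LOAD_URL;
// getUrlWithPath in src/app.js performs the equivalent concatenation.
function buildRecordLoadUrl({operation, recordLoadUrl, fixPath = '/fix', loadPath = ''}) {
  return operation === 'FIX' ? `${recordLoadUrl}${fixPath}` : `${recordLoadUrl}${loadPath}`;
}

console.log(buildRecordLoadUrl({operation: 'FIX', recordLoadUrl: 'http://localhost:8090'}));    // http://localhost:8090/fix
console.log(buildRecordLoadUrl({operation: 'UPDATE', recordLoadUrl: 'http://localhost:8090'})); // http://localhost:8090
```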
