diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 2c8133e8a..c29128853 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -55,6 +55,7 @@ def main(argv): `status_msg` VARCHAR(512) NOT NULL DEFAULT '', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `start_time` DATETIME(3) NULL DEFAULT NULL, + `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP(), `duration` FLOAT NULL DEFAULT NULL, `original_size` BIGINT NOT NULL DEFAULT '0', `uncompressed_size` BIGINT NOT NULL DEFAULT '0', @@ -64,7 +65,8 @@ def main(argv): `clp_binary_version` INT NULL DEFAULT NULL, `clp_config` VARBINARY(60000) NOT NULL, PRIMARY KEY (`id`) USING BTREE, - INDEX `JOB_STATUS` (`status`) USING BTREE + INDEX `JOB_STATUS` (`status`) USING BTREE, + INDEX `JOB_UPDATE_TIME` (`update_time`) USING BTREE ) ROW_FORMAT=DYNAMIC """ ) diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index f90922a11..7de797bb0 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -74,7 +74,7 @@ def update_compression_job_metadata(db_cursor, job_id, kv): logger.error("Must specify at least one field to update") raise ValueError - field_set_expressions = [f"{k} = %s" for k in kv.keys()] + field_set_expressions = [f"{k} = %s" for k in kv.keys()] + ["update_time = CURRENT_TIMESTAMP()"] query = f""" UPDATE {COMPRESSION_JOBS_TABLE_NAME} SET {", ".join(field_set_expressions)} diff --git a/components/webui/imports/api/ingestion/constants.js b/components/webui/imports/api/ingestion/constants.js index 8797e7849..cab5cbb7e 100644 --- a/components/webui/imports/api/ingestion/constants.js +++ b/components/webui/imports/api/ingestion/constants.js @@ -10,6 +10,7 @@ const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({ STATUS_MSG: "status_msg", CREATION_TIME: "creation_time", START_TIME: "start_time", + UPDATE_TIME: "update_time", DURATION: "duration", ORIGINAL_SIZE: "original_size", UNCOMPRESSED_SIZE: "uncompressed_size", diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index bd0e47c0c..03a1dda4d 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -20,53 +20,29 @@ class CompressionDbManager { } /** - * Retrieves the last `limit` number of jobs and the ones with the given - * job IDs. + * Retrieves compression jobs that are updated on or after a specific time. * - * @param {number} limit - * @param {number[]} jobIds + * @param {number} lastUpdateTimestampSeconds * @return {Promise} Job objects with fields with the names in * `COMPRESSION_JOBS_TABLE_COLUMN_NAMES` */ - async getCompressionJobs (limit, jobIds) { - const queries = []; + async getCompressionJobs (lastUpdateTimestampSeconds) { + const queryString = ` + SELECT + UNIX_TIMESTAMP() as retrieval_time, + id as _id, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} + FROM ${this.#compressionJobsTableName} + WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= + FROM_UNIXTIME(${lastUpdateTimestampSeconds}) - 1 + ORDER BY _id DESC;`; - queries.push(` - WITH SelectedColumns AS ( - SELECT - id as _id, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} - FROM ${this.#compressionJobsTableName} - ) - ( - SELECT * - FROM SelectedColumns - ORDER BY _id DESC - LIMIT ${limit} - ) - `); - - // The initial select may not include the jobs specified by `jobIds`, so we select - // them explicitly and then deduplicate the rows with a UNION DISTINCT clause. - jobIds.forEach((jobId) => { - queries.push(` - UNION DISTINCT - ( - SELECT * - FROM SelectedColumns - WHERE _id=${jobId} - ) - `); - }); - - queries.push("ORDER BY _id DESC;"); - - const queryString = queries.join("\n"); const [results] = await this.#sqlDbConnPool.query(queryString); return results; diff --git a/components/webui/imports/api/ingestion/server/publications.js b/components/webui/imports/api/ingestion/server/publications.js index 61aefb8a1..8febca51f 100644 --- a/components/webui/imports/api/ingestion/server/publications.js +++ b/components/webui/imports/api/ingestion/server/publications.js @@ -8,21 +8,12 @@ import { STATS_COLLECTION_ID, StatsCollection, } from "../collections"; -import { - COMPRESSION_JOB_WAITING_STATES, - COMPRESSION_JOBS_TABLE_COLUMN_NAMES, -} from "../constants"; import CompressionDbManager from "./CompressionDbManager"; import StatsDbManager from "./StatsDbManager"; const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000; -/** - * The maximum number of compression jobs to retrieve at a time. - */ -const COMPRESSION_MAX_RETRIEVE_JOBS = 5; - const STATS_REFRESH_INTERVAL_MILLIS = 5000; /** @@ -45,6 +36,11 @@ let compressionJobsRefreshTimeout = null; */ let statsRefreshInterval = null; +/** + * @type {number} + */ +let lastUpdateTimestampSeconds = 0; + /** * Updates the compression statistics in the StatsCollection. * @@ -89,21 +85,17 @@ const refreshCompressionJobs = async () => { return; } - const pendingJobIds = await CompressionJobsCollection.find({ - [COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS]: { - $in: COMPRESSION_JOB_WAITING_STATES, - }, - }) - .fetch() - .map((job) => ( - job._id - )); - const jobs = await compressionDbManager.getCompressionJobs( - COMPRESSION_MAX_RETRIEVE_JOBS, - pendingJobIds + lastUpdateTimestampSeconds ); + if (0 !== jobs.length) { + // `refreshCompressionJobs()` shall not be run concurrently + // and therefore incurs no race condition. + // eslint-disable-next-line require-atomic-updates + lastUpdateTimestampSeconds = jobs[0].retrieval_time; + } + const operations = jobs.map((doc) => ({ updateOne: { filter: {_id: doc._id},