From 0a3c885e19728452acb54bef87d6608c81d50ecb Mon Sep 17 00:00:00 2001 From: AVMatthews Date: Mon, 20 Jan 2025 14:18:34 -0500 Subject: [PATCH 1/8] remove job limit for update and update on all statuses since last update --- .../initialize-orchestration-db.py | 1 + .../compress/compression_scheduler.py | 6 +++++ .../webui/imports/api/ingestion/constants.js | 1 + .../ingestion/server/CompressionDbManager.js | 10 ++++---- .../api/ingestion/server/publications.js | 23 ++++++++++++++----- .../panels/IngestionJobs/IngestionJobRow.jsx | 7 ++++++ .../IngestView/panels/IngestionJobs/index.jsx | 1 + 7 files changed, 39 insertions(+), 10 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 2c8133e8a..ffdfd98b7 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -55,6 +55,7 @@ def main(argv): `status_msg` VARCHAR(512) NOT NULL DEFAULT '', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `start_time` DATETIME(3) NULL DEFAULT NULL, + `update_time` DATETIME(3) NULL DEFAULT NULL, `duration` FLOAT NULL DEFAULT NULL, `original_size` BIGINT NOT NULL DEFAULT '0', `uncompressed_size` BIGINT NOT NULL DEFAULT '0', diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index f90922a11..d76d93439 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -209,12 +209,14 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection if len(tasks) == 0: logger.warning(f"No tasks were created for job {job_id}") + update_time = datetime.datetime.now() update_compression_job_metadata( db_cursor, job_id, { "status": CompressionJobStatus.FAILED, "status_msg": "invalid input path", + "update_time": update_time }, ) db_conn.commit() @@ -229,6 +231,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection "num_tasks": paths_to_compress_buffer.num_tasks, "status": CompressionJobStatus.RUNNING, "start_time": start_time, + "update_time": start_time }, ) db_conn.commit() @@ -324,6 +327,7 @@ def poll_running_jobs(db_conn, db_cursor): logger.error(f"Error while getting results for job {job_id}: {e}") job_success = False + update_time = datetime.datetime.now() if job_success: logger.info(f"Job {job_id} succeeded.") update_compression_job_metadata( @@ -332,6 +336,7 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.SUCCEEDED, duration=duration, + update_time=update_time ), ) else: @@ -342,6 +347,7 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.FAILED, status_msg=error_message, + update_time=update_time ), ) db_conn.commit() diff --git a/components/webui/imports/api/ingestion/constants.js b/components/webui/imports/api/ingestion/constants.js index 8797e7849..cab5cbb7e 100644 --- a/components/webui/imports/api/ingestion/constants.js +++ b/components/webui/imports/api/ingestion/constants.js @@ -10,6 +10,7 @@ const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({ STATUS_MSG: "status_msg", CREATION_TIME: "creation_time", START_TIME: "start_time", + UPDATE_TIME: "update_time", DURATION: "duration", ORIGINAL_SIZE: "original_size", UNCOMPRESSED_SIZE: "uncompressed_size", diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index bd0e47c0c..725063658 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -23,12 +23,12 @@ class CompressionDbManager { * Retrieves the last `limit` number of jobs and the ones with the given * job IDs. * - * @param {number} limit + * @param {string} lastUpdateDate * @param {number[]} jobIds * @return {Promise} Job objects with fields with the names in * `COMPRESSION_JOBS_TABLE_COLUMN_NAMES` */ - async getCompressionJobs (limit, jobIds) { + async getCompressionJobs (lastUpdateDate, jobIds) { const queries = []; queries.push(` @@ -38,6 +38,7 @@ class CompressionDbManager { ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} @@ -47,7 +48,6 @@ class CompressionDbManager { SELECT * FROM SelectedColumns ORDER BY _id DESC - LIMIT ${limit} ) `); @@ -59,7 +59,9 @@ class CompressionDbManager { ( SELECT * FROM SelectedColumns - WHERE _id=${jobId} + WHERE + _id=${jobId} && + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= '${lastUpdateDate}' ) `); }); diff --git a/components/webui/imports/api/ingestion/server/publications.js b/components/webui/imports/api/ingestion/server/publications.js index 61aefb8a1..ffddf5d88 100644 --- a/components/webui/imports/api/ingestion/server/publications.js +++ b/components/webui/imports/api/ingestion/server/publications.js @@ -18,13 +18,10 @@ import StatsDbManager from "./StatsDbManager"; const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000; -/** - * The maximum number of compression jobs to retrieve at a time. - */ -const COMPRESSION_MAX_RETRIEVE_JOBS = 5; - const STATS_REFRESH_INTERVAL_MILLIS = 5000; +const CONST_FOR_DATE_FORMAT = 19; + /** * @type {CompressionDbManager|null} */ @@ -45,6 +42,13 @@ let compressionJobsRefreshTimeout = null; */ let statsRefreshInterval = null; +/** + * @type {string} + */ +let lastUpdateDate = new Date().toISOString() + .slice(0, CONST_FOR_DATE_FORMAT) + .replace("T", " "); + /** * Updates the compression statistics in the StatsCollection. * @@ -99,11 +103,18 @@ const refreshCompressionJobs = async () => { job._id )); + // Save timestamp before querying the combressionDBManager + const newDate = new Date().toISOString() + .slice(0, CONST_FOR_DATE_FORMAT) + .replace("T", " "); + const jobs = await compressionDbManager.getCompressionJobs( - COMPRESSION_MAX_RETRIEVE_JOBS, + lastUpdateDate, pendingJobIds ); + lastUpdateDate = newDate; + const operations = jobs.map((doc) => ({ updateOne: { filter: {_id: doc._id}, diff --git a/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx b/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx index de23ec02b..0cdb2a6d7 100644 --- a/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx +++ b/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx @@ -106,6 +106,13 @@ const IngestionJobRow = ({job}) => { isAlwaysVisible={isPlaceholderVisible} text={compressedSizeText}/> + + + ); }; diff --git a/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx b/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx index 9591d5e00..e806b18b2 100644 --- a/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx +++ b/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx @@ -47,6 +47,7 @@ const IngestionJobs = () => { Speed Data Ingested Compressed Size + Last Updated From 1fc3454e7d6f6fb482ce2da8a529338509988f0d Mon Sep 17 00:00:00 2001 From: AVMatthews Date: Mon, 20 Jan 2025 18:00:34 -0500 Subject: [PATCH 2/8] fix linting issue --- .../webui/imports/api/ingestion/server/publications.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/components/webui/imports/api/ingestion/server/publications.js b/components/webui/imports/api/ingestion/server/publications.js index ffddf5d88..7e257d252 100644 --- a/components/webui/imports/api/ingestion/server/publications.js +++ b/components/webui/imports/api/ingestion/server/publications.js @@ -103,18 +103,16 @@ const refreshCompressionJobs = async () => { job._id )); - // Save timestamp before querying the combressionDBManager - const newDate = new Date().toISOString() + const previousUpdateDate = lastUpdateDate; + lastUpdateDate = new Date().toISOString() .slice(0, CONST_FOR_DATE_FORMAT) .replace("T", " "); const jobs = await compressionDbManager.getCompressionJobs( - lastUpdateDate, + previousUpdateDate, pendingJobIds ); - lastUpdateDate = newDate; - const operations = jobs.map((doc) => ({ updateOne: { filter: {_id: doc._id}, From 433c66b4fd7fdf8aefcda990c1bbe6b060475cbe Mon Sep 17 00:00:00 2001 From: AVMatthews Date: Mon, 20 Jan 2025 18:13:56 -0500 Subject: [PATCH 3/8] fix linting format issue --- .../scheduler/compress/compression_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index d76d93439..8e7db39b9 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -216,7 +216,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection { "status": CompressionJobStatus.FAILED, "status_msg": "invalid input path", - "update_time": update_time + "update_time": update_time, }, ) db_conn.commit() @@ -231,7 +231,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection "num_tasks": paths_to_compress_buffer.num_tasks, "status": CompressionJobStatus.RUNNING, "start_time": start_time, - "update_time": start_time + "update_time": start_time, }, ) db_conn.commit() @@ -336,7 +336,7 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.SUCCEEDED, duration=duration, - update_time=update_time + update_time=update_time, ), ) else: @@ -347,7 +347,7 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.FAILED, status_msg=error_message, - update_time=update_time + update_time=update_time, ), ) db_conn.commit() From c0e830cc283b3b6232ed68b9d756d91852c34940 Mon Sep 17 00:00:00 2001 From: AVMatthews Date: Thu, 23 Jan 2025 02:15:13 -0500 Subject: [PATCH 4/8] move update_time logic into update_compression_job_metadata --- .../initialize-orchestration-db.py | 2 +- .../compress/compression_scheduler.py | 8 +-- .../ingestion/server/CompressionDbManager.js | 61 ++++++------------- .../api/ingestion/server/publications.js | 37 +++-------- .../panels/IngestionJobs/IngestionJobRow.jsx | 7 --- .../IngestView/panels/IngestionJobs/index.jsx | 1 - 6 files changed, 29 insertions(+), 87 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index ffdfd98b7..4108bf7ee 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -55,7 +55,7 @@ def main(argv): `status_msg` VARCHAR(512) NOT NULL DEFAULT '', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `start_time` DATETIME(3) NULL DEFAULT NULL, - `update_time` DATETIME(3) NULL DEFAULT NULL, + `update_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `duration` FLOAT NULL DEFAULT NULL, `original_size` BIGINT NOT NULL DEFAULT '0', `uncompressed_size` BIGINT NOT NULL DEFAULT '0', diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index 8e7db39b9..ede8449fc 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -77,7 +77,7 @@ def update_compression_job_metadata(db_cursor, job_id, kv): field_set_expressions = [f"{k} = %s" for k in kv.keys()] query = f""" UPDATE {COMPRESSION_JOBS_TABLE_NAME} - SET {", ".join(field_set_expressions)} + SET {", ".join(field_set_expressions)}, update_time = NOW(3) WHERE id = %s """ values = list(kv.values()) + [job_id] @@ -209,14 +209,12 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection if len(tasks) == 0: logger.warning(f"No tasks were created for job {job_id}") - update_time = datetime.datetime.now() update_compression_job_metadata( db_cursor, job_id, { "status": CompressionJobStatus.FAILED, "status_msg": "invalid input path", - "update_time": update_time, }, ) db_conn.commit() @@ -231,7 +229,6 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection "num_tasks": paths_to_compress_buffer.num_tasks, "status": CompressionJobStatus.RUNNING, "start_time": start_time, - "update_time": start_time, }, ) db_conn.commit() @@ -327,7 +324,6 @@ def poll_running_jobs(db_conn, db_cursor): logger.error(f"Error while getting results for job {job_id}: {e}") job_success = False - update_time = datetime.datetime.now() if job_success: logger.info(f"Job {job_id} succeeded.") update_compression_job_metadata( @@ -336,7 +332,6 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.SUCCEEDED, duration=duration, - update_time=update_time, ), ) else: @@ -347,7 +342,6 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.FAILED, status_msg=error_message, - update_time=update_time, ), ) db_conn.commit() diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index 725063658..9675afdfd 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -20,55 +20,28 @@ class CompressionDbManager { } /** - * Retrieves the last `limit` number of jobs and the ones with the given - * job IDs. + * Retrieves compression jobs that are updated on or after a specific time. * - * @param {string} lastUpdateDate - * @param {number[]} jobIds + * @param {number} lastUpdateTimestamp in seconds * @return {Promise} Job objects with fields with the names in * `COMPRESSION_JOBS_TABLE_COLUMN_NAMES` */ - async getCompressionJobs (lastUpdateDate, jobIds) { - const queries = []; + async getCompressionJobs (lastUpdateTimestamp) { + const queryString = ` + SELECT + id as _id, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}, + UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) as retrieval_time + FROM ${this.#compressionJobsTableName} + WHERE update_time >= FROM_UNIXTIME(${lastUpdateTimestamp}) + ORDER BY _id DESC;\n`; - queries.push(` - WITH SelectedColumns AS ( - SELECT - id as _id, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} - FROM ${this.#compressionJobsTableName} - ) - ( - SELECT * - FROM SelectedColumns - ORDER BY _id DESC - ) - `); - - // The initial select may not include the jobs specified by `jobIds`, so we select - // them explicitly and then deduplicate the rows with a UNION DISTINCT clause. - jobIds.forEach((jobId) => { - queries.push(` - UNION DISTINCT - ( - SELECT * - FROM SelectedColumns - WHERE - _id=${jobId} && - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= '${lastUpdateDate}' - ) - `); - }); - - queries.push("ORDER BY _id DESC;"); - - const queryString = queries.join("\n"); const [results] = await this.#sqlDbConnPool.query(queryString); return results; diff --git a/components/webui/imports/api/ingestion/server/publications.js b/components/webui/imports/api/ingestion/server/publications.js index 7e257d252..90f5a341f 100644 --- a/components/webui/imports/api/ingestion/server/publications.js +++ b/components/webui/imports/api/ingestion/server/publications.js @@ -8,10 +8,6 @@ import { STATS_COLLECTION_ID, StatsCollection, } from "../collections"; -import { - COMPRESSION_JOB_WAITING_STATES, - COMPRESSION_JOBS_TABLE_COLUMN_NAMES, -} from "../constants"; import CompressionDbManager from "./CompressionDbManager"; import StatsDbManager from "./StatsDbManager"; @@ -20,8 +16,6 @@ const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000; const STATS_REFRESH_INTERVAL_MILLIS = 5000; -const CONST_FOR_DATE_FORMAT = 19; - /** * @type {CompressionDbManager|null} */ @@ -43,11 +37,9 @@ let compressionJobsRefreshTimeout = null; let statsRefreshInterval = null; /** - * @type {string} + * @type {number|null} */ -let lastUpdateDate = new Date().toISOString() - .slice(0, CONST_FOR_DATE_FORMAT) - .replace("T", " "); +let lastUpdateTimestampSeconds = 0; /** * Updates the compression statistics in the StatsCollection. @@ -93,26 +85,17 @@ const refreshCompressionJobs = async () => { return; } - const pendingJobIds = await CompressionJobsCollection.find({ - [COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS]: { - $in: COMPRESSION_JOB_WAITING_STATES, - }, - }) - .fetch() - .map((job) => ( - job._id - )); - - const previousUpdateDate = lastUpdateDate; - lastUpdateDate = new Date().toISOString() - .slice(0, CONST_FOR_DATE_FORMAT) - .replace("T", " "); - const jobs = await compressionDbManager.getCompressionJobs( - previousUpdateDate, - pendingJobIds + lastUpdateTimestampSeconds ); + if (0 !== jobs.length) { + // `refreshCompressionJobs()` shall not be run concurrently + // and therefore incurs no racecondition. + // eslint-disable-next-line require-atomic-updates + lastUpdateTimestampSeconds = jobs[0].retrieval_time; + } + const operations = jobs.map((doc) => ({ updateOne: { filter: {_id: doc._id}, diff --git a/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx b/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx index 0cdb2a6d7..de23ec02b 100644 --- a/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx +++ b/components/webui/imports/ui/IngestView/panels/IngestionJobs/IngestionJobRow.jsx @@ -106,13 +106,6 @@ const IngestionJobRow = ({job}) => { isAlwaysVisible={isPlaceholderVisible} text={compressedSizeText}/> - - - ); }; diff --git a/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx b/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx index e806b18b2..9591d5e00 100644 --- a/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx +++ b/components/webui/imports/ui/IngestView/panels/IngestionJobs/index.jsx @@ -47,7 +47,6 @@ const IngestionJobs = () => { Speed Data Ingested Compressed Size - Last Updated From 6603122b49a7865146f88138a4b707b5abce3f52 Mon Sep 17 00:00:00 2001 From: AVMatthews Date: Thu, 23 Jan 2025 18:36:46 -0500 Subject: [PATCH 5/8] fix datetime rounding issues --- .../clp_py_utils/initialize-orchestration-db.py | 5 +++-- .../scheduler/compress/compression_scheduler.py | 4 ++-- .../api/ingestion/server/CompressionDbManager.js | 11 ++++++----- .../imports/api/ingestion/server/publications.js | 4 ++-- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 4108bf7ee..f0811e714 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -55,7 +55,7 @@ def main(argv): `status_msg` VARCHAR(512) NOT NULL DEFAULT '', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `start_time` DATETIME(3) NULL DEFAULT NULL, - `update_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP(), `duration` FLOAT NULL DEFAULT NULL, `original_size` BIGINT NOT NULL DEFAULT '0', `uncompressed_size` BIGINT NOT NULL DEFAULT '0', @@ -65,7 +65,8 @@ def main(argv): `clp_binary_version` INT NULL DEFAULT NULL, `clp_config` VARBINARY(60000) NOT NULL, PRIMARY KEY (`id`) USING BTREE, - INDEX `JOB_STATUS` (`status`) USING BTREE + INDEX `JOB_STATUS` (`status`) USING BTREE, + INDEX `LAST_UPDATE_TIME` (`update_time`) USING BTREE ) ROW_FORMAT=DYNAMIC """ ) diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index ede8449fc..7de797bb0 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -74,10 +74,10 @@ def update_compression_job_metadata(db_cursor, job_id, kv): logger.error("Must specify at least one field to update") raise ValueError - field_set_expressions = [f"{k} = %s" for k in kv.keys()] + field_set_expressions = [f"{k} = %s" for k in kv.keys()] + ["update_time = CURRENT_TIMESTAMP()"] query = f""" UPDATE {COMPRESSION_JOBS_TABLE_NAME} - SET {", ".join(field_set_expressions)}, update_time = NOW(3) + SET {", ".join(field_set_expressions)} WHERE id = %s """ values = list(kv.values()) + [job_id] diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index 9675afdfd..38a9c0356 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -22,13 +22,14 @@ class CompressionDbManager { /** * Retrieves compression jobs that are updated on or after a specific time. * - * @param {number} lastUpdateTimestamp in seconds + * @param {number} lastUpdateTimestampSeconds * @return {Promise} Job objects with fields with the names in * `COMPRESSION_JOBS_TABLE_COLUMN_NAMES` */ - async getCompressionJobs (lastUpdateTimestamp) { + async getCompressionJobs (lastUpdateTimestampSeconds) { const queryString = ` SELECT + UNIX_TIMESTAMP() as retrieval_time, id as _id, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG}, @@ -36,10 +37,10 @@ class CompressionDbManager { ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION}, ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE}, - ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}, - UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) as retrieval_time + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} FROM ${this.#compressionJobsTableName} - WHERE update_time >= FROM_UNIXTIME(${lastUpdateTimestamp}) + WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= + FROM_UNIXTIME(${lastUpdateTimestampSeconds}) -1 ORDER BY _id DESC;\n`; const [results] = await this.#sqlDbConnPool.query(queryString); diff --git a/components/webui/imports/api/ingestion/server/publications.js b/components/webui/imports/api/ingestion/server/publications.js index 90f5a341f..8febca51f 100644 --- a/components/webui/imports/api/ingestion/server/publications.js +++ b/components/webui/imports/api/ingestion/server/publications.js @@ -37,7 +37,7 @@ let compressionJobsRefreshTimeout = null; let statsRefreshInterval = null; /** - * @type {number|null} + * @type {number} */ let lastUpdateTimestampSeconds = 0; @@ -91,7 +91,7 @@ const refreshCompressionJobs = async () => { if (0 !== jobs.length) { // `refreshCompressionJobs()` shall not be run concurrently - // and therefore incurs no racecondition. + // and therefore incurs no race condition. // eslint-disable-next-line require-atomic-updates lastUpdateTimestampSeconds = jobs[0].retrieval_time; } From 470f3c44e93620b2a9b02f121db3036044f044eb Mon Sep 17 00:00:00 2001 From: Abigail Matthews Date: Fri, 24 Jan 2025 16:13:38 -0500 Subject: [PATCH 6/8] Update components/webui/imports/api/ingestion/server/CompressionDbManager.js Co-authored-by: Junhao Liao --- .../webui/imports/api/ingestion/server/CompressionDbManager.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index 38a9c0356..537157b06 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -40,7 +40,7 @@ class CompressionDbManager { ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE} FROM ${this.#compressionJobsTableName} WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= - FROM_UNIXTIME(${lastUpdateTimestampSeconds}) -1 + FROM_UNIXTIME(${lastUpdateTimestampSeconds}) - 1 ORDER BY _id DESC;\n`; const [results] = await this.#sqlDbConnPool.query(queryString); From b76d0d8ec5560b07ef6a87a54935cd461321f020 Mon Sep 17 00:00:00 2001 From: Abigail Matthews Date: Fri, 24 Jan 2025 16:15:06 -0500 Subject: [PATCH 7/8] Update components/webui/imports/api/ingestion/server/CompressionDbManager.js Co-authored-by: Junhao Liao --- .../webui/imports/api/ingestion/server/CompressionDbManager.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/webui/imports/api/ingestion/server/CompressionDbManager.js b/components/webui/imports/api/ingestion/server/CompressionDbManager.js index 537157b06..03a1dda4d 100644 --- a/components/webui/imports/api/ingestion/server/CompressionDbManager.js +++ b/components/webui/imports/api/ingestion/server/CompressionDbManager.js @@ -41,7 +41,7 @@ class CompressionDbManager { FROM ${this.#compressionJobsTableName} WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= FROM_UNIXTIME(${lastUpdateTimestampSeconds}) - 1 - ORDER BY _id DESC;\n`; + ORDER BY _id DESC;`; const [results] = await this.#sqlDbConnPool.query(queryString); From 2f50bacdf043d6f629efde36c836dd0815241466 Mon Sep 17 00:00:00 2001 From: Abigail Matthews Date: Fri, 24 Jan 2025 16:16:18 -0500 Subject: [PATCH 8/8] Update components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py Co-authored-by: Junhao Liao --- .../clp-py-utils/clp_py_utils/initialize-orchestration-db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index f0811e714..c29128853 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -66,7 +66,7 @@ def main(argv): `clp_config` VARBINARY(60000) NOT NULL, PRIMARY KEY (`id`) USING BTREE, INDEX `JOB_STATUS` (`status`) USING BTREE, - INDEX `LAST_UPDATE_TIME` (`update_time`) USING BTREE + INDEX `JOB_UPDATE_TIME` (`update_time`) USING BTREE ) ROW_FORMAT=DYNAMIC """ )