fix: Missing Ingestion Jobs from WebUI Table #679

Open: wants to merge 3 commits into main
Changes from 1 commit
@@ -55,6 +55,7 @@ def main(argv):
`status_msg` VARCHAR(512) NOT NULL DEFAULT '',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`start_time` DATETIME(3) NULL DEFAULT NULL,
`update_time` DATETIME(3) NULL DEFAULT NULL,
`duration` FLOAT NULL DEFAULT NULL,
`original_size` BIGINT NOT NULL DEFAULT '0',
`uncompressed_size` BIGINT NOT NULL DEFAULT '0',
@@ -209,12 +209,14 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection

if len(tasks) == 0:
logger.warning(f"No tasks were created for job {job_id}")
update_time = datetime.datetime.now()
update_compression_job_metadata(
db_cursor,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": "invalid input path",
"update_time": update_time
},
)
db_conn.commit()
@@ -229,6 +231,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection
"num_tasks": paths_to_compress_buffer.num_tasks,
"status": CompressionJobStatus.RUNNING,
"start_time": start_time,
"update_time": start_time
},
)
db_conn.commit()
@@ -324,6 +327,7 @@ def poll_running_jobs(db_conn, db_cursor):
logger.error(f"Error while getting results for job {job_id}: {e}")
job_success = False

update_time = datetime.datetime.now()
if job_success:
logger.info(f"Job {job_id} succeeded.")
update_compression_job_metadata(
@@ -332,6 +336,7 @@ def poll_running_jobs(db_conn, db_cursor):
dict(
status=CompressionJobStatus.SUCCEEDED,
duration=duration,
update_time=update_time
),
)
else:
@@ -342,6 +347,7 @@ def poll_running_jobs(db_conn, db_cursor):
dict(
status=CompressionJobStatus.FAILED,
status_msg=error_message,
update_time=update_time
),
)
db_conn.commit()
1 change: 1 addition & 0 deletions components/webui/imports/api/ingestion/constants.js
@@ -10,6 +10,7 @@ const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({
STATUS_MSG: "status_msg",
CREATION_TIME: "creation_time",
START_TIME: "start_time",
UPDATE_TIME: "update_time",
DURATION: "duration",
ORIGINAL_SIZE: "original_size",
UNCOMPRESSED_SIZE: "uncompressed_size",
@@ -23,12 +23,12 @@ class CompressionDbManager {
* Retrieves the last `limit` number of jobs and the ones with the given
* job IDs.
*
* @param {number} limit
* @param {string} lastUpdateDate
* @param {number[]} jobIds
* @return {Promise<object[]>} Job objects with fields with the names in
* `COMPRESSION_JOBS_TABLE_COLUMN_NAMES`
*/
async getCompressionJobs (limit, jobIds) {
async getCompressionJobs (lastUpdateDate, jobIds) {
const queries = [];

queries.push(`
@@ -38,6 +38,7 @@
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}
@@ -47,7 +48,6 @@
SELECT *
FROM SelectedColumns
ORDER BY _id DESC
LIMIT ${limit}
)
`);

@@ -59,7 +59,9 @@
(
SELECT *
FROM SelectedColumns
WHERE _id=${jobId}
WHERE
_id=${jobId} &&
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= '${lastUpdateDate}'
)
`);
});
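To make the query change concrete, below is a minimal sketch of the per-job query shape this method now builds: the blanket LIMIT on the newest-jobs query is gone, and a pending job is only re-fetched when its `update_time` is at or after the caller's last poll. The `compression_jobs` table name, the `id AS _id` alias, the `columns` argument, and `buildPendingJobQueries` itself are placeholders for illustration, not identifiers from the repository.

// Sketch only, assuming a `columns` map shaped like COMPRESSION_JOBS_TABLE_COLUMN_NAMES.
const buildPendingJobQueries = (lastUpdateDate, jobIds, columns) => jobIds.map((jobId) => `
    WITH SelectedColumns AS (
        SELECT id AS _id, ${columns.STATUS}, ${columns.UPDATE_TIME}
        FROM compression_jobs
    )
    SELECT *
    FROM SelectedColumns
    WHERE
        _id = ${jobId} AND
        ${columns.UPDATE_TIME} >= '${lastUpdateDate}'
`);

// Example: one query per pending job, filtered by the last poll time.
// buildPendingJobQueries("2024-01-01 00:00:00", [3, 7],
//     {STATUS: "status", UPDATE_TIME: "update_time"});

As far as the visible hunks show, the intent is to key refreshes on `update_time` rather than on a fixed newest-N window, so jobs outside that window no longer vanish from the WebUI table.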
23 changes: 17 additions & 6 deletions components/webui/imports/api/ingestion/server/publications.js
@@ -18,13 +18,10 @@ import StatsDbManager from "./StatsDbManager";

const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000;

/**
* The maximum number of compression jobs to retrieve at a time.
*/
const COMPRESSION_MAX_RETRIEVE_JOBS = 5;

const STATS_REFRESH_INTERVAL_MILLIS = 5000;

const CONST_FOR_DATE_FORMAT = 19;

/**
* @type {CompressionDbManager|null}
*/
@@ -45,6 +42,13 @@ let compressionJobsRefreshTimeout = null;
*/
let statsRefreshInterval = null;

/**
* @type {string}
*/
let lastUpdateDate = new Date().toISOString()
.slice(0, CONST_FOR_DATE_FORMAT)
.replace("T", " ");

/**
* Updates the compression statistics in the StatsCollection.
*
@@ -99,11 +103,18 @@ const refreshCompressionJobs = async () => {
job._id
));

// Save the timestamp before querying the compressionDbManager
const newDate = new Date().toISOString()
.slice(0, CONST_FOR_DATE_FORMAT)
.replace("T", " ");

const jobs = await compressionDbManager.getCompressionJobs(
COMPRESSION_MAX_RETRIEVE_JOBS,
lastUpdateDate,
pendingJobIds
);

lastUpdateDate = newDate;

const operations = jobs.map((doc) => ({
updateOne: {
filter: {_id: doc._id},
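The publications change above follows a simple watermark pattern: take the new timestamp before issuing the query, query with the previous `lastUpdateDate`, then advance the watermark, so a row updated while the query is in flight is still newer than the watermark used on the next poll. Below is a minimal sketch of that pattern, with a hypothetical fetchJobs() standing in for compressionDbManager.getCompressionJobs().

// Sketch only; DATETIME_LENGTH mirrors CONST_FOR_DATE_FORMAT above
// (19 characters keeps "YYYY-MM-DD HH:MM:SS" from an ISO-8601 string).
const DATETIME_LENGTH = 19;

const toSqlDatetime = (date) => date.toISOString()
    .slice(0, DATETIME_LENGTH)
    .replace("T", " ");

let lastUpdateDate = toSqlDatetime(new Date());

const pollOnce = async (fetchJobs, pendingJobIds) => {
    // Capture the next watermark *before* querying, so rows updated mid-query
    // are not skipped: they remain newer than the watermark used next time.
    const newDate = toSqlDatetime(new Date());
    const jobs = await fetchJobs(lastUpdateDate, pendingJobIds);
    lastUpdateDate = newDate;
    return jobs;
};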
@@ -106,6 +106,13 @@ const IngestionJobRow = ({job}) => {
isAlwaysVisible={isPlaceholderVisible}
text={compressedSizeText}/>
</td>
<td className={"text-end"}>
<PlaceholderText
isAlwaysVisible={isPlaceholderVisible}
text={(null === job.update_time) ?
"null" :
new Date(job.update_time).toLocaleString()}/>
</td>
</tr>
);
};
@@ -47,6 +47,7 @@ const IngestionJobs = () => {
<th className={"text-end"}>Speed</th>
<th className={"text-end"}>Data Ingested</th>
<th className={"text-end"}>Compressed Size</th>
<th className={"text-end"}>Last Updated</th>
</tr>
</thead>
<tbody>