Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Missing Ingestion Jobs from WebUI Table #679

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def main(argv):
`status_msg` VARCHAR(512) NOT NULL DEFAULT '',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`start_time` DATETIME(3) NULL DEFAULT NULL,
`update_time` DATETIME(3) NULL DEFAULT NULL,
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
`duration` FLOAT NULL DEFAULT NULL,
`original_size` BIGINT NOT NULL DEFAULT '0',
`uncompressed_size` BIGINT NOT NULL DEFAULT '0',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,14 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection

if len(tasks) == 0:
logger.warning(f"No tasks were created for job {job_id}")
update_time = datetime.datetime.now()
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
update_compression_job_metadata(
db_cursor,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": "invalid input path",
"update_time": update_time
},
)
db_conn.commit()
Expand All @@ -229,6 +231,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, clp_metadata_db_connection
"num_tasks": paths_to_compress_buffer.num_tasks,
"status": CompressionJobStatus.RUNNING,
"start_time": start_time,
"update_time": start_time
},
)
db_conn.commit()
Expand Down Expand Up @@ -324,6 +327,7 @@ def poll_running_jobs(db_conn, db_cursor):
logger.error(f"Error while getting results for job {job_id}: {e}")
job_success = False

update_time = datetime.datetime.now()
if job_success:
logger.info(f"Job {job_id} succeeded.")
update_compression_job_metadata(
Expand All @@ -332,6 +336,7 @@ def poll_running_jobs(db_conn, db_cursor):
dict(
status=CompressionJobStatus.SUCCEEDED,
duration=duration,
update_time=update_time
),
)
else:
Expand All @@ -342,6 +347,7 @@ def poll_running_jobs(db_conn, db_cursor):
dict(
status=CompressionJobStatus.FAILED,
status_msg=error_message,
update_time=update_time
),
)
db_conn.commit()
Expand Down
1 change: 1 addition & 0 deletions components/webui/imports/api/ingestion/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({
STATUS_MSG: "status_msg",
CREATION_TIME: "creation_time",
START_TIME: "start_time",
UPDATE_TIME: "update_time",
DURATION: "duration",
ORIGINAL_SIZE: "original_size",
UNCOMPRESSED_SIZE: "uncompressed_size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class CompressionDbManager {
* Retrieves the last `limit` number of jobs and the ones with the given
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
* job IDs.
*
* @param {number} limit
* @param {string} lastUpdateDate
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
* @param {number[]} jobIds
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
* @return {Promise<object[]>} Job objects with fields with the names in
* `COMPRESSION_JOBS_TABLE_COLUMN_NAMES`
*/
async getCompressionJobs (limit, jobIds) {
async getCompressionJobs (lastUpdateDate, jobIds) {
const queries = [];

queries.push(`
Expand All @@ -38,6 +38,7 @@ class CompressionDbManager {
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}
Expand All @@ -47,7 +48,6 @@ class CompressionDbManager {
SELECT *
FROM SelectedColumns
ORDER BY _id DESC
LIMIT ${limit}
)
`);

Expand All @@ -59,7 +59,9 @@ class CompressionDbManager {
(
SELECT *
FROM SelectedColumns
WHERE _id=${jobId}
WHERE
_id=${jobId} &&
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >= '${lastUpdateDate}'
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
)
`);
});
Expand Down
21 changes: 15 additions & 6 deletions components/webui/imports/api/ingestion/server/publications.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import StatsDbManager from "./StatsDbManager";

const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000;

/**
* The maximum number of compression jobs to retrieve at a time.
*/
const COMPRESSION_MAX_RETRIEVE_JOBS = 5;

const STATS_REFRESH_INTERVAL_MILLIS = 5000;

const CONST_FOR_DATE_FORMAT = 19;

/**
* @type {CompressionDbManager|null}
*/
Expand All @@ -45,6 +42,13 @@ let compressionJobsRefreshTimeout = null;
*/
let statsRefreshInterval = null;

/**
* @type {string}
*/
let lastUpdateDate = new Date().toISOString()
.slice(0, CONST_FOR_DATE_FORMAT)
.replace("T", " ");

AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
/**
* Updates the compression statistics in the StatsCollection.
*
Expand Down Expand Up @@ -99,8 +103,13 @@ const refreshCompressionJobs = async () => {
job._id
));

const previousUpdateDate = lastUpdateDate;
lastUpdateDate = new Date().toISOString()
.slice(0, CONST_FOR_DATE_FORMAT)
.replace("T", " ");
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved

const jobs = await compressionDbManager.getCompressionJobs(
COMPRESSION_MAX_RETRIEVE_JOBS,
previousUpdateDate,
pendingJobIds
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ const IngestionJobRow = ({job}) => {
isAlwaysVisible={isPlaceholderVisible}
text={compressedSizeText}/>
</td>
<td className={"text-end"}>
<PlaceholderText
isAlwaysVisible={isPlaceholderVisible}
text={(null === job.update_time) ?
"null" :
new Date(job.update_time).toLocaleString()}/>
</td>
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
</tr>
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const IngestionJobs = () => {
<th className={"text-end"}>Speed</th>
<th className={"text-end"}>Data Ingested</th>
<th className={"text-end"}>Compressed Size</th>
<th className={"text-end"}>Last Updated</th>
AVMatthews marked this conversation as resolved.
Show resolved Hide resolved
</tr>
</thead>
<tbody>
Expand Down
Loading