Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Missing Ingestion Jobs from WebUI Table #679

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def main(argv):
`status_msg` VARCHAR(512) NOT NULL DEFAULT '',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`start_time` DATETIME(3) NULL DEFAULT NULL,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP(),
`duration` FLOAT NULL DEFAULT NULL,
`original_size` BIGINT NOT NULL DEFAULT '0',
`uncompressed_size` BIGINT NOT NULL DEFAULT '0',
Expand All @@ -64,7 +65,8 @@ def main(argv):
`clp_binary_version` INT NULL DEFAULT NULL,
`clp_config` VARBINARY(60000) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `JOB_STATUS` (`status`) USING BTREE
INDEX `JOB_STATUS` (`status`) USING BTREE,
INDEX `LAST_UPDATE_TIME` (`update_time`) USING BTREE
) ROW_FORMAT=DYNAMIC
"""
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def update_compression_job_metadata(db_cursor, job_id, kv):
logger.error("Must specify at least one field to update")
raise ValueError

field_set_expressions = [f"{k} = %s" for k in kv.keys()]
field_set_expressions = [f"{k} = %s" for k in kv.keys()] + ["update_time = CURRENT_TIMESTAMP()"]
query = f"""
UPDATE {COMPRESSION_JOBS_TABLE_NAME}
SET {", ".join(field_set_expressions)}
Expand Down
1 change: 1 addition & 0 deletions components/webui/imports/api/ingestion/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({
STATUS_MSG: "status_msg",
CREATION_TIME: "creation_time",
START_TIME: "start_time",
UPDATE_TIME: "update_time",
DURATION: "duration",
ORIGINAL_SIZE: "original_size",
UNCOMPRESSED_SIZE: "uncompressed_size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,29 @@ class CompressionDbManager {
}

/**
* Retrieves the last `limit` number of jobs and the ones with the given
* job IDs.
* Retrieves compression jobs that are updated on or after a specific time.
*
* @param {number} limit
* @param {number[]} jobIds
* @param {number} lastUpdateTimestampSeconds
* @return {Promise<object[]>} Job objects with fields with the names in
* `COMPRESSION_JOBS_TABLE_COLUMN_NAMES`
*/
async getCompressionJobs (limit, jobIds) {
const queries = [];
async getCompressionJobs (lastUpdateTimestampSeconds) {
const queryString = `
SELECT
UNIX_TIMESTAMP() as retrieval_time,
id as _id,
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}
FROM ${this.#compressionJobsTableName}
WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UPDATE_TIME} >=
FROM_UNIXTIME(${lastUpdateTimestampSeconds}) -1
ORDER BY _id DESC;\n`;

queries.push(`
WITH SelectedColumns AS (
SELECT
id as _id,
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}
FROM ${this.#compressionJobsTableName}
)
(
SELECT *
FROM SelectedColumns
ORDER BY _id DESC
LIMIT ${limit}
)
`);

// The initial select may not include the jobs specified by `jobIds`, so we select
// them explicitly and then deduplicate the rows with a UNION DISTINCT clause.
jobIds.forEach((jobId) => {
queries.push(`
UNION DISTINCT
(
SELECT *
FROM SelectedColumns
WHERE _id=${jobId}
)
`);
});

queries.push("ORDER BY _id DESC;");

const queryString = queries.join("\n");
const [results] = await this.#sqlDbConnPool.query(queryString);

return results;
Expand Down
34 changes: 13 additions & 21 deletions components/webui/imports/api/ingestion/server/publications.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,12 @@ import {
STATS_COLLECTION_ID,
StatsCollection,
} from "../collections";
import {
COMPRESSION_JOB_WAITING_STATES,
COMPRESSION_JOBS_TABLE_COLUMN_NAMES,
} from "../constants";
import CompressionDbManager from "./CompressionDbManager";
import StatsDbManager from "./StatsDbManager";


const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000;

/**
* The maximum number of compression jobs to retrieve at a time.
*/
const COMPRESSION_MAX_RETRIEVE_JOBS = 5;

const STATS_REFRESH_INTERVAL_MILLIS = 5000;

/**
Expand All @@ -45,6 +36,11 @@ let compressionJobsRefreshTimeout = null;
*/
let statsRefreshInterval = null;

/**
* @type {number}
*/
let lastUpdateTimestampSeconds = 0;

/**
* Updates the compression statistics in the StatsCollection.
*
Expand Down Expand Up @@ -89,21 +85,17 @@ const refreshCompressionJobs = async () => {
return;
}

const pendingJobIds = await CompressionJobsCollection.find({
[COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS]: {
$in: COMPRESSION_JOB_WAITING_STATES,
},
})
.fetch()
.map((job) => (
job._id
));

const jobs = await compressionDbManager.getCompressionJobs(
COMPRESSION_MAX_RETRIEVE_JOBS,
pendingJobIds
lastUpdateTimestampSeconds
);

if (0 !== jobs.length) {
// `refreshCompressionJobs()` shall not be run concurrently
// and therefore incurs no race condition.
// eslint-disable-next-line require-atomic-updates
lastUpdateTimestampSeconds = jobs[0].retrieval_time;
}

const operations = jobs.map((doc) => ({
updateOne: {
filter: {_id: doc._id},
Expand Down
Loading