Skip to content

Commit

Permalink
able to handle recurring job now
Browse files Browse the repository at this point in the history
  • Loading branch information
dev8723 committed Mar 31, 2023
1 parent 7854f08 commit 5116232
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 94 deletions.
3 changes: 2 additions & 1 deletion TODO.MD
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
MVP should be considered as completed, optimization and corner cases still need to be taken care of.

## next steps
- better logging, like centralizing log to one place, currently we log in redis_helper and ws_helper
- test if recurring job will work
- event is recur_job, this logic is in reply of warp_controller
- need to add corresponding handler

- determine if we need to block in ws. i.e. handle ws event one after another
- i guess so, if we receive any job update event, we want to reflect that in redis as soon as possible to avoid executing executed / deleted job
- it would make log very clear if we can make it blocking
- fix eslint suggestions
- finish all TODOs in code

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"withdraw-account": "ts-node src/scripts/tester/withdraw_account.ts",
"create-job": "ts-node src/scripts/tester/create_job.ts",
"update-job": "ts-node src/scripts/tester/update_job.ts",
"delete-job": "ts-node src/scripts/tester/delete_job.ts",
"execute-job": "ts-node src/scripts/keeper/execute_job.ts",
"evict-job": "ts-node src/scripts/keeper/evict_job.ts"
}
Expand Down
17 changes: 14 additions & 3 deletions src/libs/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ export const EVENT_ATTRIBUTE_VALUE_UPDATE_JOB = 'update_job';
export const EVENT_ATTRIBUTE_VALUE_EXECUTE_JOB = 'execute_job';
export const EVENT_ATTRIBUTE_VALUE_EVICT_JOB = 'evict_job';
export const EVENT_ATTRIBUTE_VALUE_DELETE_JOB = 'delete_job';

// attributes for recurring job in the reply logic
export const EVENT_ATTRIBUTE_VALUE_EXECUTE_REPLY = 'execute_reply';
export const EVENT_ATTRIBUTE_VALUE_RECUR_JOB = 'recur_job';
export const EVENT_ATTRIBUTE_VALUE_CREATION_STATUS = 'creation_status';

export const EVENT_ATTRIBUTE_KET_CREATION_STATUS = 'creation_status';
export const EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_CREATED = 'created';
export const EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_FAILED_INSUFFICIENT_FEE =
'failed_insufficient_fee';
export const EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_FAILED_INVALID_JOB_STATUS =
'failed_invalid_job_status';

export const ACTIONABLE_ACTIONS = [
EVENT_ATTRIBUTE_VALUE_CREATE_JOB,
Expand All @@ -29,6 +34,12 @@ export const ACTIONABLE_ACTIONS = [
EVENT_ATTRIBUTE_VALUE_DELETE_JOB,
];

export const AVAILABLE_RECURRING_JOB_CREATION_STATUS = [
EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_CREATED,
EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_FAILED_INSUFFICIENT_FEE,
EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_FAILED_INVALID_JOB_STATUS,
];

export const QUERY_JOB_LIMIT = 50;
export const JOB_STATUS_PENDING: warp_controller.JobStatus = 'Pending';
export const JOB_STATUS_EXECUTED: warp_controller.JobStatus = 'Executed';
Expand Down
88 changes: 83 additions & 5 deletions src/libs/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ import {
import { TMEvent, TMEventAttribute, TMLog } from './schema';
import {
ACTIONABLE_ACTIONS,
AVAILABLE_RECURRING_JOB_CREATION_STATUS,
CHAIN_ID_LOCALTERRA,
EVENT_ATTRIBUTE_KET_CREATION_STATUS,
EVENT_ATTRIBUTE_KEY_ACTION,
EVENT_ATTRIBUTE_KEY_JOB_ID,
EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_CREATED,
EVENT_ATTRIBUTE_VALUE_RECUR_JOB,
EVENT_TYPE_WASM,
VALID_JOB_STATUS,
} from './constant';
Expand Down Expand Up @@ -98,6 +103,10 @@ export const getActionableEvents = (tmResponse: TendermintSubscriptionResponse):
attribute.key === EVENT_ATTRIBUTE_KEY_ACTION &&
ACTIONABLE_ACTIONS.includes(attribute.value)
) {
// NOTE: execute_job will trigger a reply function, resulting in 2 attribute keys being action
// key: action, value: execute_job; key: action, value: execute_reply
// if the executed job is recurring, there will be 3 attribute keys being action
// key: action, value: execute_job; key: action, value: execute_reply; key: action, value: recur_job
actionableEvents.push(event);
break;
}
Expand All @@ -107,22 +116,44 @@ export const getActionableEvents = (tmResponse: TendermintSubscriptionResponse):
return actionableEvents;
};

export const getValueByKeyInAttributes = (attributes: TMEventAttribute[], k: string): string => {
let val = '';
// there could be duplicate key in attributes, throw exception if not found
export const getAllValuesByKeyInAttributes = (
attributes: TMEventAttribute[],
k: string
): string[] => {
let val = [];
for (const attribute of attributes) {
if (attribute.key === k) {
val = attribute.value;
break;
val.push(attribute.value);
}
}
if (val === '') {
if (val.length === 0) {
throw new Error(
`please inspect manually, value not found by key: ${k} in attributes: ${attributes}`
);
}
return val;
};

// expect key to be unique, throw exception if not unique or not found
export const getUniqueValueByKeyInAttributes = (
attributes: TMEventAttribute[],
k: string
): string => {
let val = getAllValuesByKeyInAttributes(attributes, k);
if (val.length !== 1) {
throw new Error(
`please inspect manually, value expect to be unique by key: ${k} in attributes: ${attributes}`
);
}
if (val[0] === '') {
throw new Error(
`please inspect manually, value not found by key: ${k} in attributes: ${attributes}`
);
}
return val[0];
};

export const parseJobRewardFromStringToNumber = (reward: string): number => {
// TODO: maybe use bigint in the future
// but reward is usually low so it shouldn't overflow
Expand Down Expand Up @@ -211,3 +242,50 @@ export const sendErrorToSentry = (e: any): void => {
Sentry.captureException(new Error(e));
// transaction.finish();
};

// return true if executed job is recurring and new job created successfully
// it's possible when recurring job creation failed due to insufficient balance or invalid condition
export const isExecutedJobRecurringAndNewJobCreatedSuccessfully = (
attributes: TMEventAttribute[]
): boolean => {
const actions = getAllValuesByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_ACTION);
// only executing recurring job has recur_job action
if (!actions.includes(EVENT_ATTRIBUTE_VALUE_RECUR_JOB)) {
return false;
}
const newJobCreationStatus = getUniqueValueByKeyInAttributes(
attributes,
EVENT_ATTRIBUTE_KET_CREATION_STATUS
);
if (!AVAILABLE_RECURRING_JOB_CREATION_STATUS.includes(newJobCreationStatus)) {
throw new Error(
`unknown creation_status for created recurring job in reply: ${newJobCreationStatus}`
);
}
return newJobCreationStatus === EVENT_ATTRIBUTE_VALUE_CREATION_STATUS_CREATED;
};

export const getAllJobIdsInNumberAndSortInAscendingOrder = (
attributes: TMEventAttribute[]
): number[] => {
const allJobIds = getAllValuesByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_ID);
const allJobIdsNumber = allJobIds.map((jobId) => Number(jobId));
allJobIdsNumber.sort();
return allJobIdsNumber;
};

// sometimes there are multiple jobIds in attributes, the smallest one is the processed jobId
// e.g. in execute_job has jobId of executed job and jobId of newly created job if executed job is recurring
export const getProcessedJobId = (attributes: TMEventAttribute[]): string => {
const allJobIdsNumber = getAllJobIdsInNumberAndSortInAscendingOrder(attributes);
return allJobIdsNumber[0].toString();
};

// there will be 3 jobId in the attributes, 2 being the old executed jobId, 1 being the newly created jobId
// new jobId should be the biggest one cause jobId is ascending
export const getNewJobIdCreatedFromExecutingRecurringJob = (
attributes: TMEventAttribute[]
): string => {
const allJobIdsNumber = getAllJobIdsInNumberAndSortInAscendingOrder(attributes);
return allJobIdsNumber[allJobIdsNumber.length - 1].toString();
};
92 changes: 46 additions & 46 deletions src/libs/ws_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { MnemonicKey, TendermintSubscriptionResponse, Wallet } from '@terra-mone
import { warp_controller, WarpSdk } from '@terra-money/warp-sdk';
import {
getActionableEvents,
getValueByKeyInAttributes,
getAllValuesByKeyInAttributes,
getNewJobIdCreatedFromExecutingRecurringJob,
getProcessedJobId,
getUniqueValueByKeyInAttributes,
isExecutedJobRecurringAndNewJobCreatedSuccessfully,
parseJobRewardFromStringToNumber,
parseJobStatusFromStringToJobStatus,
printAxiosError,
Expand All @@ -11,7 +15,6 @@ import {
import { TMEvent, TMEventAttribute } from './schema';
import {
EVENT_ATTRIBUTE_KEY_ACTION,
EVENT_ATTRIBUTE_KEY_JOB_ID,
EVENT_ATTRIBUTE_KEY_JOB_LAST_UPDATED_TIME,
EVENT_ATTRIBUTE_KEY_JOB_REWARD,
EVENT_ATTRIBUTE_KEY_JOB_STATUS,
Expand Down Expand Up @@ -54,8 +57,9 @@ export const handleJobCreation = async (
// console.log('sleep half block in case rpc has not synced to latest state yet');
// await new Promise((resolve) => setTimeout(resolve, 1000));

const job: warp_controller.Job = await warpSdk.job(jobId);
const job = await warpSdk.job(jobId);
await saveJob(job, redisClient);
console.log(`jobId ${jobId} saved to redis`);

// do not try to execute job even if active
// all execute job operation should be blocking, i can't get this ws callback work in blocking way
Expand All @@ -75,9 +79,19 @@ export const handleJobCreation = async (

export const handleJobExecution = async (
redisClient: RedisClientType,
jobId: string
warpSdk: WarpSdk,
jobId: string,
attributes: TMEventAttribute[]
): Promise<void> => {
await removeJobFromRedis(redisClient, jobId);
if (isExecutedJobRecurringAndNewJobCreatedSuccessfully(attributes)) {
const newJobId = getNewJobIdCreatedFromExecutingRecurringJob(attributes);
const newJob = await warpSdk.job(newJobId);
await saveJob(newJob, redisClient);
console.log(
`jobId ${newJobId} is created from executing recurring jobId ${jobId}, saved to redis`
);
}
};

export const handleJobDeletion = async (
Expand All @@ -92,9 +106,14 @@ export const handleJobUpdate = async (
redisClient: RedisClientType,
warpSdk: WarpSdk,
jobId: string,
newReward: number,
updatedJobLastUpdateTimeStr: string
attributes: TMEventAttribute[]
): Promise<void> => {
const newRewardStr = getUniqueValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_REWARD);
const newReward = parseJobRewardFromStringToNumber(newRewardStr);
const updatedJobLastUpdateTimeStr = getUniqueValueByKeyInAttributes(
attributes,
EVENT_ATTRIBUTE_KEY_JOB_LAST_UPDATED_TIME
);
await updateJobRewardInRedis(redisClient, warpSdk, jobId, newReward);
await updateJobLastUpdateTimeInRedis(redisClient, jobId, updatedJobLastUpdateTimeStr);
};
Expand All @@ -106,8 +125,13 @@ export const handleJobEviction = async (
redisClient: RedisClientType,
warpSdk: WarpSdk,
jobId: string,
newStatus: warp_controller.JobStatus
attributes: TMEventAttribute[]
): Promise<void> => {
const newStatusStr = getUniqueValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_STATUS);
const newStatus = parseJobStatusFromStringToJobStatus(newStatusStr);
// evict_job doesn't log last_update_time so we have to query job to get it
// updatedJobLastUpdateTimeStr = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_LAST_UPDATED_TIME);

// theoretically status is either pending (enqueue is true and enough money to pay rent) or evicted
// no need to update if job is pending, it should have been added to pending set earlier
const updatedJobLastUpdateTimeStr = (await warpSdk.job(jobId)).last_update_time;
Expand All @@ -132,43 +156,22 @@ export const processEvent = async (
warpSdk: WarpSdk
): Promise<void> => {
const attributes = event.attributes;
let jobId = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_ID);
let jobAction = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_ACTION);
console.log(`new event from WS, jobId: ${jobId}, jobAction: ${jobAction}`);
let newRewardStr: string;
let newReward: number;
let newStatusStr: string;
let newStatus: warp_controller.JobStatus;
let updatedJobLastUpdateTimeStr: string;
let jobId = getProcessedJobId(attributes);
let jobActions = getAllValuesByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_ACTION);
console.log(`new event from WS, jobId: ${jobId}, jobAction: ${jobActions}`);

switch (jobAction) {
case EVENT_ATTRIBUTE_VALUE_CREATE_JOB:
handleJobCreation(redisClient, mnemonicKey, wallet, warpSdk, jobId, attributes);
break;
case EVENT_ATTRIBUTE_VALUE_EXECUTE_JOB:
handleJobExecution(redisClient, jobId);
break;
case EVENT_ATTRIBUTE_VALUE_DELETE_JOB:
handleJobDeletion(redisClient, jobId);
break;
case EVENT_ATTRIBUTE_VALUE_UPDATE_JOB:
newRewardStr = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_REWARD);
newReward = parseJobRewardFromStringToNumber(newRewardStr);
updatedJobLastUpdateTimeStr = getValueByKeyInAttributes(
attributes,
EVENT_ATTRIBUTE_KEY_JOB_LAST_UPDATED_TIME
);
handleJobUpdate(redisClient, warpSdk, jobId, newReward, updatedJobLastUpdateTimeStr);
break;
case EVENT_ATTRIBUTE_VALUE_EVICT_JOB:
newStatusStr = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_STATUS);
newStatus = parseJobStatusFromStringToJobStatus(newStatusStr);
// evict_job doesn't log last_update_time so we have to query job to get it
// updatedJobLastUpdateTimeStr = getValueByKeyInAttributes(attributes, EVENT_ATTRIBUTE_KEY_JOB_LAST_UPDATED_TIME);
handleJobEviction(redisClient, warpSdk, jobId, newStatus);
break;
default:
throw new Error(`unknown jobAction: ${jobAction}`);
if (jobActions.includes(EVENT_ATTRIBUTE_VALUE_CREATE_JOB)) {
handleJobCreation(redisClient, mnemonicKey, wallet, warpSdk, jobId, attributes);
} else if (jobActions.includes(EVENT_ATTRIBUTE_VALUE_EXECUTE_JOB)) {
handleJobExecution(redisClient, warpSdk, jobId, attributes);
} else if (jobActions.includes(EVENT_ATTRIBUTE_VALUE_DELETE_JOB)) {
handleJobDeletion(redisClient, jobId);
} else if (jobActions.includes(EVENT_ATTRIBUTE_VALUE_UPDATE_JOB)) {
handleJobUpdate(redisClient, warpSdk, jobId, attributes);
} else if (jobActions.includes(EVENT_ATTRIBUTE_VALUE_EVICT_JOB)) {
handleJobEviction(redisClient, warpSdk, jobId, attributes);
} else {
throw new Error(`unknown jobActions: ${jobActions}`);
}
};

Expand All @@ -179,11 +182,8 @@ export const processWebSocketEvent = async (
wallet: Wallet,
warpSdk: WarpSdk
): Promise<void> => {
console.log('new tx on warp_controller contract!');
// console.log('tx log: ' + tmResponse.value.TxResult.result.log)
// console.log('tx type type: ' + tmResponse.type);
// usually actionableEvents should only have 1 event, since 1 tx only has 1 wasm event
// TODO: check if create multiple jobs in 1 tx
const actionableEvents = getActionableEvents(tmResponse);
actionableEvents.forEach(
async (event) =>
Expand Down
2 changes: 1 addition & 1 deletion src/scripts/keeper/execute_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const owner = wallet.key.accAddress;

const run = async () => {
warpSdk
.executeJob(owner, '16')
.executeJob(owner, '227')
.then((txInfo) => console.log(txInfo))
.catch((e) => {
printAxiosError(e);
Expand Down
Loading

0 comments on commit 5116232

Please sign in to comment.