Skip to content

Commit

Permalink
use workunit id for state store
Browse files Browse the repository at this point in the history
  • Loading branch information
hanghangliu committed Dec 5, 2023
1 parent 9ad98bd commit 978f989
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
Expand All @@ -626,9 +627,9 @@ private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
stateStore = stateStores.getWuStateStore();
}
try {
return (WorkUnit) stateStore.get(storeName, fileName, helixTaskId);
return (WorkUnit) stateStore.get(storeName, fileName, workUnitId);
} catch (IOException ioException) {
log.error("Failed to fetch workunit with ID {} from path {}", helixTaskId, helixTaskId);
log.error("Failed to fetch workUnit for helix task {} from path {}", helixTaskId, workUnitFilePath);
}
return null;
}
Expand Down

0 comments on commit 978f989

Please sign in to comment.