Skip to content

Commit

Permalink
[Improve] Graceful stop flink job with savepoint (#3796)
Browse files Browse the repository at this point in the history
Co-authored-by: benjobs <[email protected]>
  • Loading branch information
dyccode and wolfboys authored Jun 26, 2024
1 parent 8b5b200 commit 45f169a
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ public boolean checkEnv(Application appParam) throws ApplicationException {
@Override
public boolean checkAlter(Application appParam) {
Long appId = appParam.getId();
if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) {
if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()
&& FlinkAppStateEnum.FINISHED != appParam.getStateEnum()) {
return false;
}
long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ private void handleNotRunState(
doPersistMetrics(application, false);
break;
case CANCELED:
case FINISHED:
log.info(
"[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!",
currentState.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,6 @@ trait FlinkClientTrait extends Logger {
case (false, false) =>
client.cancel(jobID).get()
null
case (true, false) =>
clientWrapper
.cancelWithSavepoint(jobID, savePointDir, cancelRequest.nativeFormat)
.get()
case (_, _) =>
clientWrapper
.stopWithSavepoint(
Expand Down

0 comments on commit 45f169a

Please sign in to comment.