From 308f4eec8a566d8dedb27e29b590f67d990deac7 Mon Sep 17 00:00:00 2001 From: "neil.xiao" Date: Sun, 29 Sep 2024 14:28:03 +0800 Subject: [PATCH 1/3] fix(#4085): CheckPoint Failure Options dose not take effect --- .../console/core/metrics/flink/CheckPoints.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java index 8aeadb1fcc..4d4084a419 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java @@ -17,13 +17,12 @@ package org.apache.streampark.console.core.metrics.flink; -import org.apache.streampark.console.core.enums.CheckPointStatus; -import org.apache.streampark.console.core.enums.CheckPointType; - import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import lombok.Setter; +import org.apache.streampark.console.core.enums.CheckPointStatus; +import org.apache.streampark.console.core.enums.CheckPointType; import java.io.Serializable; import java.util.ArrayList; @@ -98,6 +97,7 @@ public String getPath() { public static class Latest implements Serializable { private CheckPoint completed; private CheckPoint savepoint; + private CheckPoint failed; @JsonIgnore public List getLatestCheckpoint() { @@ -108,6 +108,13 @@ public List getLatestCheckpoint() { if (savepoint != null) { checkPoints.add(savepoint); } + if (failed != null) { + if (completed == null) { + checkPoints.add(failed); + } else { + if (failed.getId() > completed.getId()) checkPoints.add(failed); + } + } return checkPoints; } } From ddf0d7bf1c2082be9038dae9b8e86ed85231dbbe Mon Sep 17 00:00:00 2001 From: benjobs Date: Tue, 1 Oct 2024 17:33:36 +0800 Subject: [PATCH 2/3] Update CheckPoints.java --- .../core/metrics/flink/CheckPoints.java | 143 +++++++++--------- 1 file changed, 75 insertions(+), 68 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java index 4d4084a419..f34ac65b45 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java @@ -17,105 +17,112 @@ package org.apache.streampark.console.core.metrics.flink; +import org.apache.streampark.console.core.enums.CheckPointStatusEnum; +import org.apache.streampark.console.core.enums.CheckPointTypeEnum; + import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import lombok.Setter; -import org.apache.streampark.console.core.enums.CheckPointStatus; -import org.apache.streampark.console.core.enums.CheckPointType; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; @Getter @Setter public class CheckPoints implements Serializable { - private List history; + private List history; - private Latest latest; + private Latest latest; - @JsonIgnore - public List getLatestCheckpoint() { - if (latest == null) { - return Collections.emptyList(); + @JsonIgnore + public List getLatestCheckpoint() { + if (Objects.isNull(latest)) { + return Collections.emptyList(); + } + return latest.getLatestCheckpoint(); } - return latest.getLatestCheckpoint(); - } - @Getter - @Setter - public static class CheckPoint implements Serializable { - private Long id; - private String status; + @Getter + @Setter + public static class CheckPoint implements Serializable { - @JsonProperty("external_path") - private String externalPath; + private Long id; + private String status; - @JsonProperty("is_savepoint") - private Boolean isSavepoint; + @JsonProperty("external_path") + private String externalPath; - @JsonProperty("latest_ack_timestamp") - private Long latestAckTimestamp; + @JsonProperty("is_savepoint") + private Boolean isSavepoint; - @JsonProperty("checkpoint_type") - private String checkpointType; + @JsonProperty("latest_ack_timestamp") + private Long latestAckTimestamp; - @JsonProperty("trigger_timestamp") - private Long triggerTimestamp; + @JsonProperty("checkpoint_type") + private String checkpointType; - @JsonProperty("state_size") - private Long stateSize; + @JsonProperty("trigger_timestamp") + private Long triggerTimestamp; - @JsonProperty("end_to_end_duration") - private Long endToEndDuration; + @JsonProperty("state_size") + private Long stateSize; - private Boolean discarded; + @JsonProperty("end_to_end_duration") + private Long endToEndDuration; - public CheckPointStatus getCheckPointStatus() { - return CheckPointStatus.valueOf(this.status); - } + private Boolean discarded; - public CheckPointType getCheckPointType() { - if ("CHECKPOINT".equals(this.checkpointType)) { - return CheckPointType.CHECKPOINT; - } else if ("SAVEPOINT".equals(this.checkpointType)) { - return CheckPointType.SAVEPOINT; - } - return CheckPointType.SYNC_SAVEPOINT; - } + public CheckPointStatusEnum getCheckPointStatus() { + return CheckPointStatusEnum.valueOf(this.status); + } - public String getPath() { - return this.getExternalPath().replaceFirst("^hdfs:/[^/]", "hdfs:///"); - } - } + public CheckPointTypeEnum getCheckPointType() { + if ("CHECKPOINT".equals(this.checkpointType)) { + return CheckPointTypeEnum.CHECKPOINT; + } + if ("SAVEPOINT".equals(this.checkpointType)) { + return CheckPointTypeEnum.SAVEPOINT; + } + return CheckPointTypeEnum.SYNC_SAVEPOINT; + } - @Getter - @Setter - public static class Latest implements Serializable { - private CheckPoint completed; - private CheckPoint savepoint; - private CheckPoint failed; + public String getPath() { + return this.getExternalPath().replaceFirst("^hdfs:/[^/]", "hdfs:///"); + } + } - @JsonIgnore - public List getLatestCheckpoint() { - List checkPoints = new ArrayList<>(); - if (completed != null) { - checkPoints.add(completed); - } - if (savepoint != null) { - checkPoints.add(savepoint); - } - if (failed != null) { - if (completed == null) { - checkPoints.add(failed); - } else { - if (failed.getId() > completed.getId()) checkPoints.add(failed); + @Getter + @Setter + public static class Latest implements Serializable { + + private CheckPoint completed; + private CheckPoint savepoint; + private CheckPoint failed; + + @JsonIgnore + public List getLatestCheckpoint() { + List checkPoints = new ArrayList<>(); + if (completed != null) { + checkPoints.add(completed); + } + if (savepoint != null) { + checkPoints.add(savepoint); + } + if (failed != null) { + if (completed == null) { + checkPoints.add(failed); + } else { + if (failed.getId() > completed.getId()) { + checkPoints.add(failed); + } + } + } + return checkPoints; } - } - return checkPoints; } - } } From ccdc301c539f205aea5c124c3706a2bc1a373897 Mon Sep 17 00:00:00 2001 From: benjobs Date: Tue, 1 Oct 2024 17:43:58 +0800 Subject: [PATCH 3/3] Update CheckPoints.java --- .../core/metrics/flink/CheckPoints.java | 145 +++++++++--------- 1 file changed, 71 insertions(+), 74 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java index f34ac65b45..b25bbdbe41 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java @@ -17,8 +17,8 @@ package org.apache.streampark.console.core.metrics.flink; -import org.apache.streampark.console.core.enums.CheckPointStatusEnum; -import org.apache.streampark.console.core.enums.CheckPointTypeEnum; +import org.apache.streampark.console.core.enums.CheckPointStatus; +import org.apache.streampark.console.core.enums.CheckPointType; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -29,100 +29,97 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; @Getter @Setter public class CheckPoints implements Serializable { - private List history; + private List history; - private Latest latest; + private Latest latest; - @JsonIgnore - public List getLatestCheckpoint() { - if (Objects.isNull(latest)) { - return Collections.emptyList(); - } - return latest.getLatestCheckpoint(); + @JsonIgnore + public List getLatestCheckpoint() { + if (latest == null) { + return Collections.emptyList(); } + return latest.getLatestCheckpoint(); + } - @Getter - @Setter - public static class CheckPoint implements Serializable { - - private Long id; - private String status; + @Getter + @Setter + public static class CheckPoint implements Serializable { + private Long id; + private String status; - @JsonProperty("external_path") - private String externalPath; + @JsonProperty("external_path") + private String externalPath; - @JsonProperty("is_savepoint") - private Boolean isSavepoint; + @JsonProperty("is_savepoint") + private Boolean isSavepoint; - @JsonProperty("latest_ack_timestamp") - private Long latestAckTimestamp; + @JsonProperty("latest_ack_timestamp") + private Long latestAckTimestamp; - @JsonProperty("checkpoint_type") - private String checkpointType; + @JsonProperty("checkpoint_type") + private String checkpointType; - @JsonProperty("trigger_timestamp") - private Long triggerTimestamp; + @JsonProperty("trigger_timestamp") + private Long triggerTimestamp; - @JsonProperty("state_size") - private Long stateSize; + @JsonProperty("state_size") + private Long stateSize; - @JsonProperty("end_to_end_duration") - private Long endToEndDuration; + @JsonProperty("end_to_end_duration") + private Long endToEndDuration; - private Boolean discarded; + private Boolean discarded; - public CheckPointStatusEnum getCheckPointStatus() { - return CheckPointStatusEnum.valueOf(this.status); - } + public CheckPointStatus getCheckPointStatus() { + return CheckPointStatus.valueOf(this.status); + } - public CheckPointTypeEnum getCheckPointType() { - if ("CHECKPOINT".equals(this.checkpointType)) { - return CheckPointTypeEnum.CHECKPOINT; - } - if ("SAVEPOINT".equals(this.checkpointType)) { - return CheckPointTypeEnum.SAVEPOINT; - } - return CheckPointTypeEnum.SYNC_SAVEPOINT; - } + public CheckPointType getCheckPointType() { + if ("CHECKPOINT".equals(this.checkpointType)) { + return CheckPointType.CHECKPOINT; + } else if ("SAVEPOINT".equals(this.checkpointType)) { + return CheckPointType.SAVEPOINT; + } + return CheckPointType.SYNC_SAVEPOINT; + } - public String getPath() { - return this.getExternalPath().replaceFirst("^hdfs:/[^/]", "hdfs:///"); - } + public String getPath() { + return this.getExternalPath().replaceFirst("^hdfs:/[^/]", "hdfs:///"); } + } + + @Getter + @Setter + public static class Latest implements Serializable { - @Getter - @Setter - public static class Latest implements Serializable { - - private CheckPoint completed; - private CheckPoint savepoint; - private CheckPoint failed; - - @JsonIgnore - public List getLatestCheckpoint() { - List checkPoints = new ArrayList<>(); - if (completed != null) { - checkPoints.add(completed); - } - if (savepoint != null) { - checkPoints.add(savepoint); - } - if (failed != null) { - if (completed == null) { - checkPoints.add(failed); - } else { - if (failed.getId() > completed.getId()) { - checkPoints.add(failed); - } - } - } - return checkPoints; + private CheckPoint completed; + private CheckPoint savepoint; + private CheckPoint failed; + + @JsonIgnore + public List getLatestCheckpoint() { + List checkPoints = new ArrayList<>(); + if (completed != null) { + checkPoints.add(completed); + } + if (savepoint != null) { + checkPoints.add(savepoint); + } + if (failed != null) { + if (completed == null) { + checkPoints.add(failed); + } else { + if (failed.getId() > completed.getId()) { + checkPoints.add(failed); + } } + } + return checkPoints; } + } }