From 1a75c82bf3399162bb8adfeaee4ce7484bd12164 Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 7 Mar 2024 13:05:10 +0800 Subject: [PATCH] [Imporve] changeStateEvnet equals improvements (#3603) * [Imporve] changeStateEvnet equals improvements * [Improve] trigger savepoint FE pop-window improvements --------- Co-authored-by: benjobs --- .../core/service/impl/SavePointServiceImpl.java | 14 ++++++++------ .../console/core/task/FlinkAppHttpWatcher.java | 6 ++---- .../src/views/flink/app/hooks/useSavepoint.tsx | 1 + 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 687026e4a4..bee73b6b82 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -265,15 +265,17 @@ public String getSavePointPath(Application appParam) throws Exception { if (!config.isEmpty()) { savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); } - } else { - // 3.2) At the yarn or k8s mode, then read the savepoint in flink-conf.yml in the bound - // flink - FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); - savepointPath = - flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); } } + // 3.2) read the savepoint in flink-conf.yml in the bound + if (StringUtils.isBlank(savepointPath)) { + // flink + FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); + savepointPath = + flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); + } + return savepointPath; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index 08a7f7902c..1c721b455f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -845,12 +845,10 @@ public boolean equals(Object object) { return false; } StateChangeEvent that = (StateChangeEvent) object; - if (optionState != that.optionState) { - return false; - } return Objects.equals(id, that.id) && Objects.equals(jobId, that.jobId) - && appState == that.appState + && Objects.equals(appState, that.appState) + && Objects.equals(optionState, that.optionState) && Objects.equals(jobManagerUrl, that.jobManagerUrl); } diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx index ea0ddb3262..f84e1ff624 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx @@ -106,6 +106,7 @@ export const useSavepoint = (updateOption: Fn) => { const { data } = await fetchCheckSavepointPath({ id: appId.value }); if (data.data) { await handleSavepointAction(savepointReq); + resolve(true); } else { await createErrorSwal(data.message); }