From 94ce7a5350d8f13c6dcea26ee3bea8686f50b202 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Fri, 1 Dec 2023 12:35:34 +0800 Subject: [PATCH] [INLONG-9373][Manager] Fix the issue of failed creation of pulsar namespace (#9374) --- .../inlong/manager/common/consts/SinkType.java | 2 +- .../service/operationlog/OperationLogRecorder.java | 5 ++++- .../resource/queue/pulsar/PulsarOperator.java | 12 ++++++------ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 1d53c71fc2f..61694269322 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -38,7 +38,7 @@ public class SinkType extends StreamType { public static final String HBASE = "HBASE"; @SupportSortType(sortType = SortType.SORT_STANDALONE) - public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String ELASTICSEARCH = "ES"; @SupportSortType(sortType = SortType.SORT_FLINK) public static final String HDFS = "HDFS"; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogRecorder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogRecorder.java index b7dc2a04438..99a1e9472aa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogRecorder.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogRecorder.java @@ -77,9 +77,12 @@ public static Object doAround(ProceedingJoinPoint joinPoint, OperationLog operat try { JSONObject obj = (JSONObject) JSON.toJSON(arg); for (String key : obj.keySet()) { - if (Objects.equals(key, INLONG_GROUP_ID) || Objects.equals(key, INLONG_STREAM_ID)) { + if (Objects.equals(key, INLONG_GROUP_ID)) { groupId = obj.getString(key); } + if (Objects.equals(key, INLONG_STREAM_ID)) { + streamId = obj.getString(key); + } } } catch (Exception ignored) { log.debug("do nothing when exception"); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index 7c55f65e43e..5ff2c9cecde 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -108,13 +108,13 @@ public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInf Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER, "pulsar namespace cannot be empty during create namespace"); - String namespaceName = tenant + "/" + namespace; - LOGGER.info("begin to create namespace={}", namespaceName); + String tenantNamespaceName = tenant + "/" + namespace; + LOGGER.info("begin to create namespace={}", tenantNamespaceName); try { // Check whether the namespace exists, and create it if it does not exist boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, namespace); if (isExists) { - LOGGER.warn("namespace={} already exists, skip to create", namespaceName); + LOGGER.warn("namespace={} already exists, skip to create", tenantNamespaceName); return; } @@ -147,10 +147,10 @@ public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInf pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate()); policies.setPersistence(persistencePolicies); - PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespaceName, policies); - LOGGER.info("success to create namespace={}", namespaceName); + PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespace, policies); + LOGGER.info("success to create namespace={}", tenantNamespaceName); } catch (Exception e) { - LOGGER.error("failed to create namespace=" + namespaceName, e); + LOGGER.error("failed to create namespace=" + tenantNamespaceName, e); throw e; } }