Skip to content

Commit

Permalink
[INLONG-9373][Manager] Fix the issue of failed creation of pulsar nam…
Browse files Browse the repository at this point in the history
…espace (#9374)
  • Loading branch information
fuweng11 authored Dec 1, 2023
1 parent e2bd659 commit 94ce7a5
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SinkType extends StreamType {
public static final String HBASE = "HBASE";

@SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String ELASTICSEARCH = "ES";

@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String HDFS = "HDFS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ public static Object doAround(ProceedingJoinPoint joinPoint, OperationLog operat
try {
JSONObject obj = (JSONObject) JSON.toJSON(arg);
for (String key : obj.keySet()) {
if (Objects.equals(key, INLONG_GROUP_ID) || Objects.equals(key, INLONG_STREAM_ID)) {
if (Objects.equals(key, INLONG_GROUP_ID)) {
groupId = obj.getString(key);
}
if (Objects.equals(key, INLONG_STREAM_ID)) {
streamId = obj.getString(key);
}
}
} catch (Exception ignored) {
log.debug("do nothing when exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInf
Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER,
"pulsar namespace cannot be empty during create namespace");

String namespaceName = tenant + "/" + namespace;
LOGGER.info("begin to create namespace={}", namespaceName);
String tenantNamespaceName = tenant + "/" + namespace;
LOGGER.info("begin to create namespace={}", tenantNamespaceName);
try {
// Check whether the namespace exists, and create it if it does not exist
boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, namespace);
if (isExists) {
LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
LOGGER.warn("namespace={} already exists, skip to create", tenantNamespaceName);
return;
}

Expand Down Expand Up @@ -147,10 +147,10 @@ public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInf
pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate());
policies.setPersistence(persistencePolicies);

PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespaceName, policies);
LOGGER.info("success to create namespace={}", namespaceName);
PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespace, policies);
LOGGER.info("success to create namespace={}", tenantNamespaceName);
} catch (Exception e) {
LOGGER.error("failed to create namespace=" + namespaceName, e);
LOGGER.error("failed to create namespace=" + tenantNamespaceName, e);
throw e;
}
}
Expand Down

0 comments on commit 94ce7a5

Please sign in to comment.