diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AuditSourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AuditSourceType.java
deleted file mode 100644
index 48ed33be28..0000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AuditSourceType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.consts;
-
-/**
- * Audit Source Type
- */
-public enum AuditSourceType {
- MYSQL, CLICKHOUSE, ELASTICSEARCH
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java
index bcace34313..a1ffe17f6a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java
@@ -25,12 +25,5 @@ public enum AuditQuerySource {
* MYSQL source of query
*/
MYSQL,
- /**
- * ELASTICSEARCH source of query
- */
- ELASTICSEARCH,
- /**
- * CLICKHOUSE source of query
- */
- CLICKHOUSE;
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
deleted file mode 100644
index 752e790ce6..0000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * Audit source entity, including id, name, type, etc.
- */
-@Data
-public class AuditSourceEntity implements Serializable {
-
- private Integer id;
- private String name;
- private String type;
- private String url;
- private Integer enableAuth;
- private String username;
- private String token;
- private Integer status;
- private Integer isDeleted;
- private String creator;
- private String modifier;
- private Date createTime;
- private Date modifyTime;
- private Integer version;
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
deleted file mode 100644
index d496f8570f..0000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
-
-import org.apache.ibatis.annotations.Param;
-import org.springframework.stereotype.Repository;
-
-/**
- * Audit source mapper
- */
-@Repository
-public interface AuditSourceEntityMapper {
-
- int insert(AuditSourceEntity record);
-
- AuditSourceEntity selectOnlineSource();
-
- void offlineSourceByUrl(@Param("offlineUrl") String offlineUrl);
-
-}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
deleted file mode 100644
index da7d111782..0000000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- id, name, type, url, enable_auth, username, token, status, is_deleted,
- creator, modifier, create_time, modify_time, version
-
-
-
- insert into audit_source (id, name, type, url,
- enable_auth, username, token,
- status, creator, modifier)
- values (#{id, jdbcType=INTEGER}, #{name, jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{url, jdbcType=VARCHAR},
- #{enableAuth, jdbcType=TINYINT}, #{username, jdbcType=VARCHAR}, #{token, jdbcType=VARCHAR},
- #{status, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
-
-
-
-
-
- update audit_source
- set status = 0
- where url = #{offlineUrl, jdbcType=VARCHAR}
- and is_deleted = 0
-
-
-
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
deleted file mode 100644
index 33b6cd5bf4..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.audit;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.Pattern;
-
-/**
- * Audit source request
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Audit source request")
-public class AuditSourceRequest {
-
- @NotBlank
- @ApiModelProperty(value = "Audit source name")
- private String name;
-
- @NotBlank
- @ApiModelProperty(value = "Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH", required = true)
- private String type;
-
- @NotBlank
- @Pattern(regexp = "^(jdbc:(mysql|clickhouse)://[\\w.]+(:\\d+)?/[\\w]+(\\?.*)?|http://[\\w.]+(:\\d+)?(/[\\w]+)+(/\\d+(-\\d+)?(,\\d+(-\\d+)?)*)?)", message = "only supports MYSQL, CLICKHOUSE, ELASTICSEARCH url")
- @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required = true)
- private String url;
-
- @ApiModelProperty(value = "Offline the url if not null")
- private String offlineUrl;
-
- @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
- private Integer enableAuth;
-
- @ApiModelProperty(value = "Audit source username, needed if enableAuth is 1")
- private String username;
-
- @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
- private String token;
-
- @ApiModelProperty(value = "Version number")
- private Integer version;
-
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
deleted file mode 100644
index 8a3ae2a148..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.audit;
-
-import com.fasterxml.jackson.annotation.JsonFormat;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-import java.util.Date;
-
-/**
- * Audit source response
- */
-@Data
-@ApiModel("Audit source response")
-public class AuditSourceResponse {
-
- @ApiModelProperty(value = "Primary key")
- private Integer id;
-
- @ApiModelProperty(value = "Audit source name")
- private String name;
-
- @ApiModelProperty(value = "Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH", required = true)
- private String type;
-
- @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required = true)
- private String url;
-
- @ApiModelProperty(value = "Offline the url if not null")
- private String offlineUrl;
-
- @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
- private Integer enableAuth;
-
- @ApiModelProperty(value = "Audit source username, needed if enableAuth is 1")
- private String username;
-
- @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
- private String token;
-
- @ApiModelProperty(value = "Creator")
- private String creator;
-
- @ApiModelProperty(value = "Modifier")
- private String modifier;
-
- @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
- private Date createTime;
-
- @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
- private Date modifyTime;
-
- @ApiModelProperty(value = "Version number")
- private Integer version;
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AbstractAuditSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AbstractAuditSourceOperator.java
deleted file mode 100644
index 13b958ae99..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AbstractAuditSourceOperator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-/**
- * Default of Audit Source Operator.
- */
-public abstract class AbstractAuditSourceOperator implements InlongAuditSourceOperator {
-
- @Override
- public Boolean accept(String type) {
- return getType().equals(type.toUpperCase());
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceClickhouseOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceClickhouseOperator.java
deleted file mode 100644
index a015f16954..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceClickhouseOperator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-import org.apache.inlong.manager.common.consts.AuditSourceType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Service;
-
-/**
- * Audit source clickhouse operator.
- */
-@Service
-public class AuditSourceClickhouseOperator extends AbstractAuditSourceOperator {
-
- private static final String CLICKHOUSE_URL_PREFIX = "jdbc:clickhouse://";
-
- @Override
- public String getType() {
- return AuditSourceType.CLICKHOUSE.name();
- }
-
- @Override
- public String convertTo(String url) {
- if (StringUtils.isNotBlank(url) && url.startsWith(CLICKHOUSE_URL_PREFIX)) {
- return url;
- }
-
- throw new BusinessException(String.format(ErrorCodeEnum.AUDIT_SOURCE_URL_NOT_SUPPORTED.getMessage(), url));
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceElasticsearchOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceElasticsearchOperator.java
deleted file mode 100644
index 1c2bba0a67..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceElasticsearchOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-import org.apache.inlong.manager.common.consts.AuditSourceType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Service;
-
-/**
- * Audit source elasticsearch operator.
- */
-@Service
-public class AuditSourceElasticsearchOperator extends AbstractAuditSourceOperator {
-
- private static final String ES_HTTP_HOST_PREFIX = "http://";
- private static final String ES_HTTPS_HOST_PREFIX = "https://";
-
- @Override
- public String getType() {
- return AuditSourceType.ELASTICSEARCH.name();
- }
-
- @Override
- public String convertTo(String url) {
- if (StringUtils.isNotBlank(url)
- && (url.startsWith(ES_HTTP_HOST_PREFIX) || url.startsWith(ES_HTTPS_HOST_PREFIX))) {
- return url;
- }
-
- throw new BusinessException(String.format(ErrorCodeEnum.AUDIT_SOURCE_URL_NOT_SUPPORTED.getMessage(), url));
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceMysqlOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceMysqlOperator.java
deleted file mode 100644
index 448345e0ff..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/AuditSourceMysqlOperator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-import org.apache.inlong.manager.common.consts.AuditSourceType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.util.MySQLSensitiveUrlUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Service;
-
-/**
- * Audit source mysql operator.
- */
-@Service
-public class AuditSourceMysqlOperator extends AbstractAuditSourceOperator {
-
- private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
-
- @Override
- public String getType() {
- return AuditSourceType.MYSQL.name();
- }
-
- @Override
- public String convertTo(String url) {
- if (StringUtils.isNotBlank(url) && url.startsWith(MYSQL_JDBC_PREFIX)) {
- return MySQLSensitiveUrlUtils.filterSensitive(url);
- }
-
- throw new BusinessException(String.format(ErrorCodeEnum.AUDIT_SOURCE_URL_NOT_SUPPORTED.getMessage(), url));
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperator.java
deleted file mode 100644
index faafb977f1..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-/**
- * Interface of the inlong audit source operator.
- */
-public interface InlongAuditSourceOperator {
-
- /**
- * Determines whether the current instance matches the specified type.
- */
- Boolean accept(String type);
-
- /**
- * Get the Audit Source type.
- *
- * @return audit source type string
- */
- String getType();
-
- /**
- * Convert the URL.
- *
- * @param url audit source url
- * @return converted url string
- */
- String convertTo(String url);
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperatorFactory.java
deleted file mode 100644
index e38b75635c..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/audit/InlongAuditSourceOperatorFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.audit;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * Factory for {@link InlongAuditSourceOperator}.
- */
-@Service
-public class InlongAuditSourceOperatorFactory {
-
- @Autowired
- private List auditSourceOperatorList;
-
- /**
- * Get an inlong Audit Source operator instance via the given type
- */
- public InlongAuditSourceOperator getInstance(String type) {
- return auditSourceOperatorList.stream()
- .filter(inst -> inst.accept(type))
- .findFirst()
- .orElseThrow(() -> new BusinessException(
- String.format(ErrorCodeEnum.AUDIT_SOURCE_TYPE_NOT_SUPPORTED.getMessage(), type)));
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index d60d4bebb3..77fdfd87fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -20,8 +20,6 @@
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import java.util.List;
@@ -65,21 +63,4 @@ public interface AuditService {
*/
Boolean refreshBaseItemCache();
- /**
- * Offline the old audit source through url, and insert and online a new audit source.
- * If the new url already exists in the table, the insert operation will become an update operation.
- *
- * @param operator current operator
- * @param request audit source request
- * @return audit source id after updating or saving
- */
- Integer updateAuditSource(AuditSourceRequest request, String operator);
-
- /**
- * Get audit source that is online.
- *
- * @return audit source response.
- */
- AuditSourceResponse getAuditSource();
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 51381ac373..8230771b76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -28,36 +28,27 @@
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
-import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
-import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.audit.AuditRunnable;
-import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
-import org.apache.inlong.manager.service.audit.InlongAuditSourceOperatorFactory;
import org.apache.inlong.manager.service.core.AuditService;
-import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
-import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.ibatis.jdbc.SQL;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -72,9 +63,6 @@
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@@ -107,32 +95,10 @@ public class AuditServiceImpl implements AuditService {
private static final DateTimeFormatter SECOND_DATE_FORMATTER = DateTimeFormat.forPattern(SECOND_FORMAT);
private static final DateTimeFormatter HOUR_DATE_FORMATTER = DateTimeFormat.forPattern(HOUR_FORMAT);
private static final DateTimeFormatter DAY_DATE_FORMATTER = DateTimeFormat.forPattern(DAY_FORMAT);
-
- private static final double DEFAULT_BOOST = 1.0;
- private static final boolean ADJUST_PURE_NEGATIVE = true;
- private static final int QUERY_FROM = 0;
- private static final int QUERY_SIZE = 0;
- private static final String SORT_ORDER = "ASC";
- private static final String TERM_FILED = "log_ts";
- private static final String AGGREGATIONS_COUNT = "count";
- private static final String AGGREGATIONS_DELAY = "delay";
- private static final String AGGREGATIONS = "aggregations";
- private static final String BUCKETS = "buckets";
- private static final String KEY = "key";
- private static final String VALUE = "value";
- private static final String INLONG_GROUP_ID = "inlong_group_id";
- private static final String INLONG_STREAM_ID = "inlong_stream_id";
- private static final String COUNT = "count";
- private static final String DELAY = "delay";
- private static final String TERMS = "terms";
-
- private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
-
// key 1: type of audit, like pulsar, hive, key 2: indicator type, value : entity of audit base item
private final Map> auditIndicatorMap = new ConcurrentHashMap<>();
-
private final Map auditItemMap = new ConcurrentHashMap<>();
-
+ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
// defaults to return all audit ids, can be overwritten in properties file
// see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
@Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
@@ -150,20 +116,12 @@ public class AuditServiceImpl implements AuditService {
@Autowired
private AuditEntityMapper auditEntityMapper;
@Autowired
- private ElasticsearchApi elasticsearchApi;
- @Autowired
private StreamSinkEntityMapper sinkEntityMapper;
@Autowired
private StreamSourceEntityMapper sourceEntityMapper;
@Autowired
- private ClickHouseConfig config;
- @Autowired
- private AuditSourceEntityMapper auditSourceMapper;
- @Autowired
private InlongGroupEntityMapper inlongGroupMapper;
@Autowired
- private InlongAuditSourceOperatorFactory auditSourceOperatorFactory;
- @Autowired
private RestTemplate restTemplate;
@PostConstruct
@@ -196,44 +154,6 @@ public Boolean refreshBaseItemCache() {
return true;
}
- @Override
- public Integer updateAuditSource(AuditSourceRequest request, String operator) {
- InlongAuditSourceOperator auditSourceOperator = auditSourceOperatorFactory.getInstance(request.getType());
- request.setUrl(auditSourceOperator.convertTo(request.getUrl()));
-
- String offlineUrl = request.getOfflineUrl();
- if (StringUtils.isNotBlank(offlineUrl)) {
- auditSourceMapper.offlineSourceByUrl(offlineUrl);
- LOGGER.info("success offline the audit source with url: {}", offlineUrl);
- }
-
- // TODO firstly we should check to see if it exists, updated if it exists, and created if it doesn't exist
- AuditSourceEntity entity = CommonBeanUtils.copyProperties(request, AuditSourceEntity::new);
- entity.setStatus(InlongConstants.DEFAULT_ENABLE_VALUE);
- entity.setCreator(operator);
- entity.setModifier(operator);
- auditSourceMapper.insert(entity);
- Integer id = entity.getId();
- LOGGER.info("success to insert audit source with id={}", id);
-
- // TODO we should select the config that needs to be updated according to the source type
- config.updateRuntimeConfig();
- LOGGER.info("success to update audit source with id={}", id);
-
- return id;
- }
-
- @Override
- public AuditSourceResponse getAuditSource() {
- AuditSourceEntity entity = auditSourceMapper.selectOnlineSource();
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND);
- }
-
- LOGGER.debug("success to get audit source, id={}", entity.getId());
- return CommonBeanUtils.copyProperties(entity, AuditSourceResponse::new);
- }
-
@Override
public String getAuditId(String type, IndicatorType indicatorType) {
if (StringUtils.isBlank(type)) {
@@ -329,12 +249,12 @@ public List listByCondition(AuditRequest request) throws Exception {
return vo;
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null)));
- } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+ } else {
this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, restTemplate,
auditQueryUrl, auditIdMap, false));
}
}
- if (AuditQuerySource.CLICKHOUSE == querySource) {
+ if (AuditQuerySource.MYSQL != querySource) {
latch.await(30, TimeUnit.SECONDS);
} else {
result = aggregateByTimeDim(result, request.getTimeStaticsDim());
@@ -372,12 +292,12 @@ public List listAll(AuditRequest request) throws Exception {
return vo;
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditName, auditSet, null));
- } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+ } else {
this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, restTemplate,
auditQueryUrl, null, true));
}
}
- if (AuditQuerySource.CLICKHOUSE == querySource) {
+ if (AuditQuerySource.MYSQL != querySource) {
latch.await(30, TimeUnit.SECONDS);
}
return result;
@@ -421,86 +341,6 @@ private List getAuditIds(String groupId, String streamId, String sourceN
return new ArrayList<>(auditSet);
}
- /**
- * Get clickhouse Statement
- *
- * @param groupId The groupId of inlong
- * @param streamId The streamId of inlong
- * @param auditId The auditId of request
- * @param startDate The start datetime of request
- * @param endDate The en datetime of request
- * @return The clickhouse Statement
- */
- private PreparedStatement getAuditCkStatementGroupByLogTs(Connection connection, String groupId, String streamId,
- String ip,
- String auditId, String startDate, String endDate) throws SQLException {
- String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
- String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
- if (StringUtils.isNotBlank(ip)) {
- return getAuditCkStatementByIp(connection, auditId, ip, startDate, endDate);
- }
- // Query results are duplicated according to all fields.
- String subQuery = new SQL()
- .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
- "inlong_stream_id", "audit_id", "count", "size", "delay")
- .FROM("audit_data")
- .WHERE("inlong_group_id = ?")
- .WHERE("inlong_stream_id = ?")
- .WHERE("audit_id = ?")
- .WHERE("log_ts >= ?")
- .WHERE("log_ts < ?")
- .toString();
-
- String sql = new SQL()
- .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", "sum(count) as total",
- "sum(delay) as total_delay", "sum(size) as total_size")
- .FROM("(" + subQuery + ") as sub")
- .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id")
- .ORDER_BY("log_ts")
- .toString();
-
- PreparedStatement statement = connection.prepareStatement(sql);
- statement.setString(1, groupId);
- statement.setString(2, streamId);
- statement.setString(3, auditId);
- statement.setString(4, start);
- statement.setString(5, end);
- return statement;
- }
-
- private PreparedStatement getAuditCkStatementGroupByIp(Connection connection, String groupId,
- String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException {
-
- if (StringUtils.isNotBlank(ip)) {
- return getAuditCkStatementByIpGroupByIp(connection, auditId, ip, startDate, endDate);
- }
- // Query results are duplicated according to all fields.
- String subQuery = new SQL()
- .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
- "inlong_stream_id", "audit_id", "count", "size", "delay")
- .FROM("audit_data")
- .WHERE("inlong_group_id = ?")
- .WHERE("inlong_stream_id = ?")
- .WHERE("audit_id = ?")
- .WHERE("log_ts >= ?")
- .WHERE("log_ts < ?")
- .toString();
-
- String sql = new SQL()
- .SELECT("inlong_group_id", "inlong_stream_id", "sum(count) as total", "ip",
- "sum(delay) as total_delay", "sum(size) as total_size")
- .FROM("(" + subQuery + ") as sub")
- .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
- .toString();
- PreparedStatement statement = connection.prepareStatement(sql);
- statement.setString(1, groupId);
- statement.setString(2, streamId);
- statement.setString(3, auditId);
- statement.setString(4, startDate);
- statement.setString(5, endDate);
- return statement;
- }
-
/**
* Aggregate by time dim
*/
@@ -574,60 +414,4 @@ private String formatLogTime(String dateString, String format) {
return formatDateString;
}
- private PreparedStatement getAuditCkStatementByIp(Connection connection, String auditId, String ip,
- String startDate, String endDate) throws SQLException {
- String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
- String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
- String subQuery = new SQL()
- .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
- "inlong_stream_id", "audit_id", "count", "size", "delay")
- .FROM("audit_data")
- .WHERE("ip = ?")
- .WHERE("audit_id = ?")
- .WHERE("log_ts >= ?")
- .WHERE("log_ts < ?")
- .toString();
-
- String sql = new SQL()
- .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", "sum(count) as total",
- "sum(delay) as total_delay", "sum(size) as total_size")
- .FROM("(" + subQuery + ") as sub")
- .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id")
- .ORDER_BY("log_ts")
- .toString();
-
- PreparedStatement statement = connection.prepareStatement(sql);
- statement.setString(1, ip);
- statement.setString(2, auditId);
- statement.setString(3, start);
- statement.setString(4, end);
- return statement;
- }
-
- private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection connection, String auditId, String ip,
- String startDate, String endDate) throws SQLException {
- String subQuery = new SQL()
- .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id",
- "inlong_stream_id", "audit_id", "count", "size", "delay")
- .FROM("audit_data")
- .WHERE("ip = ?")
- .WHERE("audit_id = ?")
- .WHERE("log_ts >= ?")
- .WHERE("log_ts < ?")
- .toString();
-
- String sql = new SQL()
- .SELECT("inlong_group_id", "inlong_stream_id", "ip", "sum(count) as total",
- "sum(delay) as total_delay", "sum(size) as total_size")
- .FROM("(" + subQuery + ") as sub")
- .GROUP_BY("inlong_group_id", "inlong_stream_id", "ip")
- .toString();
- PreparedStatement statement = connection.prepareStatement(sql);
- statement.setString(1, ip);
- statement.setString(2, auditId);
- statement.setString(3, startDate);
- statement.setString(4, endDate);
- return statement;
- }
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
deleted file mode 100644
index 156c4bac3c..0000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.resource.sink.ck;
-
-import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
-import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
-
-import com.alibaba.druid.pool.DruidDataSourceFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.sql.DataSource;
-
-import java.sql.Connection;
-import java.util.Objects;
-import java.util.Properties;
-
-/**
- * ClickHouse config, including url, user, etc.
- */
-@Slf4j
-@Service
-public class ClickHouseConfig {
-
- @Autowired
- private AuditSourceEntityMapper auditSourceMapper;
- private static volatile DataSource source;
- private static volatile String currentJdbcUrl = null;
- private static volatile String currentUsername = null;
- private static volatile String currentPassword = null;
-
- /**
- * Update the runtime config of ClickHouse connection.
- */
- public synchronized void updateRuntimeConfig() {
- try {
- AuditSourceEntity auditSource = auditSourceMapper.selectOnlineSource();
- String jdbcUrl = auditSource.getUrl();
- String username = auditSource.getUsername();
- String password = StringUtils.isBlank(auditSource.getToken()) ? "" : auditSource.getToken();
-
- boolean changed = !Objects.equals(currentJdbcUrl, jdbcUrl)
- || !Objects.equals(currentUsername, username)
- || !Objects.equals(currentPassword, password);
- if (changed) {
- currentJdbcUrl = jdbcUrl;
- currentUsername = username;
- currentPassword = password;
-
- Properties pros = new Properties();
- pros.put("url", jdbcUrl);
- if (StringUtils.isNotBlank(username)) {
- pros.put("username", username);
- }
- if (StringUtils.isNotBlank(password)) {
- pros.put("password", password);
- }
-
- source = DruidDataSourceFactory.createDataSource(pros);
- log.info("success to create connection to {}", jdbcUrl);
- }
- } catch (Exception e) {
- log.error("failed to read click house audit source: ", e);
- }
- }
-
- /**
- * Get ClickHouse connection from data source
- */
- public Connection getCkConnection() throws Exception {
- log.debug("start to get connection for CLICKHOUSE");
- int retry = 0;
- while (source == null && retry < 3) {
- updateRuntimeConfig();
- retry += 1;
- }
-
- if (source == null) {
- log.warn("jdbc source is null for CLICKHOUSE");
- return null;
- }
-
- Connection connection = source.getConnection();
- log.info("success to get connection for CLICKHOUSE");
- return connection;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 6f4580897a..d59133f0f8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -25,7 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@@ -48,13 +47,9 @@ public class ElasticsearchConfig {
private RestTemplate restTemplate;
private List httpHosts;
- @Value("${es.index.search.hostname}")
private String hosts;
- @Value("${es.auth.enable}")
private Boolean authEnable = false;
- @Value("${es.auth.user}")
private String username;
- @Value("${es.auth.password}")
private String password;
/**
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
deleted file mode 100644
index 28b8636969..0000000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.pojo.audit.AuditInfo;
-import org.apache.inlong.manager.pojo.audit.AuditRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
-import org.apache.inlong.manager.pojo.audit.AuditVO;
-import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.AuditService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Audit service test for {@link AuditService}
- */
-class AuditServiceTest extends ServiceBaseTest {
-
- @Autowired
- private AuditService auditService;
-
- @Test
- void testQueryFromMySQL() {
- AuditRequest request = new AuditRequest();
- request.setAuditIds(Arrays.asList("3", "4"));
- request.setInlongGroupId("g1");
- request.setInlongStreamId("s1");
- request.setStartDate("2022-01-01");
- request.setEndDate("2022-01-01");
- List result = new ArrayList<>();
- AuditVO auditVO = new AuditVO();
- auditVO.setAuditId("3");
- auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00", 123L, 12L, 12L),
- new AuditInfo("2022-01-01 00:01:00", 124L, 12L, 12L)));
- result.add(auditVO);
- Assertions.assertNotNull(result);
- // close real test for testQueryFromMySQL due to date_format function not support in h2
- // Assertions.assertNotNull(auditService.listByCondition(request));
- }
-
- /**
- * Temporarily close testing for testQueryFromElasticsearch due to lack of elasticsearch dev environment
- * You can open it if exists elasticsearch dev environment
- *
- * @throws IOException The exception may throws
- */
- // @Test
- public void testQueryFromElasticsearch() throws Exception {
- AuditRequest request = new AuditRequest();
- request.setAuditIds(Arrays.asList("3", "4"));
- request.setInlongGroupId("g1");
- request.setInlongStreamId("s1");
- request.setStartDate("2022-01-01");
- request.setEndDate("2022-01-01");
- Assertions.assertNotNull(auditService.listByCondition(request));
- }
-
- @Test
- void testUpdateAuditSource() {
- AuditSourceRequest request1 = AuditSourceRequest.builder()
- .name("source_ch_1")
- .type("CLICKHOUSE")
- .url("jdbc:clickhouse://127.0.0.1:8123/db1")
- .offlineUrl(null)
- .enableAuth(0)
- .build();
- auditService.updateAuditSource(request1, GLOBAL_OPERATOR);
-
- AuditSourceRequest request2 = AuditSourceRequest.builder()
- .name("source_ch_2")
- .type("CLICKHOUSE")
- .url("jdbc:clickhouse://127.0.0.1:8123/db2")
- .offlineUrl("jdbc:clickhouse://127.0.0.1:8123/db1")
- .enableAuth(1)
- .username("default")
- .token("123456")
- .build();
- auditService.updateAuditSource(request2, GLOBAL_OPERATOR);
-
- Assertions.assertEquals(auditService.getAuditSource().getUrl(), request2.getUrl());
- }
-
-}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index c8e4344842..59ee994fa1 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -855,29 +855,6 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sort_tube_input', 'TUBEMQ', 0, '33'),
('audit_sort_tube_output', 'TUBEMQ', 1, '34');
--- ----------------------------
--- Table structure for audit_source
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_source`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(128) NOT NULL COMMENT 'Audit source name',
- `type` varchar(20) NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
- `url` varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
- `enable_auth` tinyint(1) DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
- `username` varchar(128) COMMENT 'Audit source username, needed if auth_enable is 1' ,
- `token` varchar(512) DEFAULT NULL COMMENT 'Audit source token, needed if auth_enable is 1',
- `status` smallint(4) NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online' ,
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) NOT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_audit_source` (url, `is_deleted`)
-);
-
-- ----------------------------
-- Table structure for tenant_cluster_tag
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 2ad73d3eb7..b7246c1da1 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -916,30 +916,6 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sort_tube_input', 'TUBEMQ', 0, '33'),
('audit_sort_tube_output', 'TUBEMQ', 1, '34');
--- ----------------------------
--- Table structure for audit_source
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `audit_source`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(128) NOT NULL COMMENT 'Audit source name',
- `type` varchar(20) NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
- `url` varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
- `enable_auth` tinyint(1) DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
- `username` varchar(128) COMMENT 'Audit source username, needed if auth_enable is 1' ,
- `token` varchar(512) DEFAULT NULL COMMENT 'Audit source token, needed if auth_enable is 1',
- `status` smallint(4) NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online' ,
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) NOT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_audit_source` (url, `is_deleted`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Audit source table';
-
-- ----------------------------
-- Table structure for tenant_cluster_tag
-- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
index 8bc0c103d8..11b97a1a3c 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
@@ -19,11 +19,8 @@
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
-import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.core.AuditService;
import io.swagger.annotations.Api;
@@ -70,23 +67,10 @@ public Response refreshCache() {
return Response.success(auditService.refreshBaseItemCache());
}
- @ApiOperation(value = "Update the audit source")
- @PostMapping(value = "/audit/updateSource")
- public Response updateAuditSource(@Valid @RequestBody AuditSourceRequest request) {
- return Response.success(auditService.updateAuditSource(request, LoginUserUtils.getLoginUser().getName()));
- }
-
@ApiOperation(value = "Get the audit base info")
@GetMapping("/audit/getAuditBases")
public Response> getAuditBases() {
return Response.success(auditService.getAuditBases());
}
- @ApiOperation(value = "Get the audit source")
- @GetMapping("/audit/getSource")
- public Response getAuditSource() {
- // TODO support more parameters to query
- return Response.success(auditService.getAuditSource());
- }
-
}
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index e92862ca3e..74259dd340 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -50,31 +50,11 @@ spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
# Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
+# Audit query source that decide what data source to query, currently only supports [MYSQL]
audit.query.source=MYSQL
# Audit query url
audit.query.url=http://127.0.0.1:10080
-# Elasticsearch config
-# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
-es.index.search.hostname=127.0.0.1
-# Elasticsearch port
-es.index.search.port=9200
-# Elasticsearch support authentication flag
-es.auth.enable=false
-# Elasticsearch user of authentication info
-es.auth.user=admin
-# Elasticsearch password of authentication info
-es.auth.password=inlong
-
-# ClickHouse config
-# ClickHouse jdbcUrl
-audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
-# ClickHouse username
-audit.ck.username=default
-# ClickHouse password
-audit.ck.password=
-
# Database clean
# If turned on, logically deleted data will be collected and permanently deleted periodically
data.clean.enabled=false
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 8ae818be72..aff8da0cac 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -49,31 +49,11 @@ spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
# Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
+# Audit query source that decide what data source to query, currently only supports [MYSQL]
audit.query.source=MYSQL
# Audit query url
audit.query.url=http://127.0.0.1:10080
-# Elasticsearch config
-# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
-es.index.search.hostname=127.0.0.1
-# Elasticsearch port
-es.index.search.port=9200
-# Elasticsearch support authentication flag
-es.auth.enable=false
-# Elasticsearch user of authentication info
-es.auth.user=admin
-# Elasticsearch password of authentication info
-es.auth.password=inlong
-
-# ClickHouse config
-# ClickHouse jdbcUrl
-audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
-# ClickHouse username
-audit.ck.username=default
-# ClickHouse password
-audit.ck.password=
-
# Database clean
# If turned on, logically deleted data will be collected and permanently deleted periodically
data.clean.enabled=false
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index ba694a7b09..a6b1cf0e1d 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -50,31 +50,11 @@ spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
# Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
+# Audit query source that decide what data source to query, currently only supports [MYSQL]
audit.query.source=MYSQL
# Audit query url
audit.query.url=http://127.0.0.1:10080
-# Elasticsearch config
-# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
-es.index.search.hostname=127.0.0.1
-# Elasticsearch port
-es.index.search.port=9200
-# Elasticsearch support authentication flag
-es.auth.enable=false
-# Elasticsearch user of authentication info
-es.auth.user=admin
-# Elasticsearch password of authentication info
-es.auth.password=inlong
-
-# ClickHouse config
-# ClickHouse jdbcUrl
-audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
-# ClickHouse username
-audit.ck.username=default
-# ClickHouse password
-audit.ck.password=
-
# Database clean
# If turned on, logically deleted data will be collected and permanently deleted periodically
data.clean.enabled=false