Skip to content

Commit

Permalink
[INLONG-10081][DataProxy] Modify the data format of metadata saved in…
Browse files Browse the repository at this point in the history
… the metadata.json file (#10083)
  • Loading branch information
gosonzhang authored Apr 26, 2024
1 parent 60c2676 commit b5714f8
Show file tree
Hide file tree
Showing 14 changed files with 535 additions and 354 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
*
Expand Down Expand Up @@ -94,4 +95,23 @@ public String toString() {
.append("params", params)
.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CacheClusterConfig)) {
return false;
}
CacheClusterConfig that = (CacheClusterConfig) o;
return Objects.equals(clusterName, that.clusterName)
&& Objects.equals(token, that.token)
&& Objects.equals(params, that.params);
}

@Override
public int hashCode() {
return Objects.hash(clusterName, token, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,16 @@ public String toString() {
*/
public static CacheType convert(String value) {
for (CacheType v : values()) {
if (v.value().equals(value)) {
if (v.value().equalsIgnoreCase(value)) {
return v;
}
}
return N;
}

public static CacheType valueOf(int idValue) {
for (CacheType v : values()) {
if (v.getId() == idValue) {
return v;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package org.apache.inlong.dataproxy.config.pojo;

import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.sdk.commons.protocol.InlongId;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* IdTopicConfig
Expand All @@ -42,21 +45,39 @@ public class IdTopicConfig {
private String fieldDelimiter = "|";
private String fileDelimiter = "\n";
private Boolean useExtendedFields = false;
private MessageWrapType msgWrapType = MessageWrapType.UNKNOWN;
private InlongCompressType v1CompressType = InlongCompressType.INLONG_SNAPPY;

private Map<String, String> params = new HashMap<>();

public IdTopicConfig() {

}

public Boolean getUseExtendedFields() {
public boolean isUseExtendedFields() {
return useExtendedFields;
}

public void setUseExtendedFields(Boolean useExtendedFields) {
this.useExtendedFields = useExtendedFields;
}

public MessageWrapType getMsgWrapType() {
return msgWrapType;
}

public void setMsgWrapType(MessageWrapType msgWrapType) {
this.msgWrapType = msgWrapType;
}

public InlongCompressType getV1CompressType() {
return v1CompressType;
}

public void setV1CompressType(InlongCompressType v1CompressType) {
this.v1CompressType = v1CompressType;
}

/**
* get uid
* @return the uid
Expand Down Expand Up @@ -221,11 +242,42 @@ public String toString() {
.append("inlongGroupId", inlongGroupId)
.append("inlongStreamid", inlongStreamid)
.append("topicName", topicName)
.append("tenant", tenant)
.append("nameSpace", nameSpace)
.append("dataType", dataType)
.append("fieldDelimiter", fieldDelimiter)
.append("fileDelimiter", fileDelimiter)
.append("useExtendedFields", useExtendedFields)
.append("msgWrapType", msgWrapType)
.append("pbCompressType", v1CompressType)
.append("params", params)
.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof IdTopicConfig)) {
return false;
}
IdTopicConfig that = (IdTopicConfig) o;
return uid.equals(that.uid) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(inlongStreamid, that.inlongStreamid) && topicName.equals(that.topicName)
&& Objects.equals(tenant, that.tenant) && Objects.equals(nameSpace, that.nameSpace)
&& dataType == that.dataType && Objects.equals(fieldDelimiter, that.fieldDelimiter)
&& Objects.equals(fileDelimiter, that.fileDelimiter)
&& Objects.equals(useExtendedFields, that.useExtendedFields)
&& Objects.equals(msgWrapType, that.msgWrapType)
&& v1CompressType == that.v1CompressType
&& Objects.equals(params, that.params);
}

@Override
public int hashCode() {
return Objects.hash(uid, inlongGroupId, inlongStreamid, topicName, tenant, nameSpace,
dataType, fieldDelimiter, fileDelimiter, useExtendedFields, msgWrapType,
v1CompressType, params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.dataproxy.config.pojo;

import org.apache.commons.lang.builder.ToStringBuilder;

import java.util.Map;

public class InLongMetaConfig {

private String md5;
private CacheType mqType;
private Map<String, CacheClusterConfig> clusterConfigMap;
private Map<String, IdTopicConfig> idTopicConfigMap;

public InLongMetaConfig() {

}

public InLongMetaConfig(String md5, CacheType mqType,
Map<String, CacheClusterConfig> clusterConfigMap,
Map<String, IdTopicConfig> idTopicConfigMap) {
this.md5 = md5;
this.mqType = mqType;
this.clusterConfigMap = clusterConfigMap;
this.idTopicConfigMap = idTopicConfigMap;
}

public String getMd5() {
return md5;
}

public CacheType getMqType() {
return mqType;
}

public Map<String, CacheClusterConfig> getClusterConfigMap() {
return clusterConfigMap;
}

public Map<String, IdTopicConfig> getIdTopicConfigMap() {
return idTopicConfigMap;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("md5", md5)
.append("mqType", mqType)
.append("clusterConfigMap", clusterConfigMap)
.append("idTopicConfigMap", idTopicConfigMap)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.dataproxy.source;
package org.apache.inlong.dataproxy.consts;

public class SourceConstants {

Expand Down Expand Up @@ -105,95 +105,4 @@ public class SourceConstants {
public static final String SRC_PROTOCOL_TYPE_TCP = "tcp";
public static final String SRC_PROTOCOL_TYPE_UDP = "udp";
public static final String SRC_PROTOCOL_TYPE_HTTP = "http";

public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name";
public static final String ENABLE_EXCEPTION_RETURN = "enableExceptionReturn";

public static final String TRAFFIC_CLASS = "trafficClass";

public static final String HEART_INTERVAL_SEC = "heart-interval-sec";

public static final String PACKAGE_TIMEOUT_SEC = "package-timeout-sec";

public static final String HEART_SERVERS = "heart-servers";

public static final String TOPIC_KEY = "topic";
public static final String REMOTE_IP_KEY = "srcIp";
public static final String DATAPROXY_IP_KEY = "dpIp";
public static final String MSG_ENCODE_VER = "msgEnType";
public static final String REMOTE_IDC_KEY = "idc";
public static final String MSG_COUNTER_KEY = "msgcnt";
public static final String PKG_COUNTER_KEY = "pkgcnt";
public static final String PKG_TIME_KEY = "msg.pkg.time";
public static final String TRANSFER_KEY = "transfer";
public static final String DEST_IP_KEY = "dstIp";
public static final String INTERFACE_KEY = "interface";
public static final String SINK_MIN_METRIC_KEY = "sink-min-metric-topic";
public static final String SINK_HOUR_METRIC_KEY = "sink-hour-metric-topic";
public static final String SINK_TEN_METRIC_KEY = "sink-ten-metric-topic";
public static final String SINK_QUA_METRIC_KEY = "sink-qua-metric-topic";
public static final String L5_MIN_METRIC_KEY = "l5-min-metric-topic";
public static final String L5_MIN_FAIL_METRIC_KEY = "l5-min-fail-metric-key";
public static final String L5_HOUR_METRIC_KEY = "l5-hour-metric-topic";
public static final String L5_ID_KEY = "l5id";
public static final String SET_KEY = "set";
public static final String CLUSTER_ID_KEY = "clusterId";

public static final String DECODER_BODY = "body";
public static final String DECODER_TOPICKEY = "topic_key";
public static final String DECODER_ATTRS = "attrs";
public static final String MSG_TYPE = "msg_type";
public static final String COMPRESS_TYPE = "compress_type";
public static final String EXTRA_ATTR = "extra_attr";
public static final String COMMON_ATTR_MAP = "common_attr_map";
public static final String MSG_LIST = "msg_list";
public static final String VERSION_TYPE = "version";
public static final String FILE_CHECK_DATA = "file-check-data";
public static final String MINUTE_CHECK_DATA = "minute-check-data";
public static final String SLA_METRIC_DATA = "sla-metric-data";
public static final String SLA_METRIC_GROUPID = "manager_sla_metric";

public static final String FILE_BODY = "file-body";
public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;

public static final String SEQUENCE_ID = "sequencial_id";

public static final String TOTAL_LEN = "totalLen";

public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
public static final String RECOVER_THREAD_COUNT = "recover_thread_count";

public static final String MANAGER_PATH = "/inlong/manager/openapi";
public static final String MANAGER_GET_CONFIG_PATH = "/dataproxy/getConfig";
public static final String MANAGER_GET_ALL_CONFIG_PATH = "/dataproxy/getAllConfig";
public static final String MANAGER_HEARTBEAT_REPORT = "/heartbeat/report";

public static final String MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
public static final String MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
// Pulsar config
public static final String KEY_TENANT = "tenant";
public static final String KEY_NAMESPACE = "namespace";

public static final String KEY_SERVICE_URL = "serviceUrl";
public static final String KEY_AUTHENTICATION = "authentication";
public static final String KEY_STATS_INTERVAL_SECONDS = "statsIntervalSeconds";

public static final String KEY_ENABLEBATCHING = "enableBatching";
public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
public static final String KEY_SENDTIMEOUT = "sendTimeout";
public static final String KEY_COMPRESSIONTYPE = "compressionType";
public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
+ "BatchingPartitionSwitchFrequency";

public static final String KEY_IOTHREADS = "ioThreads";
public static final String KEY_MEMORYLIMIT = "memoryLimit";
public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.dataproxy.source;

import org.apache.inlong.dataproxy.consts.SourceConstants;

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;

import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;

import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;

import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;

import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.SourceConstants;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.dataproxy.utils;

import org.apache.inlong.dataproxy.source.SourceConstants;
import org.apache.inlong.dataproxy.consts.SourceConstants;

import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"result":true,"errCode":0,"md5":"5a3f5939bb7368f493bf41c1d785b8f3","data":{"proxyCluster":{"name":"test_dataproxy","setName":"test_set","zone":"default\u003dtrue","channels":[],"inlongIds":[{"inlongId":"test_group.stream1","topic":"stream1","params":{"namespace":"test_group","ignoreParseError":"true","appGroupName":"app_test_group","productId":"58","productName":"test_meta","wrapWithInlongMsg":"true"}}],"sources":[],"sinks":[]},"cacheClusterSet":{"setName":"test_set","type":"TUBEMQ","cacheClusters":[{"name":"test_tubemq","token":"******","zone":"default\u003dtrue","params":{"masterWebUrl":"http://127.0.0.1:8080","messageQueueHandler":"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler","master-host-port-list":"127.0.0.1:8000"}}],"topics":[]}}}
{"md5":"5a3f5939bb7368f493bf41c1d785b8f3","mqType":"TUBE","clusterConfigMap":{"test_tubemq":{"clusterName": "test_tubemq","token": "******","zone": "default=true","params": {"masterWebUrl": "http://127.0.0.1:8080","messageQueueHandler": "org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler","master-host-port-list": "127.0.0.1:8000"}}},"idTopicConfigMap":{"test_group.stream1":{"uid":"test_group.stream1","inlongGroupId":"test_group","inlongStreamid":"stream1","topicName":"test_group","dataType":"N","fieldDelimiter":"|","fileDelimiter":"","useExtendedFields":false,"params":{"appGroupName":"app_test_group","productId":"58","namespace":"test_group","useExtendedFields":"false","ignoreParseError":"true","productName":"test_meta","wrapWithInlongMsg":"true"}}}}

0 comments on commit b5714f8

Please sign in to comment.