From e4a9b264bfa7a88d1a194d53e2a14ffde0298146 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 19 Apr 2024 00:56:37 +0800 Subject: [PATCH] [INLONG-10017][Sort] Definition of Sort Config --- .../common/constant/CompressionType.java | 26 +++++++++++++ .../inlong/common/constant/DataNodeType.java | 26 +++++++++++++ .../common/constant/DeserializationType.java | 26 +++++++++++++ .../inlong/common/constant/SinkType.java | 26 +++++++++++++ .../common/pojo/sort/SortClusterConfig.java | 34 ++++++++++++++++ .../inlong/common/pojo/sort/SortConfig.java | 30 ++++++++++++++ .../common/pojo/sort/SortTaskConfig.java | 33 ++++++++++++++++ .../pojo/sort/dataflow/DataflowConfig.java | 35 +++++++++++++++++ .../pojo/sort/dataflow/SourceConfig.java | 32 +++++++++++++++ .../CsvDeserializationConfig.java | 27 +++++++++++++ .../DeserializationConfig.java | 36 +++++++++++++++++ .../InlongMsgDeserializationConfig.java | 27 +++++++++++++ .../InlongMsgPbDeserialiationConfig.java | 27 +++++++++++++ .../KvDeserializationConfig.java | 29 ++++++++++++++ .../sort/dataflow/node/ClsNodeConfig.java | 34 ++++++++++++++++ .../pojo/sort/dataflow/node/EsNodeConfig.java | 38 ++++++++++++++++++ .../pojo/sort/dataflow/node/NodeConfig.java | 39 +++++++++++++++++++ .../sort/dataflow/node/PulsarNodeConfig.java | 29 ++++++++++++++ .../sort/dataflow/sink/ClsSinkConfig.java | 36 +++++++++++++++++ .../pojo/sort/dataflow/sink/EsSinkConfig.java | 29 ++++++++++++++ .../sort/dataflow/sink/PulsarSinkConfig.java | 31 +++++++++++++++ .../pojo/sort/dataflow/sink/SinkConfig.java | 36 +++++++++++++++++ .../common/pojo/sort/mq/MqClusterConfig.java | 38 ++++++++++++++++++ .../pojo/sort/mq/PulsarClusterConfig.java | 33 ++++++++++++++++ .../pojo/sort/mq/TubeClusterConfig.java | 28 +++++++++++++ 25 files changed, 785 insertions(+) create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/constant/CompressionType.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataflowConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/CsvDeserializationConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgDeserializationConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgPbDeserialiationConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/KvDeserializationConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/ClsNodeConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/EsNodeConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/NodeConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/PulsarNodeConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/ClsSinkConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/EsSinkConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/PulsarSinkConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/TubeClusterConfig.java diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/CompressionType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/CompressionType.java new file mode 100644 index 00000000000..7b9caafe5e1 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/CompressionType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.constant; + +public class CompressionType { + + public static final String GZIP = "GZIP"; + public static final String SNAPPY = "SNAPPY"; + public static final String LZO = "LZO"; + public static final String NONE = "NONE"; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java new file mode 100644 index 00000000000..2ccae832fa2 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.constant; + +public class DataNodeType { + + public static final String KAFKA = "KAFKA"; + public static final String PULSAR = "PULSAR"; + public static final String CLS = "CLS"; + public static final String ELASTICSEARCH = "ELASTICSEARCH"; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java new file mode 100644 index 00000000000..66999634fac --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.constant; + +public class DeserializationType { + + public static final String INLONG_MSG = "INLONG_MSG"; + public static final String INLONG_MSG_PB = "INLONG_MSG_PB"; + public static final String CSV = "CSV"; + public static final String KV = "KV"; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java new file mode 100644 index 00000000000..0dba17eb60b --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.constant; + +public class SinkType { + + public static final String KAFKA = "KAFKA"; + public static final String PULSAR = "PULSAR"; + public static final String CLS = "CLS"; + public static final String ELASTICSEARCH = "ELASTICSEARCH"; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java new file mode 100644 index 00000000000..d322ca19aeb --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort; + +import org.apache.inlong.common.pojo.sort.dataflow.DataflowConfig; +import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class SortClusterConfig implements Serializable { + + private String clusterTag; + private List mqClusterConfigs; + private List dataflowConfigs; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java new file mode 100644 index 00000000000..ad3a6bb8923 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class SortConfig implements Serializable { + + private String sortClusterName; + private List clusters; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java new file mode 100644 index 00000000000..f94590e314e --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort; + +import org.apache.inlong.common.pojo.sort.dataflow.node.NodeConfig; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class SortTaskConfig implements Serializable { + + private String sortTaskName; + private List clusters; + private NodeConfig nodeConfig; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataflowConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataflowConfig.java new file mode 100644 index 00000000000..170c991ab17 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataflowConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow; + +import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; + +@Data +public class DataflowConfig implements Serializable { + + private String dataflowId; + private Integer version; + private SourceConfig sourceConfig; + private SinkConfig sinkConfig; + private Map properties; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java new file mode 100644 index 00000000000..0adf501b4dd --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/SourceConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow; + +import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SourceConfig implements Serializable { + + private String topic; + private String subscription; + private DeserializationConfig deserializationConfig; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/CsvDeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/CsvDeserializationConfig.java new file mode 100644 index 00000000000..5a071e42ba6 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/CsvDeserializationConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import lombok.Data; + +@Data +public class CsvDeserializationConfig implements DeserializationConfig { + + private char delimiter; + private Character escapeChar; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java new file mode 100644 index 00000000000..49ae3da39a6 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import org.apache.inlong.common.constant.DeserializationType; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.io.Serializable; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG), + @JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB), + @JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = DeserializationType.CSV), + @JsonSubTypes.Type(value = KvDeserializationConfig.class, name = DeserializationType.KV), +}) +public interface DeserializationConfig extends Serializable { + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgDeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgDeserializationConfig.java new file mode 100644 index 00000000000..b491f20a00b --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgDeserializationConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import lombok.Data; + +@Data +public class InlongMsgDeserializationConfig implements DeserializationConfig { + + private String streamId; + private DeserializationConfig innerDeserializationConfig; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgPbDeserialiationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgPbDeserialiationConfig.java new file mode 100644 index 00000000000..4b62cf171f8 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/InlongMsgPbDeserialiationConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import lombok.Data; + +@Data +public class InlongMsgPbDeserialiationConfig implements DeserializationConfig { + + private final String compressionType; + private DeserializationConfig innerDeserializationConfig; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/KvDeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/KvDeserializationConfig.java new file mode 100644 index 00000000000..649e677af6c --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/KvDeserializationConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import lombok.Data; + +@Data +public class KvDeserializationConfig implements DeserializationConfig { + + private char entrySplitter; + private char kvSplitter; + private String streamId; + private Character escapeChar; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/ClsNodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/ClsNodeConfig.java new file mode 100644 index 00000000000..f1c671561ac --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/ClsNodeConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.node; + +import lombok.Data; + +@Data +public class ClsNodeConfig extends NodeConfig { + + private String mainAccountId; + private String subAccountId; + private String sendSecretKey; + private String sendSecretId; + private String manageSecretKey; + private String manageSecretId; + private String endpoint; + private String region; + private String logSetId; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/EsNodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/EsNodeConfig.java new file mode 100644 index 00000000000..789b4cbd6bb --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/EsNodeConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.node; + +import lombok.Data; + +@Data +public class EsNodeConfig extends NodeConfig { + + private Integer bulkAction; + private Integer bulkSizeMb; + private Integer flushInterval; + private Integer concurrentRequests; + private Integer maxConnect; + private Integer keywordMaxLength; + private Boolean isUseIndexId; + private Integer maxThreads; + private String auditSetName; + private String httpHosts; + private String username; + private String token; + private String password; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/NodeConfig.java new file mode 100644 index 00000000000..5ba37674b48 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/NodeConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.node; + +import org.apache.inlong.common.constant.DataNodeType; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; + +import java.io.Serializable; + +@Data +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = ClsNodeConfig.class, name = DataNodeType.CLS), + @JsonSubTypes.Type(value = EsNodeConfig.class, name = DataNodeType.ELASTICSEARCH), + @JsonSubTypes.Type(value = PulsarNodeConfig.class, name = DataNodeType.PULSAR), +}) +public abstract class NodeConfig implements Serializable { + + private Integer version; + private String nodeName; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/PulsarNodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/PulsarNodeConfig.java new file mode 100644 index 00000000000..f596f600222 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/node/PulsarNodeConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.node; + +import lombok.Data; + +@Data +public class PulsarNodeConfig extends NodeConfig { + + private String serviceUrl; + private String adminUrl; + private String token; + private String compressionType; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/ClsSinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/ClsSinkConfig.java new file mode 100644 index 00000000000..64df375ce32 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/ClsSinkConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.sink; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class ClsSinkConfig extends SinkConfig { + + private String topicId; + private String topicName; + private Integer storageDuration; + private String tag; + private String tokenizer; + private Integer contentOffset; + private Integer fieldOffset; + private String separator; + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/EsSinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/EsSinkConfig.java new file mode 100644 index 00000000000..a6eb0f07b85 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/EsSinkConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.sink; + +import lombok.Data; + +@Data +public class EsSinkConfig extends SinkConfig { + + private String indexNamePattern; + private Integer contentOffset; + private Integer fieldOffset; + private String separator; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/PulsarSinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/PulsarSinkConfig.java new file mode 100644 index 00000000000..9142019d205 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/PulsarSinkConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.sink; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class PulsarSinkConfig extends SinkConfig { + + private String pulsarTenant; + private String namespace; + private String topic; + private Integer partitionNum; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java new file mode 100644 index 00000000000..e79ceb993d1 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.sink; + +import org.apache.inlong.common.constant.SinkType; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; + +import java.io.Serializable; + +@Data +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS), + @JsonSubTypes.Type(value = EsSinkConfig.class, name = SinkType.ELASTICSEARCH), + @JsonSubTypes.Type(value = PulsarSinkConfig.class, name = SinkType.PULSAR), +}) +public abstract class SinkConfig implements Serializable { +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java new file mode 100644 index 00000000000..45eaa6762ee --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.mq; + +import org.apache.inlong.common.constant.MQType; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; + +import java.io.Serializable; + +@Data +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = PulsarClusterConfig.class, name = MQType.PULSAR), + @JsonSubTypes.Type(value = TubeClusterConfig.class, name = MQType.TUBEMQ) +}) +public abstract class MqClusterConfig implements Serializable { + + private String version; + private String clusterName; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java new file mode 100644 index 00000000000..1bb503b7a43 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.mq; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class PulsarClusterConfig extends MqClusterConfig { + + @JsonInclude(JsonInclude.Include.NON_NULL) + private String adminUrl; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private String serviceUrl; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/TubeClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/TubeClusterConfig.java new file mode 100644 index 00000000000..baa3a56ffd2 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/TubeClusterConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.mq; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class TubeClusterConfig extends MqClusterConfig { + + private String masterAddress; +}