From 56b071b563df1fda5b6a722418585fde77d6d5ed Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 20 Nov 2023 16:42:35 +0800 Subject: [PATCH 1/3] [INLONG-9310][Agent] Add extended handler in file source --- .../message/filecollect/ProxyMessage.java | 2 +- .../filecollect/ProxyMessageCache.java | 2 - .../agent/plugin/sources/LogFileSource.java | 4 ++ .../sources/file/extend/ExtendedHandler.java | 37 +++++++++++++++++++ 4 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java index 7d9f4930ac8..e8b74f40b10 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java @@ -27,7 +27,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; /** - * Bus message with body, header, inlongGroupId and inlongStreamId. + * proxy message with body, header, inlongGroupId and inlongStreamId. 
*/ public class ProxyMessage implements Message { diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 9c0c84e0efa..66b3fb3b432 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -59,7 +59,6 @@ public class ProxyMessageCache { private final int cacheTimeout; // streamId -> list of proxyMessage private final ConcurrentHashMap> messageQueueMap; - // private final LinkedBlockingQueue messageQueue; private final AtomicLong cacheSize = new AtomicLong(0); private long lastPrintTime = 0; private long dataTime; @@ -77,7 +76,6 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER); this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS); messageQueueMap = new ConcurrentHashMap<>(); - // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber); try { dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), instanceProfile.get(TASK_CYCLE_UNIT)); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index e2b0b06517f..68964d6e4f1 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -31,6 +31,7 @@ import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import 
org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler; import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; @@ -134,6 +135,7 @@ private class SourceData { private volatile boolean running = false; private long dataTime = 0; private volatile long emptyCount = 0; + private ExtendedHandler extendedHandler; public LogFileSource() { OffsetManager.init(); @@ -159,6 +161,7 @@ public void init(InstanceProfile profile) { queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), profile.get(TASK_CYCLE_UNIT)); + extendedHandler = new ExtendedHandler(profile); try { registerMeta(profile); } catch (Exception ex) { @@ -354,6 +357,7 @@ private Message createMessage(SourceData sourceData) { header.put(PROXY_KEY_DATA, proxyPartitionKey); header.put(OFFSET, sourceData.offset.toString()); header.put(PROXY_KEY_STREAM_ID, inlongStreamId); + extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8)); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), dataTime, 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java new file mode 100644 index 00000000000..32215646b20 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.file.extend; + +import org.apache.inlong.agent.conf.InstanceProfile; + +import java.util.Map; + +public class ExtendedHandler { + + public ExtendedHandler(InstanceProfile profile) { + + } + + public void dealWithHeader(Map header, byte[] body) { + + } + + public static class Constants { + + } +} From 89286d5877b81708a1d29ec2b8d1b1b251b1d203 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 20 Nov 2023 17:04:50 +0800 Subject: [PATCH 2/3] [INLONG-9310][Agent] Add extended handler in file source --- .../agent/plugin/sources/file/extend/ExtendedHandler.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java index 32215646b20..636dcee5a3e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java @@ -21,12 +21,14 @@ import java.util.Map; +// For some private, customized extension processing 
public class ExtendedHandler { public ExtendedHandler(InstanceProfile profile) { } + // Modify the header by the body public void dealWithHeader(Map header, byte[] body) { } From b7e154ed3f50f52c41017d10ffa3f50e37439d41 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 20 Nov 2023 19:19:35 +0800 Subject: [PATCH 3/3] [INLONG-9310][Agent] Add extended handler in file source --- .../inlong/agent/constant/TaskConstants.java | 3 +++ .../agent/plugin/sources/LogFileSource.java | 15 +++++++++++++-- .../sources/file/extend/ExtendedHandler.java | 2 +- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 872c42319f2..c501ec110b8 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -82,6 +82,9 @@ public class TaskConstants extends CommonConstants { public static final String TASK_END_TIME = "task.fileTask.endTime"; public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount"; public static final String PREDEFINE_FIELDS = "task.predefinedFields"; + public static final String FILE_SOURCE_EXTEND_CLASS = "task.fileTask.extendedClass"; + public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS = + "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler"; // Binlog job public static final String JOB_DATABASE_USER = "job.binlogJob.user"; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 68964d6e4f1..d713fb4e8a9 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.io.LineNumberReader; import java.io.RandomAccessFile; +import java.lang.reflect.Constructor; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -81,6 +82,7 @@ import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME; import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME; import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP; +import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS; import static org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST; import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; @@ -161,7 +163,14 @@ public void init(InstanceProfile profile) { queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), profile.get(TASK_CYCLE_UNIT)); - extendedHandler = new ExtendedHandler(profile); + String extendedClass = profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS); + // Load only a handler class named by the task profile; the abstract default itself is never instantiated + if (!ExtendedHandler.class.getCanonicalName().equals(extendedClass)) { + Constructor<?> constructor = + Class.forName(extendedClass).getDeclaredConstructor(InstanceProfile.class); + constructor.setAccessible(true); + extendedHandler = (ExtendedHandler) constructor.newInstance(profile); + } try { registerMeta(profile); } catch (Exception ex) { @@ -357,7 +366,9 @@ private Message createMessage(SourceData sourceData) { header.put(PROXY_KEY_DATA, proxyPartitionKey); header.put(OFFSET, sourceData.offset.toString()); header.put(PROXY_KEY_STREAM_ID, inlongStreamId); - extendedHandler.dealWithHeader(header,
sourceData.getData().getBytes(StandardCharsets.UTF_8)); + if (extendedHandler != null) { + extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8)); + } AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), dataTime, 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java index 636dcee5a3e..8cd2e76f48f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java @@ -22,7 +22,7 @@ import java.util.Map; // For some private, customized extension processing -public class ExtendedHandler { +public abstract class ExtendedHandler { public ExtendedHandler(InstanceProfile profile) {