From ad381e12a3a9681df7962bdba58be6ea5a83ba32 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Sat, 13 Apr 2024 19:11:39 +0800 Subject: [PATCH 1/5] [INLONG-9982][Agent] Adjusting the abstraction of source code to facilitate rapid addition of sources --- .../inlong/agent/plugin/file/Source.java | 6 + .../plugin/sources/DatabaseSqlSource.java | 25 ++ .../agent/plugin/sources/KafkaSource.java | 283 +++--------- .../agent/plugin/sources/LogFileSource.java | 421 +++--------------- .../agent/plugin/sources/MongoDBSource.java | 25 ++ .../agent/plugin/sources/MqttSource.java | 25 ++ .../agent/plugin/sources/OracleSource.java | 25 ++ .../plugin/sources/PostgreSQLSource.java | 25 ++ .../agent/plugin/sources/PulsarSource.java | 266 +++-------- .../agent/plugin/sources/RedisSource.java | 25 ++ .../agent/plugin/sources/SQLServerSource.java | 25 ++ .../plugin/sources/file/AbstractSource.java | 329 ++++++++++++++ .../file/KubernetesMetadataProvider.java | 121 ----- .../plugin/sources/TestLogFileSource.java | 1 + 14 files changed, 680 insertions(+), 922 deletions(-) delete mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java index c65f3052275..26a5ec733d9 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Source.java @@ -50,6 +50,12 @@ public interface Source { */ void init(InstanceProfile profile); + /** + * Executed after init, usually used to start the source's worker thread + * + */ + void start(); + /** * destroy */ diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java index 227dd375ef4..f24a919ca7b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java @@ -88,11 +88,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java index fe31730fdc8..defbc780c79 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java @@ -17,23 +17,12 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CycleUnitType; -import org.apache.inlong.agent.core.task.MemoryManager; import org.apache.inlong.agent.except.FileException; -import org.apache.inlong.agent.message.DefaultMessage; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.utils.AgentUtils; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -51,23 +40,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET; import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS; import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET; @@ -79,45 +53,17 @@ */ public class KafkaSource extends AbstractSource { - @Data - @AllArgsConstructor - @NoArgsConstructor - private class SourceData { - - private byte[] data; - private Long offset; - } - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); - private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new AgentThreadFactory("kafka-source")); - private BlockingQueue queue; - public InstanceProfile profile; - private int maxPackSize; - private String taskId; - private String instanceId; private String topic; private Properties props = new Properties(); private String allPartitionOffsets; Map partitionOffsets = new HashMap<>(); - private volatile boolean running = false; - private volatile boolean runnable = true; - private volatile AtomicLong emptyCount = new AtomicLong(0); - - private final Integer CACHE_QUEUE_SIZE = 100000; - private final Integer READ_WAIT_TIMEOUT_MS = 10; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; - private final Integer BATCH_TOTAL_LEN = 1024 * 1024; - private static final String KAFKA_DESERIALIZER_METHOD = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms"; - private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; - private boolean isRealTime = false; private boolean isRestoreFromDB = false; + private KafkaConsumer kafkaConsumer; + private long offset = 0L; public KafkaSource() { } @@ -128,15 +74,6 @@ public void init(InstanceProfile profile) { LOGGER.info("KafkaSource init: {}", profile.toJsonStr()); this.profile = profile; super.init(profile); - String cycleUnit = profile.get(TASK_CYCLE_UNIT); - if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { - isRealTime = true; - cycleUnit = CycleUnitType.HOUR; - } - queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); - taskId = profile.getTaskId(); - instanceId = profile.getInstanceId(); topic = profile.getInstanceId(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS)); props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId); @@ -155,196 +92,84 @@ public void init(InstanceProfile profile) { Long.valueOf(offset.split(TASK_KAFKA_PARTITION_OFFSET_DELIMITER)[1])); } } - - EXECUTOR_SERVICE.execute(run()); + kafkaConsumer = getKafkaConsumer(); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + topic, ex); } } - private Runnable run() { - return () -> { - AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" + instanceId); - running = true; - try { - List partitionInfoList; - try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { - partitionInfoList = consumer.partitionsFor(topic); - } - - props.put(KAFKA_SESSION_TIMEOUT, 30000); - - try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { - if (null != partitionInfoList) { - List topicPartitions = new ArrayList<>(); - for (PartitionInfo partitionInfo : partitionInfoList) { - TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), - partitionInfo.partition()); - topicPartitions.add(topicPartition); - } - kafkaConsumer.assign(topicPartitions); - - if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { - for (TopicPartition topicPartition : topicPartitions) { - Long offset = partitionOffsets.get(topicPartition.partition()); - if (ObjectUtils.isNotEmpty(offset)) { - kafkaConsumer.seek(topicPartition, offset); - } - } - } else { - LOGGER.info("Skip to seek offset"); - } - } - doRun(kafkaConsumer); - } - } catch (Throwable e) { - LOGGER.error("do run error maybe topic is configured incorrectly: ", e); - } - running = false; - }; + @Override + protected String getThreadName() { + return "kafka-source-" + taskId + "-" + instanceId; } - private void doRun(KafkaConsumer kafkaConsumer) { - long lastPrintTime = 0; - while (isRunnable()) { - boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - if (!suc) { - break; - } - ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); - if (records.isEmpty()) { - if (queue.isEmpty()) { - emptyCount.incrementAndGet(); - } else { - emptyCount.set(0); - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - AgentUtils.silenceSleepInSeconds(1); - continue; - } - emptyCount.set(0); - long offset = 0L; - for (ConsumerRecord record : records) { - SourceData sourceData = new SourceData(record.value(), record.offset()); - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length); - if (!suc4Queue) { - break; - } - putIntoQueue(sourceData); - offset = record.offset(); - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - kafkaConsumer.commitSync(); - - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("kafka topic is {}, offset is {}", topic, offset); - } - } + @Override + protected boolean doPrepareToRead() { + return true; } - private boolean waitForPermit(String permitName, int permitLen) { - boolean suc = false; - while (!suc) { - suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); - if (!suc) { - MemoryManager.getInstance().printDetail(permitName, "log file source"); - if (!isRunnable()) { - return false; - } - AgentUtils.silenceSleepInSeconds(1); - } + @Override + protected List readFromSource() { + List dataList = new ArrayList<>(); + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : records) { + SourceData sourceData = new SourceData(record.value(), record.offset()); + dataList.add(sourceData); + offset = record.offset(); } - return true; + kafkaConsumer.commitSync(); + return dataList; } - private void putIntoQueue(SourceData sourceData) { - if (sourceData == null) { - return; - } + private KafkaConsumer getKafkaConsumer() { try { - boolean offerSuc = false; - if (queue.remainingCapacity() > 0) { - while (isRunnable() && !offerSuc) { - offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); - } + List partitionInfoList; + try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { + partitionInfoList = consumer.partitionsFor(topic); } - - if (!offerSuc) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); + props.put(KAFKA_SESSION_TIMEOUT, 30000); + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { + if (null != partitionInfoList) { + List topicPartitions = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfoList) { + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), + partitionInfo.partition()); + topicPartitions.add(topicPartition); + } + kafkaConsumer.assign(topicPartitions); + if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { + for (TopicPartition topicPartition : topicPartitions) { + Long offset = partitionOffsets.get(topicPartition.partition()); + if (ObjectUtils.isNotEmpty(offset)) { + kafkaConsumer.seek(topicPartition, offset); + } + } + } else { + LOGGER.info("Skip to seek offset"); + } + return kafkaConsumer; + } } - LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(), topic); - } catch (InterruptedException e) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); - LOGGER.error("fetchData offer failed {}", e.getMessage()); + } catch (Throwable e) { + LOGGER.error("do run error maybe topic is configured incorrectly: ", e); } - } - - public boolean isRunnable() { - return runnable; - } - - /** - * Stop running threads. - */ - public void stopRunning() { - runnable = false; - } - - @Override - public List split(TaskProfile conf) { return null; } @Override - public Message read() { - SourceData sourceData = null; - try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.warn("poll {} data get interrupted.", topic, e); - } - if (sourceData == null) { - return null; - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); - Message finalMsg = createMessage(sourceData); - return finalMsg; + protected void printCurrentState() { + LOGGER.info("kafka topic is {}, offset is {}", topic, offset); } - private Message createMessage(SourceData sourceData) { - String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); - Map header = new HashMap<>(); - header.put(PROXY_KEY_DATA, proxyPartitionKey); - header.put(OFFSET, sourceData.offset.toString()); - header.put(PROXY_KEY_STREAM_ID, inlongStreamId); - - long auditTime = 0; - if (isRealTime) { - auditTime = AgentUtils.getCurrentTime(); - } else { - auditTime = profile.getSinkDataTime(); - } - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - auditTime, 1, sourceData.data.length); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - AgentUtils.getCurrentTime(), 1, sourceData.data.length); - Message finalMsg = new DefaultMessage(sourceData.data, header); - if (finalMsg.getBody().length > maxPackSize) { - LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", - finalMsg.getBody().length, maxPackSize); - return null; - } - return finalMsg; + @Override + protected boolean isRunnable() { + return runnable; } @Override - public boolean sourceFinish() { - if (isRealTime) { - return false; - } - return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST; + public List split(TaskProfile conf) { + return null; } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 9f83f6fc06e..89574fa3622 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -17,32 +17,17 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.constant.DataCollectType; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.core.task.MemoryManager; -import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.except.FileException; -import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler; -import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DateTransUtils; -import com.google.gson.Gson; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,93 +38,27 @@ import java.io.IOException; import java.io.LineNumberReader; import java.io.RandomAccessFile; -import java.lang.reflect.Constructor; -import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.constant.CommonConstants.COMMA; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES; -import static org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT; -import static org.apache.inlong.agent.constant.MetadataConstants.DATA_CONTENT_TIME; -import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP; -import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS; -import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_META_ENV_LIST; /** * Read text files */ public class LogFileSource extends AbstractSource { - @Data - @AllArgsConstructor - @NoArgsConstructor - private class SourceData { - - private String data; - private Long offset; - } - private static final Logger LOGGER = LoggerFactory.getLogger(LogFileSource.class); - private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new AgentThreadFactory("log-file-source")); - private final Integer BATCH_READ_LINE_COUNT = 10000; - private final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; - private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; - private final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; private final Long INODE_UPDATE_INTERVAL_MS = 1000L; - private final Integer READ_WAIT_TIMEOUT_MS = 10; - private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - public InstanceProfile profile; - private String taskId; - private String instanceId; - private int maxPackSize; + private String fileName; private File file; private byte[] bufferToReadFile; public volatile long linePosition = 0; public volatile long bytePosition = 0; - private boolean needMetadata = false; - public Map metadata; private boolean isIncrement = false; - private BlockingQueue queue; - private final Gson GSON = new Gson(); - private volatile boolean runnable = true; private volatile boolean fileExist = true; private String inodeInfo; private volatile long lastInodeUpdateTime = 0; - private volatile boolean running = false; - private long dataTime = 0; - private volatile long emptyCount = 0; - private ExtendedHandler extendedHandler; - private boolean isRealTime = false; public LogFileSource() { } @@ -148,17 +67,8 @@ public LogFileSource() { public void init(InstanceProfile profile) { try { LOGGER.info("LogFileSource init: {}", profile.toJsonStr()); - this.profile = profile; super.init(profile); - String cycleUnit = profile.get(TASK_CYCLE_UNIT); - if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { - isRealTime = true; - cycleUnit = CycleUnitType.HOUR; - } - taskId = profile.getTaskId(); - instanceId = profile.getInstanceId(); fileName = profile.getInstanceId(); - maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE]; isIncrement = isIncrement(profile); file = new File(fileName); @@ -166,28 +76,68 @@ public void init(InstanceProfile profile) { lastInodeUpdateTime = AgentUtils.getCurrentTime(); linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); - queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), cycleUnit); - if (DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName()) != 0) { - Constructor constructor = - Class.forName( - profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS)) - .getDeclaredConstructor(InstanceProfile.class); - constructor.setAccessible(true); - extendedHandler = (ExtendedHandler) constructor.newInstance(profile); - } - try { - registerMeta(profile); - } catch (Exception ex) { - LOGGER.error("init metadata error", ex); - } - EXECUTOR_SERVICE.execute(run()); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + file.getPath(), ex); } } + @Override + protected boolean doPrepareToRead() { + if (isInodeChanged()) { + fileExist = false; + LOGGER.info("inode changed, instance will restart and offset will be clean, file {}", + fileName); + return false; + } + if (file.length() < bytePosition) { + fileExist = false; + LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", + fileName); + return false; + } + return true; + } + + @Override + protected List readFromSource() { + try { + return readFromPos(bytePosition); + } catch (FileNotFoundException e) { + fileExist = false; + LOGGER.error("readFromPos file deleted error: ", e); + } catch (IOException e) { + LOGGER.error("readFromPos error: ", e); + } + return null; + } + + @Override + protected void printCurrentState() { + LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}", file.getName(), linePosition, + bytePosition, file.length()); + } + + @Override + protected String getThreadName() { + return "log-file-source-" + taskId + "-" + fileName; + } + + private List readFromPos(long pos) throws IOException { + List lines = new ArrayList<>(); + List dataList = new ArrayList<>(); + RandomAccessFile input = new RandomAccessFile(file, "r"); + bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); + for (int i = 0; i < lines.size(); i++) { + linePosition++; + dataList.add(new SourceData(lines.get(i), linePosition)); + } + if (input != null) { + input.close(); + } + return dataList; + } + private int getRealLineCount(String fileName) { try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(instanceId))) { lineNumberReader.skip(Long.MAX_VALUE); @@ -199,7 +149,6 @@ private int getRealLineCount(String fileName) { } private long getInitLineOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) { - OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); long offset = 0; if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) { offset = offsetProfile.getOffset(); @@ -229,24 +178,6 @@ public File getFile() { return file; } - public void registerMeta(InstanceProfile jobConf) { - if (!jobConf.hasKey(TASK_FILE_META_ENV_LIST)) { - return; - } - String[] env = jobConf.get(TASK_FILE_META_ENV_LIST).split(COMMA); - Arrays.stream(env).forEach(data -> { - if (data.equalsIgnoreCase(KUBERNETES)) { - needMetadata = true; - new KubernetesMetadataProvider(this).getData(); - } else if (data.equalsIgnoreCase(ENV_CVM)) { - needMetadata = true; - metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost()); - metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp()); - metadata.put(METADATA_FILE_NAME, file.getName()); - } - }); - } - private boolean isIncrement(InstanceProfile profile) { if (profile.hasKey(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE) && DataCollectType.INCREMENT .equalsIgnoreCase(profile.get(TaskConstants.TASK_FILE_CONTENT_COLLECT_TYPE))) { @@ -262,7 +193,7 @@ private long getBytePositionByLine(long linePosition) throws IOException { try { input = new RandomAccessFile(file, "r"); while (readCount < linePosition) { - List lines = new ArrayList<>(); + List lines = new ArrayList<>(); pos = readLines(input, pos, lines, Math.min((int) (linePosition - readCount), BATCH_READ_LINE_COUNT), BATCH_READ_LINE_TOTAL_LEN, true); readCount += lines.size(); @@ -289,7 +220,7 @@ private long getBytePositionByLine(long linePosition) throws IOException { * @return The new position after the lines have been read * @throws IOException if an I/O error occurs. */ - private long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount, int maxLineTotalLen, + private long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount, int maxLineTotalLen, boolean isCounting) throws IOException { if (maxLineCount == 0) { @@ -309,11 +240,10 @@ private long readLines(RandomAccessFile reader, long pos, List lines, in switch (ch) { case '\n': if (isCounting) { - lines.add(new String("")); + lines.add(null); } else { - String temp = new String(baos.toByteArray(), StandardCharsets.UTF_8); - lines.add(temp); - lineTotalLen += temp.length(); + lines.add(baos.toByteArray()); + lineTotalLen += baos.size(); } rePos = pos + i + 1; if (overLen) { @@ -350,79 +280,6 @@ private long readLines(RandomAccessFile reader, long pos, List lines, in return rePos; } - @Override - public Message read() { - SourceData sourceData = null; - try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.warn("poll {} data get interrupted.", file.getPath(), e); - } - if (sourceData == null) { - return null; - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); - Message finalMsg = createMessage(sourceData); - return finalMsg; - } - - private Message createMessage(SourceData sourceData) { - String msgWithMetaData = fillMetaData(sourceData.data); - String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); - Map header = new HashMap<>(); - header.put(PROXY_KEY_DATA, proxyPartitionKey); - header.put(OFFSET, sourceData.offset.toString()); - header.put(PROXY_KEY_STREAM_ID, inlongStreamId); - if (extendedHandler != null) { - extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8)); - } - long auditTime = 0; - if (isRealTime) { - auditTime = AgentUtils.getCurrentTime(); - } else { - auditTime = profile.getSinkDataTime(); - } - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - auditTime, 1, msgWithMetaData.length()); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - AgentUtils.getCurrentTime(), 1, msgWithMetaData.length()); - Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); - // if the message size is greater than max pack size,should drop it. - if (finalMsg.getBody().length > maxPackSize) { - LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", - finalMsg.getBody().length, maxPackSize); - return null; - } - return finalMsg; - } - - public String fillMetaData(String message) { - if (!needMetadata) { - return message; - } - long timestamp = System.currentTimeMillis(); - boolean isJson = FileDataUtils.isJSON(message); - Map mergeData = new HashMap<>(metadata); - mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(message, isJson)); - mergeData.put(DATA_CONTENT_TIME, RECORD_TIME_FORMAT.format(new Date(timestamp))); - return GSON.toJson(mergeData); - } - - private boolean waitForPermit(String permitName, int permitLen) { - boolean suc = false; - while (!suc) { - suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); - if (!suc) { - MemoryManager.getInstance().printDetail(permitName, "log file source"); - if (isInodeChanged() || !isRunnable()) { - return false; - } - AgentUtils.silenceSleepInSeconds(1); - } - } - return true; - } - private boolean isInodeChanged() { if (AgentUtils.getCurrentTime() - lastInodeUpdateTime > INODE_UPDATE_INTERVAL_MS) { try { @@ -435,159 +292,9 @@ private boolean isInodeChanged() { return false; } - private Runnable run() { - return () -> { - AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + file); - running = true; - try { - doRun(); - } catch (Throwable e) { - LOGGER.error("do run error maybe file deleted: ", e); - } - running = false; - }; - } - - private void doRun() { - long lastPrintTime = 0; - while (isRunnable() && fileExist) { - if (isInodeChanged()) { - fileExist = false; - LOGGER.info("inode changed, instance will restart and offset will be clean, file {}", - fileName); - break; - } - if (file.length() < bytePosition) { - fileExist = false; - LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", - fileName); - break; - } - boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - if (!suc) { - break; - } - List lines = null; - try { - lines = readFromPos(bytePosition); - } catch (FileNotFoundException e) { - fileExist = false; - LOGGER.error("readFromPos file deleted error: ", e); - } catch (IOException e) { - LOGGER.error("readFromPos error: ", e); - } - if (lines.isEmpty()) { - if (queue.isEmpty()) { - emptyCount++; - } else { - emptyCount = 0; - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - AgentUtils.silenceSleepInSeconds(1); - continue; - } - emptyCount = 0; - for (int i = 0; i < lines.size(); i++) { - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length()); - if (!suc4Queue) { - break; - } - putIntoQueue(lines.get(i)); - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", - file.getName(), linePosition, bytePosition, file.length(), lines.size()); - } - } - } - - private void putIntoQueue(SourceData sourceData) { - if (sourceData == null) { - return; - } - try { - boolean offerSuc = false; - while (isRunnable() && offerSuc != true) { - offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); - } - if (!offerSuc) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); - } - LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName); - } catch (InterruptedException e) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); - LOGGER.error("fetchData offer failed {}", e.getMessage()); - } - } - - /** - * Whether threads can in running state with while loop. - * - * @return true if threads can run - */ - public boolean isRunnable() { - return runnable; - } - - /** - * Stop running threads. - */ - public void stopRunning() { - runnable = false; - } - - private List readFromPos(long pos) throws IOException { - List lines = new ArrayList<>(); - List dataList = new ArrayList<>(); - RandomAccessFile input = new RandomAccessFile(file, "r"); - bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); - for (int i = 0; i < lines.size(); i++) { - linePosition++; - dataList.add(new SourceData(lines.get(i), linePosition)); - } - if (input != null) { - input.close(); - } - return dataList; - } - - @Override - public void destroy() { - LOGGER.info("destroy read source name {}", fileName); - stopRunning(); - while (running) { - AgentUtils.silenceSleepInMs(1); - } - clearQueue(queue); - LOGGER.info("destroy read source name {} end", fileName); - } - - private void clearQueue(BlockingQueue queue) { - if (queue == null) { - return; - } - while (queue != null && !queue.isEmpty()) { - SourceData sourceData = null; - try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.warn("poll {} data get interrupted.", file.getPath(), e); - } - if (sourceData != null) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); - } - } - queue.clear(); - } - @Override - public boolean sourceFinish() { - if (isRealTime) { - return false; - } - return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; + protected boolean isRunnable() { + return runnable && fileExist && !isInodeChanged(); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java index 2f96fe66a2e..5cc5717bcf2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java @@ -44,11 +44,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java index bb333c2d272..c438e81de96 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java @@ -79,11 +79,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java index d4c86e7634b..323f8b5305b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java @@ -48,11 +48,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java index 58f03cfa9bb..37effaf04ff 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java @@ -49,11 +49,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index c64653e2a3b..3cfa37af22b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -17,23 +17,12 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CycleUnitType; -import org.apache.inlong.agent.core.task.MemoryManager; import org.apache.inlong.agent.except.FileException; -import org.apache.inlong.agent.message.DefaultMessage; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.utils.AgentUtils; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; @@ -44,26 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_RESET_TIME; import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SERVICE_URL; import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION; @@ -72,27 +46,7 @@ public class PulsarSource extends AbstractSource { - @Data - @AllArgsConstructor - @NoArgsConstructor - private class SourceData { - - private byte[] data; - private Long offset; - } - private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSource.class); - private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new AgentThreadFactory("pulsar-source")); - private BlockingQueue queue; - public InstanceProfile profile; - private int maxPackSize; - private String inlongStreamId; - private String taskId; - private String instanceId; private String topic; private String serviceUrl; private String subscription; @@ -100,18 +54,10 @@ private class SourceData { private String subscriptionPosition; private PulsarClient pulsarClient; private Long timestamp; - private volatile boolean running = false; - private volatile boolean runnable = true; - private volatile AtomicLong emptyCount = new AtomicLong(0); - - private final Integer CACHE_QUEUE_SIZE = 100000; - private final Integer READ_WAIT_TIMEOUT_MS = 10; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; - private final Integer BATCH_TOTAL_LEN = 1024 * 1024; - private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-"; - private boolean isRealTime = false; private boolean isRestoreFromDB = false; + private Consumer consumer; + private long offset = 0L; public PulsarSource() { } @@ -120,18 +66,7 @@ public PulsarSource() { public void init(InstanceProfile profile) { try { LOGGER.info("PulsarSource init: {}", profile.toJsonStr()); - this.profile = profile; super.init(profile); - String cycleUnit = profile.get(TASK_CYCLE_UNIT); - if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { - isRealTime = true; - cycleUnit = CycleUnitType.HOUR; - } - queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); - inlongStreamId = profile.getInlongStreamId(); - taskId = profile.getTaskId(); - instanceId = profile.getInstanceId(); topic = profile.getInstanceId(); serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL); subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId); @@ -141,183 +76,84 @@ public void init(InstanceProfile profile) { timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0); pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false); - - EXECUTOR_SERVICE.execute(run()); + consumer = getConsumer(); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + topic, ex); } } - private Runnable run() { - return () -> { - AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" + instanceId); - running = true; - try { - try (Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) - .topic(topic) - .subscriptionName(subscription) - .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) - .subscriptionType(SubscriptionType.valueOf(subscriptionType)) - .subscribe()) { - - if (!isRestoreFromDB && timestamp != 0L) { - consumer.seek(timestamp); - LOGGER.info("Reset consume from {}", timestamp); - } else { - LOGGER.info("Skip to reset consume"); - } - - doRun(consumer); - } - } catch (Throwable e) { - LOGGER.error("do run error maybe pulsar client is configured incorrectly: ", e); - } - running = false; - }; - } - - private void doRun(Consumer consumer) throws PulsarClientException { - long lastPrintTime = 0; - while (isRunnable()) { - boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - if (!suc) { - break; - } - org.apache.pulsar.client.api.Message message = consumer.receive(0, TimeUnit.MILLISECONDS); - if (ObjectUtils.isEmpty(message)) { - if (queue.isEmpty()) { - emptyCount.incrementAndGet(); - } else { - emptyCount.set(0); - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - AgentUtils.silenceSleepInSeconds(1); - continue; - } - emptyCount.set(0); - long offset = 0L; - SourceData sourceData = new SourceData(message.getValue(), 0L); - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length); - if (!suc4Queue) { - break; - } - putIntoQueue(sourceData); - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); - consumer.acknowledge(message); - - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("pulsar topic is {}, offset is {}", topic, offset); - } - } + @Override + protected String getThreadName() { + return "pulsar-source-" + taskId + "-" + instanceId; } - public boolean isRunnable() { - return runnable; + @Override + protected boolean doPrepareToRead() { + return true; } - private boolean waitForPermit(String permitName, int permitLen) { - boolean suc = false; - while (!suc) { - suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); - if (!suc) { - MemoryManager.getInstance().printDetail(permitName, "log file source"); - if (!isRunnable()) { - return false; - } - AgentUtils.silenceSleepInSeconds(1); - } + @Override + protected List readFromSource() { + List dataList = new ArrayList<>(); + org.apache.pulsar.client.api.Message message = null; + try { + message = consumer.receive(0, TimeUnit.MILLISECONDS); + offset = message.getSequenceId(); + } catch (PulsarClientException e) { + LOGGER.error("read from pulsar error", e); } - return true; + if (!ObjectUtils.isEmpty(message)) { + dataList.add(new SourceData(message.getValue(), 0L)); + } + try { + consumer.acknowledge(message); + } catch (PulsarClientException e) { + LOGGER.error("ack pulsar error", e); + } + return dataList; } - private void putIntoQueue(SourceData sourceData) { - if (sourceData == null) { - return; - } + private Consumer getConsumer() { try { - boolean offerSuc = false; - if (queue.remainingCapacity() > 0) { - while (isRunnable() && !offerSuc) { - offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); + try (Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) + .subscriptionType(SubscriptionType.valueOf(subscriptionType)) + .subscribe()) { + + if (!isRestoreFromDB && timestamp != 0L) { + consumer.seek(timestamp); + LOGGER.info("Reset consume from {}", timestamp); + } else { + LOGGER.info("Skip to reset consume"); } + return consumer; } - - if (!offerSuc) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); - } - LOGGER.debug("Read {} from pulsar topic {}", sourceData.getData(), topic); - } catch (InterruptedException e) { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); - LOGGER.error("fetchData offer failed {}", e.getMessage()); + } catch (Throwable e) { + LOGGER.error("do run error maybe pulsar client is configured incorrectly: ", e); } - } - - @Override - public List split(TaskProfile conf) { return null; } @Override - public Message read() { - SourceData sourceData = null; - try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.warn("poll {} data get interrupted.", topic, e); - } - if (sourceData == null) { - return null; - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length); - Message finalMsg = createMessage(sourceData); - return finalMsg; + protected void printCurrentState() { + LOGGER.info("pulsar topic is {}, offset is {}", topic, offset); } - private Message createMessage(SourceData sourceData) { - String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); - Map header = new HashMap<>(); - header.put(PROXY_KEY_DATA, proxyPartitionKey); - header.put(OFFSET, sourceData.offset.toString()); - header.put(PROXY_KEY_STREAM_ID, inlongStreamId); - - long auditTime; - if (isRealTime) { - auditTime = AgentUtils.getCurrentTime(); - } else { - auditTime = profile.getSinkDataTime(); - } - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - auditTime, 1, sourceData.data.length); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - AgentUtils.getCurrentTime(), 1, sourceData.data.length); - Message finalMsg = new DefaultMessage(sourceData.data, header); - if (finalMsg.getBody().length > maxPackSize) { - LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", - finalMsg.getBody().length, maxPackSize); - return null; - } - return finalMsg; + @Override + protected boolean isRunnable() { + return runnable; } @Override - public boolean sourceFinish() { - if (isRealTime) { - return false; - } - return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST; + public List split(TaskProfile conf) { + return null; } @Override public boolean sourceExist() { return true; } - - /** - * Stop running threads. - */ - public void stopRunning() { - runnable = false; - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java index 9f3671355ee..8405eca1ec1 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java @@ -49,11 +49,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java index f199f983ebb..6539c621c76 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java @@ -48,11 +48,36 @@ public List split(TaskProfile conf) { return readerList; } + @Override + protected String getThreadName() { + return null; + } + + @Override + protected void printCurrentState() { + + } + + @Override + protected boolean doPrepareToRead() { + return false; + } + + @Override + protected List readFromSource() { + return null; + } + @Override public Message read() { return null; } + @Override + protected boolean isRunnable() { + return runnable; + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 5dc79377deb..50313d8bb15 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -17,22 +17,77 @@ package org.apache.inlong.agent.plugin.sources.file; +import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.MemoryManager; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Source; +import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler; +import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.metric.MetricRegister; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS; +import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; public abstract class AbstractSource implements Source { + @Data + @AllArgsConstructor + @NoArgsConstructor + protected class SourceData { + + private byte[] data; + private Long offset; + } + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSource.class); + + protected final Integer BATCH_READ_LINE_COUNT = 10000; + protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; + protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; + protected final Integer READ_WAIT_TIMEOUT_MS = 10; + private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; + private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; + protected BlockingQueue queue; + protected String inlongGroupId; protected String inlongStreamId; // metric @@ -41,11 +96,169 @@ public abstract class AbstractSource implements Source { protected String metricName; protected Map dimensions; protected static final AtomicLong METRIX_INDEX = new AtomicLong(0); + protected volatile boolean runnable = true; + protected volatile boolean running = false; + protected String taskId; + protected String instanceId; + protected InstanceProfile profile; + private ExtendedHandler extendedHandler; + private boolean isRealTime = false; + protected volatile long emptyCount = 0; + protected int maxPackSize; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("source-pool")); + protected OffsetProfile offsetProfile; @Override public void init(InstanceProfile profile) { + this.profile = profile; + taskId = profile.getTaskId(); + instanceId = profile.getInstanceId(); inlongGroupId = profile.getInlongGroupId(); inlongStreamId = profile.getInlongStreamId(); + maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); + queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); + String cycleUnit = profile.get(TASK_CYCLE_UNIT); + if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + isRealTime = true; + } + initOffset(); + registerMetric(); + initExtendHandler(); + } + + protected void initOffset() { + offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); + } + + @Override + public void start() { + EXECUTOR_SERVICE.execute(run()); + } + + private Runnable run() { + return () -> { + AgentThreadFactory.nameThread(getThreadName()); + running = true; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error maybe file deleted: ", e); + } + running = false; + }; + } + + private void doRun() { + long lastPrintTime = 0; + while (isRunnable()) { + if (!prepareToRead()) { + break; + } + List lines = readFromSource(); + if (lines != null && lines.isEmpty()) { + if (queue.isEmpty()) { + emptyCount++; + } else { + emptyCount = 0; + } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + AgentUtils.silenceSleepInSeconds(1); + continue; + } + emptyCount = 0; + for (int i = 0; i < lines.size(); i++) { + boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length); + if (!suc4Queue) { + break; + } + putIntoQueue(lines.get(i)); + } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { + lastPrintTime = AgentUtils.getCurrentTime(); + printCurrentState(); + } + } + } + + protected abstract void printCurrentState(); + + /** + * Before reading the data source, some preparation operations need to be done, such as memory control semaphore + * application and data source legitimacy verification + * + * @return true if prepared ok + */ + private boolean prepareToRead() { + if (!doPrepareToRead()) { + return false; + } + return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + } + + /** + * Except for applying for memory control semaphores, all other preparatory work is implemented by this function + * + * @return true if prepared ok + */ + protected abstract boolean doPrepareToRead(); + + /** + * After preparation work, we started to truly read data from the data source + * + * @return source data list + */ + protected abstract List readFromSource(); + + private boolean waitForPermit(String permitName, int permitLen) { + boolean suc = false; + while (!suc) { + suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); + if (!suc) { + MemoryManager.getInstance().printDetail(permitName, "source"); + if (!isRunnable()) { + return false; + } + AgentUtils.silenceSleepInSeconds(1); + } + } + return true; + } + + /** + * After preparation work, we started to truly read data from the data source + */ + private void putIntoQueue(SourceData sourceData) { + if (sourceData == null) { + return; + } + try { + boolean offerSuc = false; + while (isRunnable() && offerSuc != true) { + offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); + } + if (!offerSuc) { + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); + } + LOGGER.debug("Read {} from source {}", sourceData.getData(), inlongGroupId); + } catch (InterruptedException e) { + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); + LOGGER.error("fetchData offer failed", e); + } + } + + /** + * Returns the name of the thread, used to identify different data sources + * + * @return source data list + */ + protected abstract String getThreadName(); + + private void registerMetric() { // register metric this.dimensions = new HashMap<>(); dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); @@ -58,7 +271,123 @@ public void init(InstanceProfile profile) { sourceMetric = metricItemSet.findMetricItem(dimensions); } + private void initExtendHandler() { + if (DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName()) != 0) { + Constructor constructor = + null; + try { + constructor = Class.forName( + profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS)) + .getDeclaredConstructor(InstanceProfile.class); + } catch (NoSuchMethodException e) { + LOGGER.error("init {} NoSuchMethodException error", e); + } catch (ClassNotFoundException e) { + LOGGER.error("init {} ClassNotFoundException error", e); + } + constructor.setAccessible(true); + try { + extendedHandler = (ExtendedHandler) constructor.newInstance(profile); + } catch (InstantiationException e) { + LOGGER.error("init {} InstantiationException error", e); + } catch (IllegalAccessException e) { + LOGGER.error("init {} IllegalAccessException error", e); + } catch (InvocationTargetException e) { + LOGGER.error("init {} InvocationTargetException error", e); + } + } + } + + @Override + public Message read() { + SourceData sourceData = null; + try { + sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.warn("poll {} data get interrupted.", instanceId); + } + if (sourceData == null) { + return null; + } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); + return createMessage(sourceData); + } + + private Message createMessage(SourceData sourceData) { + String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); + Map header = new HashMap<>(); + header.put(PROXY_KEY_DATA, proxyPartitionKey); + header.put(OFFSET, sourceData.getOffset().toString()); + header.put(PROXY_KEY_STREAM_ID, inlongStreamId); + if (extendedHandler != null) { + extendedHandler.dealWithHeader(header, sourceData.getData()); + } + long auditTime = 0; + if (isRealTime) { + auditTime = AgentUtils.getCurrentTime(); + } else { + auditTime = profile.getSinkDataTime(); + } + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), + auditTime, 1, sourceData.getData().length); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), + AgentUtils.getCurrentTime(), 1, sourceData.getData().length); + Message finalMsg = new DefaultMessage(sourceData.getData(), header); + // if the message size is greater than max pack size,should drop it. + if (finalMsg.getBody().length > maxPackSize) { + LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", + finalMsg.getBody().length, maxPackSize); + return null; + } + return finalMsg; + } + + /** + * Whether threads can in running state with while loop. + * + * @return true if threads can run + */ + protected abstract boolean isRunnable(); + + /** + * Stop running threads. + */ + public void stopRunning() { + runnable = false; + } + public void destroy() { + LOGGER.info("destroy read source name {}", instanceId); + stopRunning(); + while (running) { + AgentUtils.silenceSleepInMs(1); + } + clearQueue(queue); + LOGGER.info("destroy read source name {} end", instanceId); + } + private void clearQueue(BlockingQueue queue) { + if (queue == null) { + return; + } + while (queue != null && !queue.isEmpty()) { + SourceData sourceData = null; + try { + sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.warn("poll {} data get interrupted.", instanceId, e); + } + if (sourceData != null) { + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); + } + } + queue.clear(); + } + + @Override + public boolean sourceFinish() { + if (isRealTime) { + return false; + } + return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java deleted file mode 100644 index 63b0ae32ba9..00000000000 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesMetadataProvider.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.sources.reader.file; - -import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.plugin.sources.LogFileSource; -import org.apache.inlong.agent.plugin.utils.MetaDataUtils; -import org.apache.inlong.agent.plugin.utils.PluginUtils; - -import com.google.gson.Gson; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodList; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.PodResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; -import java.util.Objects; - -import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID; -import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME; -import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE; -import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_ID; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_CONTAINER_NAME; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_NAMESPACE; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_LABEL; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_NAME; -import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_POD_UID; - -/** - * k8s file reader - */ -public final class KubernetesMetadataProvider { - - private static final Logger log = LoggerFactory.getLogger(KubernetesMetadataProvider.class); - private static final Gson GSON = new Gson(); - - private KubernetesClient client; - private LogFileSource fileReaderOperator; - - public KubernetesMetadataProvider(LogFileSource fileReaderOperator) { - this.fileReaderOperator = fileReaderOperator; - } - - public void getData() { - if (Objects.nonNull(client) && Objects.nonNull(fileReaderOperator.metadata)) { - return; - } - try { - client = PluginUtils.getKubernetesClient(); - } catch (IOException e) { - log.error("get k8s client error: ", e); - } - getK8sMetadata(fileReaderOperator.profile); - } - - /** - * get PODS of kubernetes information - */ - public PodList getPods() { - if (Objects.isNull(client)) { - return null; - } - MixedOperation pods = client.pods(); - return pods.list(); - } - - /** - * get pod metadata by namespace and pod name - */ - public void getK8sMetadata(InstanceProfile jobConf) { - if (Objects.isNull(jobConf)) { - return; - } - Map k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.getFile().getName()); - log.info("file name is: {}, k8s information size: {}", fileReaderOperator.getFile().getName(), k8sInfo.size()); - if (k8sInfo.isEmpty()) { - return; - } - - Map metadata = fileReaderOperator.metadata; - metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE)); - metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME)); - metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID)); - metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME)); - - PodResource podResource = client.pods().inNamespace(k8sInfo.get(NAMESPACE)) - .withName(k8sInfo.get(POD_NAME)); - if (Objects.isNull(podResource)) { - return; - } - Pod pod = podResource.get(); - PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).list(); - podList.getItems().forEach(data -> { - if (data.equals(pod)) { - metadata.put(METADATA_POD_UID, pod.getMetadata().getUid()); - metadata.put(METADATA_POD_LABEL, GSON.toJson(pod.getMetadata().getLabels())); - } - }); - return; - } -} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index fb404dea0a3..a9d683750c3 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -94,6 +94,7 @@ private LogFileSource getSource(int taskId, long offset) { OffsetManager.getInstance().setOffset(offsetProfile); } source.init(instanceProfile); + source.start(); return source; } catch (Exception e) { LOGGER.error("source init error {}", e); From 4951a58167e41be1b8bdc1ef41f44f2644327dc4 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 15 Apr 2024 10:14:41 +0800 Subject: [PATCH 2/5] Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java Co-authored-by: AloysZhang --- .../apache/inlong/agent/plugin/sources/file/AbstractSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 50313d8bb15..44603881a7c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -238,7 +238,7 @@ private void putIntoQueue(SourceData sourceData) { } try { boolean offerSuc = false; - while (isRunnable() && offerSuc != true) { + while (isRunnable() && !offerSuc) { offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); } if (!offerSuc) { From ba2f7e0f252886947958ec350595f9c4422ccac1 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 15 Apr 2024 14:09:49 +0800 Subject: [PATCH 3/5] [INLONG-9982][Agent] Modify based on comments --- .../plugin/sources/DatabaseSqlSource.java | 5 ++ .../agent/plugin/sources/KafkaSource.java | 56 ++++++++++++------- .../agent/plugin/sources/LogFileSource.java | 19 +++++-- .../agent/plugin/sources/MongoDBSource.java | 5 ++ .../agent/plugin/sources/MqttSource.java | 5 ++ .../agent/plugin/sources/OracleSource.java | 5 ++ .../plugin/sources/PostgreSQLSource.java | 5 ++ .../agent/plugin/sources/PulsarSource.java | 36 +++++++++--- .../agent/plugin/sources/RedisSource.java | 5 ++ .../agent/plugin/sources/SQLServerSource.java | 5 ++ .../plugin/sources/file/AbstractSource.java | 16 ++++-- 11 files changed, 124 insertions(+), 38 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java index f24a919ca7b..2df77b4f830 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java @@ -118,6 +118,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java index defbc780c79..3e24a79c200 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java @@ -125,31 +125,38 @@ protected List readFromSource() { private KafkaConsumer getKafkaConsumer() { try { List partitionInfoList; - try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { - partitionInfoList = consumer.partitionsFor(topic); - } + KafkaConsumer kafkaConsumer = null; props.put(KAFKA_SESSION_TIMEOUT, 30000); - try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { - if (null != partitionInfoList) { - List topicPartitions = new ArrayList<>(); - for (PartitionInfo partitionInfo : partitionInfoList) { - TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), - partitionInfo.partition()); - topicPartitions.add(topicPartition); - } - kafkaConsumer.assign(topicPartitions); - if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { - for (TopicPartition topicPartition : topicPartitions) { - Long offset = partitionOffsets.get(topicPartition.partition()); - if (ObjectUtils.isNotEmpty(offset)) { - kafkaConsumer.seek(topicPartition, offset); - } + try { + kafkaConsumer = new KafkaConsumer<>(props); + partitionInfoList = kafkaConsumer.partitionsFor(topic); + if (partitionInfoList == null) { + kafkaConsumer.close(); + return null; + } + List topicPartitions = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfoList) { + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), + partitionInfo.partition()); + topicPartitions.add(topicPartition); + } + kafkaConsumer.assign(topicPartitions); + if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { + for (TopicPartition topicPartition : topicPartitions) { + Long offset = partitionOffsets.get(topicPartition.partition()); + if (ObjectUtils.isNotEmpty(offset)) { + kafkaConsumer.seek(topicPartition, offset); } - } else { - LOGGER.info("Skip to seek offset"); } - return kafkaConsumer; + } else { + LOGGER.info("Skip to seek offset"); + } + return kafkaConsumer; + } catch (Exception e) { + if (kafkaConsumer != null) { + kafkaConsumer.close(); } + LOGGER.error("get kafka consumer error", e); } } catch (Throwable e) { LOGGER.error("do run error maybe topic is configured incorrectly: ", e); @@ -176,4 +183,11 @@ public List split(TaskProfile conf) { public boolean sourceExist() { return true; } + + @Override + protected void releaseSource() { + if (kafkaConsumer != null) { + kafkaConsumer.close(); + } + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 89574fa3622..037470fb419 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -59,6 +59,7 @@ public class LogFileSource extends AbstractSource { private volatile boolean fileExist = true; private String inodeInfo; private volatile long lastInodeUpdateTime = 0; + private RandomAccessFile randomAccessFile; public LogFileSource() { } @@ -76,6 +77,7 @@ public void init(InstanceProfile profile) { lastInodeUpdateTime = AgentUtils.getCurrentTime(); linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); + randomAccessFile = new RandomAccessFile(file, "r"); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + file.getPath(), ex); @@ -126,15 +128,11 @@ protected String getThreadName() { private List readFromPos(long pos) throws IOException { List lines = new ArrayList<>(); List dataList = new ArrayList<>(); - RandomAccessFile input = new RandomAccessFile(file, "r"); - bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); + bytePosition = readLines(randomAccessFile, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); for (int i = 0; i < lines.size(); i++) { linePosition++; dataList.add(new SourceData(lines.get(i), linePosition)); } - if (input != null) { - input.close(); - } return dataList; } @@ -306,4 +304,15 @@ public boolean sourceExist() { public List split(TaskProfile jobConf) { return null; } + + @Override + protected void releaseSource() { + if (randomAccessFile != null) { + try { + randomAccessFile.close(); + } catch (IOException e) { + LOGGER.error("close randomAccessFile error", e); + } + } + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java index 5cc5717bcf2..555d87b412a 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java @@ -74,6 +74,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java index c438e81de96..5f14cf30271 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java @@ -109,6 +109,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java index 323f8b5305b..1d3088a0aab 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java @@ -78,6 +78,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java index 37effaf04ff..7a65767c1b1 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java @@ -79,6 +79,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index 3cfa37af22b..206f11b2db1 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -116,13 +116,14 @@ protected List readFromSource() { private Consumer getConsumer() { try { - try (Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) - .topic(topic) - .subscriptionName(subscription) - .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) - .subscriptionType(SubscriptionType.valueOf(subscriptionType)) - .subscribe()) { - + Consumer consumer = null; + try { + consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) + .subscriptionType(SubscriptionType.valueOf(subscriptionType)) + .subscribe(); if (!isRestoreFromDB && timestamp != 0L) { consumer.seek(timestamp); LOGGER.info("Reset consume from {}", timestamp); @@ -130,6 +131,16 @@ private Consumer getConsumer() { LOGGER.info("Skip to reset consume"); } return consumer; + } catch (PulsarClientException e) { + if (consumer == null) { + consumer.close(); + } + LOGGER.error("get consumer error", e); + } catch (IllegalArgumentException e) { + if (consumer == null) { + consumer.close(); + } + LOGGER.error("get consumer error", e); } } catch (Throwable e) { LOGGER.error("do run error maybe pulsar client is configured incorrectly: ", e); @@ -147,6 +158,17 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + if (consumer != null) { + try { + consumer.close(); + } catch (PulsarClientException e) { + LOGGER.error("close consumer error", e); + } + } + } + @Override public List split(TaskProfile conf) { return null; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java index 8405eca1ec1..a844ba38cbe 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java @@ -79,6 +79,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java index 6539c621c76..8fceb8f635e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java @@ -78,6 +78,11 @@ protected boolean isRunnable() { return runnable; } + @Override + protected void releaseSource() { + + } + @Override public boolean sourceFinish() { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 44603881a7c..d9e8b9834a7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -280,19 +280,19 @@ private void initExtendHandler() { profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS)) .getDeclaredConstructor(InstanceProfile.class); } catch (NoSuchMethodException e) { - LOGGER.error("init {} NoSuchMethodException error", e); + LOGGER.error("init {} NoSuchMethodException error", instanceId, e); } catch (ClassNotFoundException e) { - LOGGER.error("init {} ClassNotFoundException error", e); + LOGGER.error("init {} ClassNotFoundException error", instanceId, e); } constructor.setAccessible(true); try { extendedHandler = (ExtendedHandler) constructor.newInstance(profile); } catch (InstantiationException e) { - LOGGER.error("init {} InstantiationException error", e); + LOGGER.error("init {} InstantiationException error", instanceId, e); } catch (IllegalAccessException e) { - LOGGER.error("init {} IllegalAccessException error", e); + LOGGER.error("init {} IllegalAccessException error", instanceId, e); } catch (InvocationTargetException e) { - LOGGER.error("init {} InvocationTargetException error", e); + LOGGER.error("init {} InvocationTargetException error", instanceId, e); } } } @@ -362,9 +362,15 @@ public void destroy() { AgentUtils.silenceSleepInMs(1); } clearQueue(queue); + releaseSource(); LOGGER.info("destroy read source name {} end", instanceId); } + /** + * Release the source resource if needed + */ + protected abstract void releaseSource(); + private void clearQueue(BlockingQueue queue) { if (queue == null) { return; From 4b99136eacc7e52b031fc9526d92f8607f85e8c7 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 15 Apr 2024 15:39:08 +0800 Subject: [PATCH 4/5] [INLONG-9982][Agent] Modify based on comments --- .../agent/plugin/sources/KafkaSource.java | 62 +++++++++---------- .../agent/plugin/sources/PulsarSource.java | 50 ++++++++------- 2 files changed, 56 insertions(+), 56 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java index 3e24a79c200..42e6d7c70fb 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java @@ -123,43 +123,39 @@ protected List readFromSource() { } private KafkaConsumer getKafkaConsumer() { + List partitionInfoList; + KafkaConsumer kafkaConsumer = null; + props.put(KAFKA_SESSION_TIMEOUT, 30000); try { - List partitionInfoList; - KafkaConsumer kafkaConsumer = null; - props.put(KAFKA_SESSION_TIMEOUT, 30000); - try { - kafkaConsumer = new KafkaConsumer<>(props); - partitionInfoList = kafkaConsumer.partitionsFor(topic); - if (partitionInfoList == null) { - kafkaConsumer.close(); - return null; - } - List topicPartitions = new ArrayList<>(); - for (PartitionInfo partitionInfo : partitionInfoList) { - TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), - partitionInfo.partition()); - topicPartitions.add(topicPartition); - } - kafkaConsumer.assign(topicPartitions); - if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { - for (TopicPartition topicPartition : topicPartitions) { - Long offset = partitionOffsets.get(topicPartition.partition()); - if (ObjectUtils.isNotEmpty(offset)) { - kafkaConsumer.seek(topicPartition, offset); - } + kafkaConsumer = new KafkaConsumer<>(props); + partitionInfoList = kafkaConsumer.partitionsFor(topic); + if (partitionInfoList == null) { + kafkaConsumer.close(); + return null; + } + List topicPartitions = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfoList) { + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), + partitionInfo.partition()); + topicPartitions.add(topicPartition); + } + kafkaConsumer.assign(topicPartitions); + if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) { + for (TopicPartition topicPartition : topicPartitions) { + Long offset = partitionOffsets.get(topicPartition.partition()); + if (ObjectUtils.isNotEmpty(offset)) { + kafkaConsumer.seek(topicPartition, offset); } - } else { - LOGGER.info("Skip to seek offset"); - } - return kafkaConsumer; - } catch (Exception e) { - if (kafkaConsumer != null) { - kafkaConsumer.close(); } - LOGGER.error("get kafka consumer error", e); + } else { + LOGGER.info("Skip to seek offset"); + } + return kafkaConsumer; + } catch (Exception e) { + if (kafkaConsumer != null) { + kafkaConsumer.close(); } - } catch (Throwable e) { - LOGGER.error("do run error maybe topic is configured incorrectly: ", e); + LOGGER.error("get kafka consumer error", e); } return null; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index 206f11b2db1..74b98eced72 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -115,35 +115,39 @@ protected List readFromSource() { } private Consumer getConsumer() { + Consumer consumer = null; try { - Consumer consumer = null; - try { - consumer = pulsarClient.newConsumer(Schema.BYTES) - .topic(topic) - .subscriptionName(subscription) - .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) - .subscriptionType(SubscriptionType.valueOf(subscriptionType)) - .subscribe(); - if (!isRestoreFromDB && timestamp != 0L) { - consumer.seek(timestamp); - LOGGER.info("Reset consume from {}", timestamp); - } else { - LOGGER.info("Skip to reset consume"); - } - return consumer; - } catch (PulsarClientException e) { - if (consumer == null) { + consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) + .subscriptionType(SubscriptionType.valueOf(subscriptionType)) + .subscribe(); + if (!isRestoreFromDB && timestamp != 0L) { + consumer.seek(timestamp); + LOGGER.info("Reset consume from {}", timestamp); + } else { + LOGGER.info("Skip to reset consume"); + } + return consumer; + } catch (PulsarClientException e) { + if (consumer == null) { + try { consumer.close(); + } catch (PulsarClientException ex) { + LOGGER.error("close consumer error", e); } - LOGGER.error("get consumer error", e); - } catch (IllegalArgumentException e) { - if (consumer == null) { + } + LOGGER.error("get consumer error", e); + } catch (IllegalArgumentException e) { + if (consumer == null) { + try { consumer.close(); + } catch (PulsarClientException ex) { + LOGGER.error("close consumer error", e); } - LOGGER.error("get consumer error", e); } - } catch (Throwable e) { - LOGGER.error("do run error maybe pulsar client is configured incorrectly: ", e); + LOGGER.error("get consumer error", e); } return null; } From e63799a6a360ef2af02fa209cb6ac89e0f9d5c5e Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 15 Apr 2024 21:38:24 +0800 Subject: [PATCH 5/5] [INLONG-9982][Agent] Modify based on comments --- .../inlong/agent/plugin/sources/PulsarSource.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index 74b98eced72..64974d79942 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -130,16 +130,7 @@ private Consumer getConsumer() { LOGGER.info("Skip to reset consume"); } return consumer; - } catch (PulsarClientException e) { - if (consumer == null) { - try { - consumer.close(); - } catch (PulsarClientException ex) { - LOGGER.error("close consumer error", e); - } - } - LOGGER.error("get consumer error", e); - } catch (IllegalArgumentException e) { + } catch (PulsarClientException | IllegalArgumentException e) { if (consumer == null) { try { consumer.close();