diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java index fa9034c6bc..62aa87433b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java @@ -94,7 +94,7 @@ private class SourceData { 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("kafka-source")); - private BlockingQueue queue; + private BlockingQueue queue; public InstanceProfile profile; private int maxPackSize; private String taskId; @@ -213,13 +213,13 @@ private void doRun(KafkaConsumer kafkaConsumer) { break; } ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); if (records.isEmpty()) { if (queue.isEmpty()) { emptyCount.incrementAndGet(); } else { emptyCount.set(0); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -234,6 +234,7 @@ private void doRun(KafkaConsumer kafkaConsumer) { putIntoQueue(sourceData); offset = record.offset(); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); kafkaConsumer.commitSync(); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { @@ -298,7 +299,7 @@ public List split(TaskProfile conf) { @Override public Message read() { - KafkaSource.SourceData sourceData = null; + SourceData sourceData = null; try { sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 3ba0f28f9c..9f83f6fc06 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -476,13 +476,13 @@ private void doRun() { } catch (IOException e) { LOGGER.error("readFromPos error: ", e); } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (lines.isEmpty()) { if (queue.isEmpty()) { emptyCount++; } else { emptyCount = 0; } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -494,6 +494,7 @@ private void doRun() { } putIntoQueue(lines.get(i)); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { lastPrintTime = AgentUtils.getCurrentTime(); LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index ebd8495d9f..c64653e2a3 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -185,13 +185,13 @@ private void doRun(Consumer consumer) throws PulsarClientException { break; } org.apache.pulsar.client.api.Message message = consumer.receive(0, TimeUnit.MILLISECONDS); - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); if (ObjectUtils.isEmpty(message)) { if (queue.isEmpty()) { emptyCount.incrementAndGet(); } else { emptyCount.set(0); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -203,6 +203,7 @@ private void doRun(Consumer consumer) throws PulsarClientException { break; } putIntoQueue(sourceData); + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); consumer.acknowledge(message); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { @@ -260,7 +261,7 @@ public List split(TaskProfile conf) { @Override public Message read() { - PulsarSource.SourceData sourceData = null; + SourceData sourceData = null; try { sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -274,7 +275,7 @@ public Message read() { return finalMsg; } - private Message createMessage(PulsarSource.SourceData sourceData) { + private Message createMessage(SourceData sourceData) { String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map header = new HashMap<>(); header.put(PROXY_KEY_DATA, proxyPartitionKey);