Skip to content

Commit

Permalink
[INLONG-9969][Agent] Release the memory semaphore of the source only …
Browse files Browse the repository at this point in the history
…when the data is placed in the queue (#9971)
  • Loading branch information
justinwwhuang authored Apr 11, 2024
1 parent bff9c2c commit 24bdfc1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private class SourceData {
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory("kafka-source"));
private BlockingQueue<KafkaSource.SourceData> queue;
private BlockingQueue<SourceData> queue;
public InstanceProfile profile;
private int maxPackSize;
private String taskId;
Expand Down Expand Up @@ -213,13 +213,13 @@ private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
break;
}
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(1000));
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
if (records.isEmpty()) {
if (queue.isEmpty()) {
emptyCount.incrementAndGet();
} else {
emptyCount.set(0);
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
Expand All @@ -234,6 +234,7 @@ private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
putIntoQueue(sourceData);
offset = record.offset();
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
kafkaConsumer.commitSync();

if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
Expand Down Expand Up @@ -298,7 +299,7 @@ public List<Reader> split(TaskProfile conf) {

@Override
public Message read() {
KafkaSource.SourceData sourceData = null;
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,13 @@ private void doRun() {
} catch (IOException e) {
LOGGER.error("readFromPos error: ", e);
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
if (queue.isEmpty()) {
emptyCount++;
} else {
emptyCount = 0;
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
Expand All @@ -494,6 +494,7 @@ private void doRun() {
}
putIntoQueue(lines.get(i));
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ private void doRun(Consumer<byte[]> consumer) throws PulsarClientException {
break;
}
org.apache.pulsar.client.api.Message<byte[]> message = consumer.receive(0, TimeUnit.MILLISECONDS);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
if (ObjectUtils.isEmpty(message)) {
if (queue.isEmpty()) {
emptyCount.incrementAndGet();
} else {
emptyCount.set(0);
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
Expand All @@ -203,6 +203,7 @@ private void doRun(Consumer<byte[]> consumer) throws PulsarClientException {
break;
}
putIntoQueue(sourceData);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
consumer.acknowledge(message);

if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
Expand Down Expand Up @@ -260,7 +261,7 @@ public List<Reader> split(TaskProfile conf) {

@Override
public Message read() {
PulsarSource.SourceData sourceData = null;
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand All @@ -274,7 +275,7 @@ public Message read() {
return finalMsg;
}

private Message createMessage(PulsarSource.SourceData sourceData) {
private Message createMessage(SourceData sourceData) {
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
Expand Down

0 comments on commit 24bdfc1

Please sign in to comment.