From ccc44e2e8e869a7667a5dd462b2d8f522b39ef9e Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Mon, 28 Oct 2024 18:07:06 +0800 Subject: [PATCH] Subscription: implement subscription event optimistic transmission strategy to reduce peak memory usage (#13763) --- .../rpc/subscription/config/TopicConfig.java | 7 +- .../poll/SubscriptionPollResponse.java | 20 +- .../broker/SubscriptionBroker.java | 20 +- .../broker/SubscriptionPrefetchingQueue.java | 21 +- .../SubscriptionPrefetchingTabletQueue.java | 7 +- .../SubscriptionPrefetchingTsFileQueue.java | 6 +- .../subscription/event/SubscriptionEvent.java | 298 ++++-------------- .../batch/SubscriptionPipeEventBatch.java | 84 ++++- .../SubscriptionPipeTabletEventBatch.java | 157 ++++----- .../SubscriptionPipeTsFileEventBatch.java | 81 ++--- .../cache/CachedSubscriptionPollResponse.java | 83 +++++ .../SubscriptionPollResponseCache.java} | 43 +-- .../pipe/SubscriptionPipeEmptyEvent.java | 7 - .../event/pipe/SubscriptionPipeEvents.java | 7 - .../SubscriptionPipeTabletBatchEvents.java | 7 - .../SubscriptionPipeTsFileBatchEvents.java | 14 +- .../SubscriptionPipeTsFilePlainEvent.java | 7 - .../SubscriptionEventExtendableResponse.java | 151 +++++++++ .../response/SubscriptionEventResponse.java | 52 +++ .../SubscriptionEventSingleResponse.java | 122 +++++++ .../SubscriptionEventTabletResponse.java | 162 ++++++++++ .../SubscriptionEventTsFileResponse.java | 170 ++++++++++ .../receiver/SubscriptionReceiverV1.java | 2 +- .../subscription/SubscriptionStatesTest.java | 5 - 24 files changed, 1024 insertions(+), 509 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/{SubscriptionEventBinaryCache.java => cache/SubscriptionPollResponseCache.java} (78%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java index 4bec8d762a2a..46dc7601e0f4 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java @@ -106,7 +106,7 @@ public Map getAttributesWithTimeRange() { } public Map getAttributesWithRealtimeMode() { - return REALTIME_STREAM_MODE_CONFIG; + return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid) } public Map getAttributesWithSourceMode() { @@ -136,9 +136,6 @@ public Map getAttributesWithProcessorPrefix() { } public Map getAttributesWithSinkFormat() { - return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase( - attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)) - ? Collections.emptyMap() - : SINK_TABLET_FORMAT_CONFIG; + return Collections.emptyMap(); // default to hybrid } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java index 01d173d27428..06baa30acee9 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java @@ -27,6 +27,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; public class SubscriptionPollResponse { @@ -110,16 +112,18 @@ public static SubscriptionPollResponse deserialize(final ByteBuffer buffer) { return new SubscriptionPollResponse(responseType, payload, commitContext); } - /////////////////////////////// object /////////////////////////////// + /////////////////////////////// stringify /////////////////////////////// @Override public String toString() { - return "SubscriptionPollResponse{responseType=" - + SubscriptionPollResponseType.valueOf(responseType).toString() - + ", payload=" - + payload - + ", commitContext=" - + commitContext - + "}"; + return "SubscriptionPollResponse" + coreReportMessage(); + } + + protected Map coreReportMessage() { + final Map result = new HashMap<>(); + result.put("responseType", SubscriptionPollResponseType.valueOf(responseType).toString()); + result.put("payload", payload.toString()); + result.put("commitContext", commitContext.toString()); + return result; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index afc8f2f22908..c4e47dec1506 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -24,13 +24,11 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload; @@ -103,16 +101,14 @@ public List poll( brokerId); events.add( new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), - new SubscriptionPollResponse( - SubscriptionPollResponseType.TERMINATION.getType(), - new TerminationPayload(), - new SubscriptionCommitContext( - IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), - PipeDataNodeAgent.runtime().getRebootTimes(), - topicName, - brokerId, - INVALID_COMMIT_ID)))); + SubscriptionPollResponseType.TERMINATION.getType(), + new TerminationPayload(), + new SubscriptionCommitContext( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), + PipeDataNodeAgent.runtime().getRebootTimes(), + topicName, + brokerId, + INVALID_COMMIT_ID))); continue; } // There are two reasons for not printing logs here: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index e072b7a8d512..9aa9343083da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -28,13 +28,11 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; import com.google.common.collect.ImmutableSet; @@ -288,6 +286,7 @@ public void executePrefetchInternal() { } protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event) { + // TODO: consider memory usage event.trySerializeCurrentResponse(); prefetchingQueue.add(event); } @@ -532,16 +531,14 @@ public SubscriptionCommitContext generateSubscriptionCommitContext() { protected SubscriptionEvent generateSubscriptionPollErrorResponse(final String errorMessage) { // consider non-critical by default, meaning the client can retry return new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), - new SubscriptionPollResponse( - SubscriptionPollResponseType.ERROR.getType(), - new ErrorPayload(errorMessage, false), - new SubscriptionCommitContext( - IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), - PipeDataNodeAgent.runtime().getRebootTimes(), - topicName, - brokerId, - INVALID_COMMIT_ID))); + SubscriptionPollResponseType.ERROR.getType(), + new ErrorPayload(errorMessage, false), + new SubscriptionCommitContext( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), + PipeDataNodeAgent.runtime().getRebootTimes(), + topicName, + brokerId, + INVALID_COMMIT_ID)); } //////////////////////////// APIs provided for metric framework //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index ea7a652e4615..09f41cf23cb4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.broker; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -162,11 +163,7 @@ public SubscriptionEvent pollTablets( @Override protected boolean onEvent(final TsFileInsertionEvent event) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingTabletQueue {} ignore TsFileInsertionEvent {} when prefetching.", - this, - event); - return false; + return batches.onEvent((EnrichedEvent) event, this::enqueueEventToPrefetchingQueue); } /////////////////////////////// stringify /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 64daaca4b4c4..5d12eb490ccb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -240,10 +240,8 @@ protected boolean onEvent(final TsFileInsertionEvent event) { final SubscriptionEvent ev = new SubscriptionEvent( new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent) event), - new SubscriptionPollResponse( - SubscriptionPollResponseType.FILE_INIT.getType(), - new FileInitPayload(((PipeTsFileInsertionEvent) event).getTsFile().getName()), - commitContext)); + ((PipeTsFileInsertionEvent) event).getTsFile(), + commitContext); super.enqueueEventToPrefetchingQueue(ev); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index ad274023f6a8..0e7fd04d0095 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -20,34 +20,29 @@ package org.apache.iotdb.db.subscription.event; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue; +import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch; +import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEvents; -import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload; -import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; -import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload; -import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents; +import org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse; +import org.apache.iotdb.db.subscription.event.response.SubscriptionEventSingleResponse; +import org.apache.iotdb.db.subscription.event.response.SubscriptionEventTabletResponse; +import org.apache.iotdb.db.subscription.event.response.SubscriptionEventTsFileResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; -import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; -import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload; -import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; @@ -58,81 +53,63 @@ public class SubscriptionEvent { private static final long INVALID_TIMESTAMP = -1; private final SubscriptionPipeEvents pipeEvents; - - private final SubscriptionPollResponse[] responses; - private int currentResponseIndex = 0; - - private final ByteBuffer[] byteBuffers; // serialized responses - private final SubscriptionCommitContext - commitContext; // all responses have the same commit context + private final SubscriptionEventResponse response; + private final SubscriptionCommitContext commitContext; // lastPolledConsumerId is not used as a criterion for determining pollability private volatile String lastPolledConsumerId = null; private final AtomicLong lastPolledTimestamp = new AtomicLong(INVALID_TIMESTAMP); private final AtomicLong committedTimestamp = new AtomicLong(INVALID_TIMESTAMP); + // record file name for file payload + private String fileName; + /** - * Constructs a {@link SubscriptionEvent} with an initial response. - * - * @param pipeEvents The underlying pipe events corresponding to this {@link SubscriptionEvent}. - * @param initialResponse The initial response which must be of type {@link FileInitPayload}. This - * indicates that subsequent responses need to be fetched using {@link - * SubscriptionEvent#prefetchRemainingResponses()}. + * Constructs a {@link SubscriptionEvent} with the response type of {@link + * SubscriptionEventSingleResponse}. */ public SubscriptionEvent( - final SubscriptionPipeEvents pipeEvents, final SubscriptionPollResponse initialResponse) { - this.pipeEvents = pipeEvents; + final short responseType, + final SubscriptionPollPayload payload, + final SubscriptionCommitContext commitContext) { + this.pipeEvents = new SubscriptionPipeEmptyEvent(); + this.response = new SubscriptionEventSingleResponse(responseType, payload, commitContext); + this.commitContext = commitContext; + } - final int responseLength = getResponseLength(initialResponse.getResponseType()); - this.responses = new SubscriptionPollResponse[responseLength]; - this.responses[0] = initialResponse; + @TestOnly + public SubscriptionEvent(final SubscriptionPollResponse response) { + this(response.getResponseType(), response.getPayload(), response.getCommitContext()); + } - this.byteBuffers = new ByteBuffer[responseLength]; - this.commitContext = initialResponse.getCommitContext(); + /** + * Constructs a {@link SubscriptionEvent} with the response type of {@link + * SubscriptionEventTabletResponse}. + */ + public SubscriptionEvent( + final SubscriptionPipeTabletEventBatch batch, final SubscriptionCommitContext commitContext) { + this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch); + this.response = new SubscriptionEventTabletResponse(batch, commitContext); + this.commitContext = commitContext; } /** - * Constructs a {@link SubscriptionEvent} with a list of responses. - * - * @param pipeEvents The underlying pipe events corresponding to this {@link SubscriptionEvent}. - * @param responses A list of responses that can be of types {@link TabletsPayload}, {@link - * TerminationPayload}, or {@link ErrorPayload}. All responses are already generated at the - * time of construction, so {@link SubscriptionEvent#prefetchRemainingResponses()} is not - * required. + * Constructs a {@link SubscriptionEvent} with the response type of {@link + * SubscriptionEventTsFileResponse}. */ public SubscriptionEvent( - final SubscriptionPipeEvents pipeEvents, final List responses) { + final SubscriptionPipeEvents pipeEvents, + final File tsFile, + final SubscriptionCommitContext commitContext) { this.pipeEvents = pipeEvents; + this.response = new SubscriptionEventTsFileResponse(tsFile, commitContext); + this.commitContext = commitContext; - final int responseLength = responses.size(); - this.responses = new SubscriptionPollResponse[responseLength]; - for (int i = 0; i < responseLength; i++) { - this.responses[i] = responses.get(i); - } - - this.byteBuffers = new ByteBuffer[responseLength]; - this.commitContext = this.responses[0].getCommitContext(); - } - - private int getResponseLength(final short responseType) { - if (!Objects.equals(SubscriptionPollResponseType.FILE_INIT.getType(), responseType)) { - LOGGER.warn("unexpected response type: {}", responseType); - return 1; - } - final long fileLength = pipeEvents.getTsFile().length(); - final long readFileBufferSize = - SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize(); - final int length = (int) (fileLength / readFileBufferSize); - // add for init, last piece and seal - return (fileLength % readFileBufferSize != 0) ? length + 3 : length + 2; + this.fileName = tsFile.getName(); } public SubscriptionPollResponse getCurrentResponse() { - return getResponse(currentResponseIndex); - } - - private SubscriptionPollResponse getResponse(final int index) { - return responses[index]; + return response.getCurrentResponse(); } public SubscriptionCommitContext getCommitContext() { @@ -158,7 +135,7 @@ public boolean isCommittable() { // event with invalid commit id is uncommittable return false; } - return currentResponseIndex >= responses.length - 1; + return response.isCommittable(); } public void ack() { @@ -172,10 +149,12 @@ public void ack() { */ public void cleanUp() { // reset serialized responses - resetResponseByteBuffer(true); + response.cleanUp(); // clean up pipe events pipeEvents.cleanUp(); + + // TODO: clean more fields } //////////////////////////// pollable //////////////////////////// @@ -224,8 +203,8 @@ private boolean canRecycle() { } public void nack() { - // reset current response index - currentResponseIndex = 0; + // nack response + response.nack(); // reset lastPolledTimestamp makes this event pollable lastPolledTimestamp.set(INVALID_TIMESTAMP); @@ -241,131 +220,31 @@ public String getLastPolledConsumerId() { //////////////////////////// prefetch & fetch //////////////////////////// - /** - * @param index the index of response to be prefetched - */ - private void prefetchResponse(final int index) throws IOException { - if (index >= responses.length || index <= 0) { - return; - } - - if (Objects.nonNull(responses[index])) { - return; - } - - final SubscriptionPollResponse previousResponse = this.getResponse(index - 1); - final short responseType = previousResponse.getResponseType(); - final SubscriptionPollPayload payload = previousResponse.getPayload(); - if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) { - LOGGER.warn("unexpected response type: {}", responseType); - return; - } - - switch (SubscriptionPollResponseType.valueOf(responseType)) { - case FILE_INIT: - responses[index] = generateSubscriptionPollResponseWithPieceOrSealPayload(0); - break; - case FILE_PIECE: - responses[index] = - generateSubscriptionPollResponseWithPieceOrSealPayload( - ((FilePiecePayload) payload).getNextWritingOffset()); - break; - case FILE_SEAL: - // not need to prefetch - return; - default: - LOGGER.warn("unexpected message type: {}", responseType); - } - } - public void prefetchRemainingResponses() throws IOException { - for (int currentIndex = currentResponseIndex; - currentIndex < responses.length - 1; - currentIndex++) { - if (Objects.isNull(responses[currentIndex + 1])) { - prefetchResponse(currentIndex + 1); - return; - } - } + response.prefetchRemainingResponses(); } public void fetchNextResponse() throws IOException { - if (currentResponseIndex >= responses.length - 1) { - LOGGER.warn("No more responses when fetching next response for {}, do nothing.", this); - return; - } - if (Objects.isNull(responses[currentResponseIndex + 1])) { - prefetchRemainingResponses(); - } - currentResponseIndex++; + response.fetchNextResponse(); } //////////////////////////// byte buffer //////////////////////////// public void trySerializeRemainingResponses() { - for (int currentIndex = currentResponseIndex; - currentIndex < responses.length - 1; - currentIndex++) { - if (Objects.nonNull(responses[currentIndex + 1]) && trySerializeResponse(currentIndex + 1)) { - break; - } - } - } - - public boolean trySerializeCurrentResponse() { - return trySerializeResponse(currentResponseIndex); + // TODO: consider memory usage + response.trySerializeRemainingResponses(); } - /** - * @param index the index of response to be serialized - * @return {@code true} if a serialization operation was actually performed - */ - private boolean trySerializeResponse(final int index) { - if (index >= responses.length) { - return false; - } - - if (Objects.isNull(responses[index])) { - return false; - } - - if (Objects.nonNull(byteBuffers[index])) { - return false; - } - - final Optional optionalByteBuffer = - SubscriptionEventBinaryCache.getInstance().trySerialize(responses[index]); - if (optionalByteBuffer.isPresent()) { - byteBuffers[index] = optionalByteBuffer.get(); - return true; - } - - return false; + public void trySerializeCurrentResponse() { + response.trySerializeCurrentResponse(); } public ByteBuffer getCurrentResponseByteBuffer() throws IOException { - if (Objects.nonNull(byteBuffers[currentResponseIndex])) { - return byteBuffers[currentResponseIndex]; - } - - return byteBuffers[currentResponseIndex] = - SubscriptionEventBinaryCache.getInstance().serialize(getCurrentResponse()); + return response.getCurrentResponseByteBuffer(); } - public void resetResponseByteBuffer(final boolean resetAll) { - if (resetAll) { - SubscriptionEventBinaryCache.getInstance() - .invalidateAll( - Arrays.stream(responses).filter(Objects::nonNull).collect(Collectors.toList())); - // maybe friendly for gc - Arrays.fill(byteBuffers, null); - } else { - if (Objects.nonNull(responses[currentResponseIndex])) { - SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]); - } - // maybe friendly for gc - byteBuffers[currentResponseIndex] = null; - } + public void invalidateCurrentResponseByteBuffer() { + response.invalidateCurrentResponseByteBuffer(); } public int getCurrentResponseSize() throws IOException { @@ -374,69 +253,24 @@ public int getCurrentResponseSize() throws IOException { return byteBuffer.limit() - byteBuffer.position(); } - /////////////////////////////// tsfile /////////////////////////////// - - private @NonNull SubscriptionPollResponse generateSubscriptionPollResponseWithPieceOrSealPayload( - final long writingOffset) throws IOException { - final File tsFile = pipeEvents.getTsFile(); - - final long readFileBufferSize = - SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize(); - final byte[] readBuffer = new byte[(int) readFileBufferSize]; - try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) { - while (true) { - reader.seek(writingOffset); - final int readLength = reader.read(readBuffer); - if (readLength == -1) { - break; - } - - final byte[] filePiece = - readLength == readFileBufferSize - ? readBuffer - : Arrays.copyOfRange(readBuffer, 0, readLength); - - // generate subscription poll response with piece payload - return new SubscriptionPollResponse( - SubscriptionPollResponseType.FILE_PIECE.getType(), - new FilePiecePayload(tsFile.getName(), writingOffset + readLength, filePiece), - commitContext); - } - - // generate subscription poll response with seal payload - return new SubscriptionPollResponse( - SubscriptionPollResponseType.FILE_SEAL.getType(), - new FileSealPayload(tsFile.getName(), tsFile.length()), - commitContext); - } - } + //////////////////////////// tsfile //////////////////////////// public String getFileName() { - return pipeEvents.getTsFile().getName(); + return fileName; } - /////////////////////////////// APIs provided for metric framework /////////////////////////////// + //////////////////////////// APIs provided for metric framework //////////////////////////// public int getPipeEventCount() { return pipeEvents.getPipeEventCount(); } - /////////////////////////////// object /////////////////////////////// + //////////////////////////// object //////////////////////////// @Override public String toString() { - return "SubscriptionEvent{responses=" - + Arrays.toString(responses) - + ", responses' byte buffer size=" - + Arrays.stream(byteBuffers) - .map( - byteBuffer -> - Objects.isNull(byteBuffer) - ? "" - : byteBuffer.limit() - byteBuffer.position()) - .collect(Collectors.toList()) - + ", currentResponseIndex=" - + currentResponseIndex + return "SubscriptionEvent{response=" + + response + ", lastPolledConsumerId=" + lastPolledConsumerId + ", lastPolledTimestamp=" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index 758d9b315675..a396ff8ee2a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -22,17 +22,25 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; +import java.util.stream.Collectors; public abstract class SubscriptionPipeEventBatch { + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeEventBatch.class); + private final int regionId; protected final SubscriptionPrefetchingQueue prefetchingQueue; @@ -40,6 +48,7 @@ public abstract class SubscriptionPipeEventBatch { protected final long maxBatchSizeInBytes; protected volatile List events = null; + protected final List enrichedEvents = new ArrayList<>(); protected SubscriptionPipeEventBatch( final int regionId, @@ -52,27 +61,60 @@ protected SubscriptionPipeEventBatch( this.maxBatchSizeInBytes = maxBatchSizeInBytes; } + /////////////////////////////// ack & clean /////////////////////////////// + + public abstract void ack(); + + public abstract void cleanUp(); + + /////////////////////////////// APIs /////////////////////////////// + /** * @return {@code true} if there are subscription events consumed. */ - public abstract boolean onEvent(final Consumer consumer) throws Exception; + protected synchronized boolean onEvent(final Consumer consumer) + throws Exception { + if (shouldEmit() && !enrichedEvents.isEmpty()) { + if (Objects.isNull(events)) { + events = generateSubscriptionEvents(); + } + if (Objects.nonNull(events)) { + events.forEach(consumer); + return true; + } + return false; + } + return false; + } /** * @return {@code true} if there are subscription events consumed. */ - public abstract boolean onEvent( + protected synchronized boolean onEvent( final @NonNull EnrichedEvent event, final Consumer consumer) - throws Exception; + throws Exception { + if (event instanceof TabletInsertionEvent) { + onTabletInsertionEvent((TabletInsertionEvent) event); // no exceptions will be thrown + enrichedEvents.add(event); + } else if (event instanceof TsFileInsertionEvent) { + onTsFileInsertionEvent((TsFileInsertionEvent) event); + enrichedEvents.add(event); + } else { + LOGGER.warn( + "SubscriptionPipeEventBatch {} ignore EnrichedEvent {} when batching.", this, event); + } + return onEvent(consumer); + } - public abstract void cleanUp(); + /////////////////////////////// utility /////////////////////////////// - public int getRegionId() { - return regionId; - } + protected abstract void onTabletInsertionEvent(final TabletInsertionEvent event) throws Exception; - public boolean isSealed() { - return Objects.nonNull(events); - } + protected abstract void onTsFileInsertionEvent(final TsFileInsertionEvent event) throws Exception; + + protected abstract boolean shouldEmit(); + + protected abstract List generateSubscriptionEvents() throws Exception; /////////////////////////////// stringify /////////////////////////////// @@ -82,6 +124,28 @@ protected Map coreReportMessage() { result.put("prefetchingQueue", prefetchingQueue.coreReportMessage().toString()); result.put("maxDelayInMs", String.valueOf(maxDelayInMs)); result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes)); + // TODO: stringify subscription events? + result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4)); return result; } + + private static String formatEnrichedEvents( + final List enrichedEvents, final int threshold) { + final List eventMessageList = + enrichedEvents.stream() + .limit(threshold) + .map(EnrichedEvent::coreReportMessage) + .collect(Collectors.toList()); + if (eventMessageList.size() > threshold) { + eventMessageList.add( + String.format("omit the remaining %s event(s)...", eventMessageList.size() - threshold)); + } + return eventMessageList.toString(); + } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + public int getPipeEventCount() { + return enrichedEvents.size(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 871b1c12587f..b54607c868b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -20,43 +20,31 @@ package org.apache.iotdb.db.subscription.event.batch; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; -import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.tsfile.write.record.Tablet; -import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; -import java.util.stream.Collectors; public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class); - private static final long READ_TABLET_BUFFER_SIZE = - SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize(); - - private final List enrichedEvents = new ArrayList<>(); - private final List tablets = new ArrayList<>(); - + private volatile List tablets = new LinkedList<>(); private long firstEventProcessingTime = Long.MIN_VALUE; private long totalBufferSize = 0; @@ -68,28 +56,36 @@ public SubscriptionPipeTabletEventBatch( super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes); } - @Override - public synchronized boolean onEvent(final Consumer consumer) { - if (shouldEmit() && !enrichedEvents.isEmpty()) { - if (Objects.isNull(events)) { - events = generateSubscriptionEvents(); + public LinkedList moveTablets() { + if (Objects.isNull(tablets)) { + tablets = new ArrayList<>(); + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + if (enrichedEvent instanceof TsFileInsertionEvent) { + onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent); + } else if (enrichedEvent instanceof TabletInsertionEvent) { + onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent); + } else { + LOGGER.warn( + "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} when moving.", + this, + enrichedEvent); + } } - if (Objects.nonNull(events)) { - events.forEach(consumer); - return true; - } - return false; } - return false; + final LinkedList result = new LinkedList<>(tablets); + firstEventProcessingTime = Long.MIN_VALUE; + totalBufferSize = 0; + tablets = null; // reset to null for gc & subsequent move + return result; } + /////////////////////////////// ack & clean /////////////////////////////// + @Override - public synchronized boolean onEvent( - final @NonNull EnrichedEvent event, final Consumer consumer) { - if (event instanceof TabletInsertionEvent) { - onEventInternal((TabletInsertionEvent) event); // no exceptions will be thrown + public synchronized void ack() { + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); } - return onEvent(consumer); } @Override @@ -99,65 +95,42 @@ public synchronized void cleanUp() { enrichedEvent.clearReferenceCount(this.getClass().getName()); } enrichedEvents.clear(); - tablets.clear(); + if (Objects.nonNull(tablets)) { + tablets.clear(); + } } - public synchronized void ack() { - for (final EnrichedEvent enrichedEvent : enrichedEvents) { - enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); + /////////////////////////////// utility /////////////////////////////// + + @Override + protected void onTabletInsertionEvent(final TabletInsertionEvent event) { + constructBatch(event); + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); } } - /////////////////////////////// utility /////////////////////////////// + @Override + protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { + for (final TabletInsertionEvent tabletInsertionEvent : event.toTabletInsertionEvents()) { + onTabletInsertionEvent(tabletInsertionEvent); + } + } - private List generateSubscriptionEvents() { + @Override + protected List generateSubscriptionEvents() { if (tablets.isEmpty()) { return null; } - final SubscriptionCommitContext commitContext = - prefetchingQueue.generateSubscriptionCommitContext(); - final List responses = new ArrayList<>(); - final List currentTablets = new ArrayList<>(); - long currentTotalBufferSize = 0; - for (final Tablet tablet : tablets) { - final long bufferSize = PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); - if (bufferSize > READ_TABLET_BUFFER_SIZE) { - LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize); - responses.add( - new SubscriptionPollResponse( - SubscriptionPollResponseType.TABLETS.getType(), - new TabletsPayload(Collections.singletonList(tablet), responses.size() + 1), - commitContext)); - continue; - } - if (currentTotalBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) { - responses.add( - new SubscriptionPollResponse( - SubscriptionPollResponseType.TABLETS.getType(), - new TabletsPayload(new ArrayList<>(currentTablets), responses.size() + 1), - commitContext)); - currentTablets.clear(); - currentTotalBufferSize = 0; - } - currentTablets.add(tablet); - currentTotalBufferSize += bufferSize; - } - responses.add( - new SubscriptionPollResponse( - SubscriptionPollResponseType.TABLETS.getType(), - new TabletsPayload(new ArrayList<>(currentTablets), -tablets.size()), - commitContext)); return Collections.singletonList( - new SubscriptionEvent(new SubscriptionPipeTabletBatchEvents(this), responses)); + new SubscriptionEvent(this, prefetchingQueue.generateSubscriptionCommitContext())); } - private void onEventInternal(final TabletInsertionEvent event) { - constructBatch(event); - enrichedEvents.add((EnrichedEvent) event); - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); - } + @Override + protected boolean shouldEmit() { + return totalBufferSize >= maxBatchSizeInBytes + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; } private void constructBatch(final TabletInsertionEvent event) { @@ -173,11 +146,6 @@ private void constructBatch(final TabletInsertionEvent event) { .orElse(0L); } - private boolean shouldEmit() { - return totalBufferSize >= maxBatchSizeInBytes - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; - } - private List convertToTablets(final TabletInsertionEvent tabletInsertionEvent) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { return ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablets(); @@ -203,30 +171,11 @@ public String toString() { @Override protected Map coreReportMessage() { final Map coreReportMessage = super.coreReportMessage(); - coreReportMessage.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4)); - coreReportMessage.put("size of tablets", String.valueOf(tablets.size())); + coreReportMessage.put( + "size of tablets", + (Objects.nonNull(tablets) ? String.valueOf(tablets.size()) : "")); coreReportMessage.put("firstEventProcessingTime", String.valueOf(firstEventProcessingTime)); coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize)); return coreReportMessage; } - - private static String formatEnrichedEvents( - final List enrichedEvents, final int threshold) { - final List eventMessageList = - enrichedEvents.stream() - .limit(threshold) - .map(EnrichedEvent::coreReportMessage) - .collect(Collectors.toList()); - if (eventMessageList.size() > threshold) { - eventMessageList.add( - String.format("omit the remaining %s event(s)...", eventMessageList.size() - threshold)); - } - return eventMessageList.toString(); - } - - //////////////////////////// APIs provided for metric framework //////////////////////////// - - public int getPipeEventCount() { - return enrichedEvents.size(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index e38887e19f2f..dc96fc476daa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -25,25 +25,24 @@ import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; -import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch { + private static final Logger LOGGER = + LoggerFactory.getLogger(SubscriptionPipeTsFileEventBatch.class); + private final PipeTabletEventTsFileBatch batch; - private final List enrichedEvents; public SubscriptionPipeTsFileEventBatch( final int regionId, @@ -52,36 +51,11 @@ public SubscriptionPipeTsFileEventBatch( final long maxBatchSizeInBytes) { super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes); this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, maxBatchSizeInBytes); - this.enrichedEvents = new ArrayList<>(); - } - - @Override - public synchronized boolean onEvent(final Consumer consumer) throws Exception { - if (batch.shouldEmit() && !enrichedEvents.isEmpty()) { - if (Objects.isNull(events)) { - events = generateSubscriptionEvents(); - } - if (Objects.nonNull(events)) { - events.forEach(consumer); - return true; - } - return false; - } - return false; } @Override - public synchronized boolean onEvent( - final @NonNull EnrichedEvent event, final Consumer consumer) - throws Exception { - if (event instanceof TabletInsertionEvent) { - batch.onEvent((TabletInsertionEvent) event); // no exceptions will be thrown - enrichedEvents.add(event); - event.decreaseReferenceCount( - SubscriptionPipeTsFileEventBatch.class.getName(), - false); // missing releaseLastEvent decreases reference count - } - return onEvent(consumer); + public synchronized void ack() { + batch.decreaseEventsReferenceCount(this.getClass().getName(), true); } @Override @@ -91,13 +65,27 @@ public synchronized void cleanUp() { enrichedEvents.clear(); } - public synchronized void ack() { - batch.decreaseEventsReferenceCount(this.getClass().getName(), true); + /////////////////////////////// utility /////////////////////////////// + + @Override + protected void onTabletInsertionEvent(final TabletInsertionEvent event) throws Exception { + batch.onEvent(event); // no exceptions will be thrown + ((EnrichedEvent) event) + .decreaseReferenceCount( + SubscriptionPipeTsFileEventBatch.class.getName(), + false); // missing releaseLastEvent decreases reference count } - /////////////////////////////// utility /////////////////////////////// + @Override + protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} when batching.", + this, + event); + } - private List generateSubscriptionEvents() throws Exception { + @Override + protected List generateSubscriptionEvents() throws Exception { if (batch.isEmpty()) { return null; } @@ -110,15 +98,16 @@ private List generateSubscriptionEvents() throws Exception { prefetchingQueue.generateSubscriptionCommitContext(); events.add( new SubscriptionEvent( - new SubscriptionPipeTsFileBatchEvents(this, tsFile, referenceCount), - new SubscriptionPollResponse( - SubscriptionPollResponseType.FILE_INIT.getType(), - new FileInitPayload(tsFile.getName()), - commitContext))); + new SubscriptionPipeTsFileBatchEvents(this, referenceCount), tsFile, commitContext)); } return events; } + @Override + protected boolean shouldEmit() { + return batch.shouldEmit(); + } + /////////////////////////////// stringify /////////////////////////////// @Override @@ -132,10 +121,4 @@ protected Map coreReportMessage() { coreReportMessage.put("batch", batch.toString()); return coreReportMessage; } - - //////////////////////////// APIs provided for metric framework //////////////////////////// - - public int getPipeEventCount() { - return enrichedEvents.size(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java new file mode 100644 index 000000000000..54dbcceaccb0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.cache; + +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Objects; + +public class CachedSubscriptionPollResponse extends SubscriptionPollResponse { + + private volatile ByteBuffer byteBuffer; // cached serialized response + + public CachedSubscriptionPollResponse( + final short responseType, + final SubscriptionPollPayload payload, + final SubscriptionCommitContext commitContext) { + super(responseType, payload, commitContext); + } + + public CachedSubscriptionPollResponse(final SubscriptionPollResponse response) { + super(response.getResponseType(), response.getPayload(), response.getCommitContext()); + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public void invalidateByteBuffer() { + // maybe friendly for gc + byteBuffer = null; + } + + public static ByteBuffer serialize(final CachedSubscriptionPollResponse response) + throws IOException { + return response.serialize(); + } + + private ByteBuffer serialize() throws IOException { + return Objects.nonNull(byteBuffer) + ? byteBuffer + : (byteBuffer = SubscriptionPollResponse.serialize(this)); + } + + /////////////////////////////// stringify /////////////////////////////// + + @Override + public String toString() { + return "CachedSubscriptionPollResponse" + coreReportMessage(); + } + + @Override + protected Map coreReportMessage() { + final Map coreReportMessage = super.coreReportMessage(); + coreReportMessage.put( + "sizeof(byteBuffer)", + Objects.isNull(byteBuffer) + ? "" + : String.valueOf(byteBuffer.limit() - byteBuffer.position())); + return coreReportMessage; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java similarity index 78% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java index 49d90f255684..8b0036216875 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.subscription.event; +package org.apache.iotdb.db.subscription.event.cache; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -35,65 +35,66 @@ import java.nio.ByteBuffer; import java.util.Optional; -/** This class is used to cache {@link SubscriptionPollResponse} in {@link SubscriptionEvent}. */ -class SubscriptionEventBinaryCache { +/** + * This class is used to control memory usage of cache {@link SubscriptionPollResponse} in {@link + * CachedSubscriptionPollResponse}. + */ +public class SubscriptionPollResponseCache { - private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionEventBinaryCache.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPollResponseCache.class); private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); - private final LoadingCache cache; + private final LoadingCache cache; - ByteBuffer serialize(final SubscriptionPollResponse response) throws IOException { + public ByteBuffer serialize(final CachedSubscriptionPollResponse response) throws IOException { try { return this.cache.get(response); } catch (final Exception e) { LOGGER.warn( - "SubscriptionEventBinaryCache raised an exception while serializing SubscriptionPollResponse: {}", + "SubscriptionEventBinaryCache raised an exception while serializing CachedSubscriptionPollResponse: {}", response, e); throw new IOException(e); } } - Optional trySerialize(final SubscriptionPollResponse response) { + public Optional trySerialize(final CachedSubscriptionPollResponse response) { try { return Optional.of(serialize(response)); } catch (final IOException e) { LOGGER.warn( - "Subscription: something unexpected happened when serializing SubscriptionPollResponse: {}", + "Subscription: something unexpected happened when serializing CachedSubscriptionPollResponse: {}", response, e); return Optional.empty(); } } - void invalidate(final SubscriptionPollResponse response) { + public void invalidate(final CachedSubscriptionPollResponse response) { this.cache.invalidate(response); - } - - void invalidateAll(final Iterable responses) { - this.cache.invalidateAll(responses); + response.invalidateByteBuffer(); } //////////////////////////// singleton //////////////////////////// private static class SubscriptionEventBinaryCacheHolder { - private static final SubscriptionEventBinaryCache INSTANCE = new SubscriptionEventBinaryCache(); + private static final SubscriptionPollResponseCache INSTANCE = + new SubscriptionPollResponseCache(); private SubscriptionEventBinaryCacheHolder() { // empty constructor } } - static SubscriptionEventBinaryCache getInstance() { - return SubscriptionEventBinaryCache.SubscriptionEventBinaryCacheHolder.INSTANCE; + public static SubscriptionPollResponseCache getInstance() { + return SubscriptionPollResponseCache.SubscriptionEventBinaryCacheHolder.INSTANCE; } - private SubscriptionEventBinaryCache() { + private SubscriptionPollResponseCache() { final long initMemorySizeInBytes = - PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 20; + PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 5; final long maxMemorySizeInBytes = (long) (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() @@ -129,13 +130,13 @@ private SubscriptionEventBinaryCache() { Caffeine.newBuilder() .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes()) .weigher( - (Weigher) + (Weigher) (message, buffer) -> { // TODO: overflow return (int) (buffer.capacity() * memoryUsageCheatFactor.get()); }) .recordStats() // TODO: metrics // NOTE: lambda CAN NOT be replaced with method reference - .build(response -> SubscriptionPollResponse.serialize(response)); + .build(response -> CachedSubscriptionPollResponse.serialize(response)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java index 3e74e03d8f4c..ef6cc39a60c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java @@ -19,15 +19,8 @@ package org.apache.iotdb.db.subscription.event.pipe; -import java.io.File; - public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents { - @Override - public File getTsFile() { - return null; - } - @Override public void ack() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java index 489c4cf8120c..05f011699ecc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java @@ -19,15 +19,8 @@ package org.apache.iotdb.db.subscription.event.pipe; -import java.io.File; - public interface SubscriptionPipeEvents { - /** - * @return {@code null} if the pipe events do not contain the corresponding tsfile. - */ - File getTsFile(); - void ack(); void cleanUp(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java index 226367405eba..f518c7b01946 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java @@ -21,8 +21,6 @@ import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch; -import java.io.File; - public class SubscriptionPipeTabletBatchEvents implements SubscriptionPipeEvents { private final SubscriptionPipeTabletEventBatch batch; @@ -31,11 +29,6 @@ public SubscriptionPipeTabletBatchEvents(final SubscriptionPipeTabletEventBatch this.batch = batch; } - @Override - public File getTsFile() { - return null; - } - @Override public void ack() { batch.ack(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java index 5cae21f5ac8a..16151a16a501 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java @@ -21,31 +21,21 @@ import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch; -import java.io.File; import java.util.concurrent.atomic.AtomicInteger; public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents { private final SubscriptionPipeTsFileEventBatch batch; - private final File tsFile; private final AtomicInteger referenceCount; // shared between the same batch private final int count; // snapshot the initial reference count, used for event count calculation public SubscriptionPipeTsFileBatchEvents( - final SubscriptionPipeTsFileEventBatch batch, - final File tsFile, - final AtomicInteger referenceCount) { + final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger referenceCount) { this.batch = batch; - this.tsFile = tsFile; this.referenceCount = referenceCount; this.count = Math.max(1, referenceCount.get()); } - @Override - public File getTsFile() { - return tsFile; - } - @Override public void ack() { if (referenceCount.decrementAndGet() == 0) { @@ -66,8 +56,6 @@ public void cleanUp() { public String toString() { return "SubscriptionPipeTsFileBatchEvents{batch=" + batch - + ", tsFile=" - + tsFile + ", referenceCount=" + referenceCount + "}"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index 111006fa6d32..8effb654d732 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -21,8 +21,6 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import java.io.File; - public class SubscriptionPipeTsFilePlainEvent implements SubscriptionPipeEvents { private final PipeTsFileInsertionEvent tsFileInsertionEvent; @@ -31,11 +29,6 @@ public SubscriptionPipeTsFilePlainEvent(final PipeTsFileInsertionEvent tsFileIns this.tsFileInsertionEvent = tsFileInsertionEvent; } - @Override - public File getTsFile() { - return tsFileInsertionEvent.getTsFile(); - } - @Override public void ack() { tsFileInsertionEvent.decreaseReferenceCount(this.getClass().getName(), true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java new file mode 100644 index 000000000000..07dfaabdf4ff --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.response; + +import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse; +import org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; + +/** + * The {@code SubscriptionEventExtendableResponse} class represents a subscription event response + * that can dynamically change as new responses are fetched. It maintains a list of {@link + * CachedSubscriptionPollResponse} objects and provides methods for managing and serializing these + * responses. + */ +public abstract class SubscriptionEventExtendableResponse + implements SubscriptionEventResponse { + + private static final Logger LOGGER = + LoggerFactory.getLogger(SubscriptionEventTabletResponse.class); + + private final Deque responses; + protected volatile boolean hasNoMore = false; + + protected SubscriptionEventExtendableResponse() { + this.responses = new ConcurrentLinkedDeque<>(); + } + + @Override + public CachedSubscriptionPollResponse getCurrentResponse() { + return peekFirst(); + } + + @Override + public void fetchNextResponse() throws IOException { + prefetchRemainingResponses(); + if (Objects.isNull(poll())) { + LOGGER.warn( + "SubscriptionEventExtendableResponse {} is empty when fetching next response (broken invariant)", + this); + } + } + + @Override + public void trySerializeCurrentResponse() { + SubscriptionPollResponseCache.getInstance().trySerialize(getCurrentResponse()); + } + + @Override + public void trySerializeRemainingResponses() { + responses.stream() + .skip(1) + .filter(response -> Objects.isNull(response.getByteBuffer())) + .findFirst() + .ifPresent(response -> SubscriptionPollResponseCache.getInstance().trySerialize(response)); + } + + @Override + public ByteBuffer getCurrentResponseByteBuffer() throws IOException { + return SubscriptionPollResponseCache.getInstance().serialize(getCurrentResponse()); + } + + @Override + public void invalidateCurrentResponseByteBuffer() { + SubscriptionPollResponseCache.getInstance().invalidate(getCurrentResponse()); + } + + @Override + public void cleanUp() { + CachedSubscriptionPollResponse response; + while (Objects.nonNull(response = poll())) { + SubscriptionPollResponseCache.getInstance().invalidate(response); + } + + hasNoMore = false; + } + + @Override + public boolean isCommittable() { + return hasNoMore && size() == 1; + } + + /////////////////////////////// utility /////////////////////////////// + + protected void offer(final CachedSubscriptionPollResponse response) { + responses.addLast(response); + } + + protected CachedSubscriptionPollResponse poll() { + return responses.isEmpty() ? null : responses.removeFirst(); + } + + protected CachedSubscriptionPollResponse peekFirst() { + return responses.isEmpty() ? null : responses.getFirst(); + } + + protected CachedSubscriptionPollResponse peekLast() { + return responses.isEmpty() ? null : responses.getLast(); + } + + protected int size() { + return responses.size(); + } + + protected boolean isEmpty() { + return responses.isEmpty(); + } + + /////////////////////////////// stringify /////////////////////////////// + + @Override + public String toString() { + return "SubscriptionEventExtendableResponse" + coreReportMessage(); + } + + protected Map coreReportMessage() { + final Map result = new HashMap<>(); + final CachedSubscriptionPollResponse currentResponse = getCurrentResponse(); + result.put( + "currentResponse", + Objects.nonNull(currentResponse) ? currentResponse.toString() : ""); + result.put("hasNoMore", String.valueOf(hasNoMore)); + return result; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java new file mode 100644 index 000000000000..211f0905afa9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.response; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface SubscriptionEventResponse { + + /////////////////////////////// response /////////////////////////////// + + E getCurrentResponse(); + + void prefetchRemainingResponses() throws IOException; + + void fetchNextResponse() throws IOException; + + /////////////////////////////// byte buffer /////////////////////////////// + + void trySerializeCurrentResponse(); + + void trySerializeRemainingResponses(); + + ByteBuffer getCurrentResponseByteBuffer() throws IOException; + + void invalidateCurrentResponseByteBuffer(); + + /////////////////////////////// lifecycle /////////////////////////////// + + void nack(); + + void cleanUp(); + + boolean isCommittable(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java new file mode 100644 index 000000000000..dbc48ebda00c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.response; + +import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse; +import org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache; +import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * The {@link SubscriptionEventSingleResponse} class represents a single subscription event response + * that wraps a cached {@link SubscriptionPollResponse}. The actual payload of the response can be + * either a {@link TerminationPayload} or an {@link ErrorPayload}. + */ +public class SubscriptionEventSingleResponse + implements SubscriptionEventResponse { + + private final CachedSubscriptionPollResponse response; + + public SubscriptionEventSingleResponse( + final short responseType, + final SubscriptionPollPayload payload, + final SubscriptionCommitContext commitContext) { + this.response = new CachedSubscriptionPollResponse(responseType, payload, commitContext); + } + + public SubscriptionEventSingleResponse(final SubscriptionPollResponse response) { + this.response = new CachedSubscriptionPollResponse(response); + } + + @Override + public CachedSubscriptionPollResponse getCurrentResponse() { + return response; + } + + @Override + public void prefetchRemainingResponses() { + // do nothing + } + + @Override + public void fetchNextResponse() { + // do nothing + } + + @Override + public void trySerializeCurrentResponse() { + SubscriptionPollResponseCache.getInstance().trySerialize(response); + } + + @Override + public void trySerializeRemainingResponses() { + // do nothing + } + + @Override + public ByteBuffer getCurrentResponseByteBuffer() throws IOException { + return SubscriptionPollResponseCache.getInstance().serialize(response); + } + + @Override + public void invalidateCurrentResponseByteBuffer() { + SubscriptionPollResponseCache.getInstance().invalidate(response); + } + + @Override + public void nack() { + invalidateCurrentResponseByteBuffer(); + } + + @Override + public void cleanUp() { + invalidateCurrentResponseByteBuffer(); + } + + @Override + public boolean isCommittable() { + return true; + } + + /////////////////////////////// stringify /////////////////////////////// + + @Override + public String toString() { + return "SubscriptionEventSingleResponse" + coreReportMessage(); + } + + protected Map coreReportMessage() { + final Map result = new HashMap<>(); + final CachedSubscriptionPollResponse currentResponse = getCurrentResponse(); + result.put( + "currentResponse", + Objects.nonNull(currentResponse) ? currentResponse.toString() : ""); + return result; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java new file mode 100644 index 000000000000..4fd13517891c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.response; + +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch; +import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; +import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; + +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * The {@code SubscriptionEventTabletResponse} class extends {@link + * SubscriptionEventExtendableResponse} to handle subscription responses specifically for tablet + * data. The actual payload of the response includes a {@link TabletsPayload}, which contains the + * tablet information being processed. + */ +public class SubscriptionEventTabletResponse extends SubscriptionEventExtendableResponse { + + private static final Logger LOGGER = + LoggerFactory.getLogger(SubscriptionEventTabletResponse.class); + + private static final long READ_TABLET_BUFFER_SIZE = + SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize(); + + private final SubscriptionPipeTabletEventBatch batch; + private final SubscriptionCommitContext commitContext; + + private volatile LinkedList tablets; + private volatile int tabletsSize; + private final AtomicInteger nextOffset = new AtomicInteger(0); + + public SubscriptionEventTabletResponse( + final SubscriptionPipeTabletEventBatch batch, final SubscriptionCommitContext commitContext) { + this.batch = batch; + this.commitContext = commitContext; + + init(batch); + } + + @Override + public void prefetchRemainingResponses() { + if (hasNoMore) { + return; + } + + offer(generateNextTabletResponse()); + } + + @Override + public void nack() { + cleanUp(); + init(batch); + } + + @Override + public void cleanUp() { + super.cleanUp(); + + tablets = null; + tabletsSize = 0; + nextOffset.set(0); + } + + /////////////////////////////// utility /////////////////////////////// + + private void init(final SubscriptionPipeTabletEventBatch batch) { + if (!isEmpty()) { + LOGGER.warn( + "SubscriptionEventTabletResponse {} is not empty when initializing (broken invariant)", + this); + return; + } + + tablets = batch.moveTablets(); + tabletsSize = tablets.size(); + offer(generateNextTabletResponse()); + } + + private synchronized CachedSubscriptionPollResponse generateNextTabletResponse() { + final List currentTablets = new ArrayList<>(); + final AtomicLong currentTotalBufferSize = new AtomicLong(); + + Tablet currentTablet; + while (!tablets.isEmpty() && Objects.nonNull(currentTablet = tablets.removeFirst())) { + final long bufferSize = PipeMemoryWeightUtil.calculateTabletSizeInBytes(currentTablet); + if (bufferSize > READ_TABLET_BUFFER_SIZE) { + LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize); + tablets.addAll(currentTablets); // re-enqueue previous tablets + currentTablets.clear(); + currentTotalBufferSize.set(0); + return new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.TABLETS.getType(), + new TabletsPayload( + Collections.singletonList(currentTablet), nextOffset.incrementAndGet()), + commitContext); + } + if (currentTotalBufferSize.get() + bufferSize > READ_TABLET_BUFFER_SIZE) { + final CachedSubscriptionPollResponse response = + new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.TABLETS.getType(), + new TabletsPayload(new ArrayList<>(currentTablets), nextOffset.incrementAndGet()), + commitContext); + tablets.add(currentTablet); // re-enqueue current tablet + currentTablets.clear(); + currentTotalBufferSize.set(0); + return response; + } + currentTablets.add(currentTablet); + currentTotalBufferSize.addAndGet(bufferSize); + } + + final CachedSubscriptionPollResponse response; + if (currentTablets.isEmpty()) { + response = + new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.TABLETS.getType(), + new TabletsPayload(Collections.emptyList(), -tabletsSize), + commitContext); + hasNoMore = true; + } else { + response = + new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.TABLETS.getType(), + new TabletsPayload(new ArrayList<>(currentTablets), nextOffset.incrementAndGet()), + commitContext); + } + currentTablets.clear(); + currentTotalBufferSize.set(0); + return response; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java new file mode 100644 index 000000000000..01b6611dd06e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.response; + +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload; +import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +/** + * The {@code SubscriptionEventTsFileResponse} class extends {@link + * SubscriptionEventExtendableResponse} to manage subscription responses related to time series + * files. The actual payload can include {@link FileInitPayload}, {@link FilePiecePayload}, and + * {@link FileSealPayload}, allowing for detailed control over file data streaming. + */ +public class SubscriptionEventTsFileResponse extends SubscriptionEventExtendableResponse { + + private static final Logger LOGGER = + LoggerFactory.getLogger(SubscriptionEventTsFileResponse.class); + + private final File tsFile; + private final SubscriptionCommitContext commitContext; + + public SubscriptionEventTsFileResponse( + final File tsFile, final SubscriptionCommitContext commitContext) { + super(); + + this.tsFile = tsFile; + this.commitContext = commitContext; + + init(); + } + + @Override + public void prefetchRemainingResponses() throws IOException { + if (hasNoMore) { + return; + } + + generateNextTsFileResponse().ifPresent(super::offer); + } + + @Override + public void nack() { + cleanUp(); + init(); + } + + @Override + public void cleanUp() { + super.cleanUp(); + } + + /////////////////////////////// utility /////////////////////////////// + + private void init() { + if (!isEmpty()) { + LOGGER.warn( + "SubscriptionEventTsFileResponse {} is not empty when initializing (broken invariant)", + this); + return; + } + + offer( + new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.FILE_INIT.getType(), + new FileInitPayload(tsFile.getName()), + commitContext)); + } + + private synchronized Optional generateNextTsFileResponse() + throws IOException { + final SubscriptionPollResponse previousResponse = peekLast(); + if (Objects.isNull(previousResponse)) { + LOGGER.warn( + "SubscriptionEventTsFileResponse {} is empty when generating next response (broken invariant)", + this); + return Optional.empty(); + } + final short responseType = previousResponse.getResponseType(); + final SubscriptionPollPayload payload = previousResponse.getPayload(); + if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) { + LOGGER.warn("unexpected response type: {}", responseType); + return Optional.empty(); + } + + switch (SubscriptionPollResponseType.valueOf(responseType)) { + case FILE_INIT: + return Optional.of(generateResponseWithPieceOrSealPayload(0)); + case FILE_PIECE: + return Optional.of( + generateResponseWithPieceOrSealPayload( + ((FilePiecePayload) payload).getNextWritingOffset())); + case FILE_SEAL: + // not need to prefetch + break; + default: + LOGGER.warn("unexpected message type: {}", responseType); + } + + return Optional.empty(); + } + + private @NonNull CachedSubscriptionPollResponse generateResponseWithPieceOrSealPayload( + final long writingOffset) throws IOException { + final long readFileBufferSize = + SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize(); + final byte[] readBuffer = new byte[(int) readFileBufferSize]; + try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) { + while (true) { + reader.seek(writingOffset); + final int readLength = reader.read(readBuffer); + if (readLength == -1) { + break; + } + + final byte[] filePiece = + readLength == readFileBufferSize + ? readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength); + + // generate subscription poll response with piece payload + return new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.FILE_PIECE.getType(), + new FilePiecePayload(tsFile.getName(), writingOffset + readLength, filePiece), + commitContext); + } + + // generate subscription poll response with seal payload + hasNoMore = true; + return new CachedSubscriptionPollResponse( + SubscriptionPollResponseType.FILE_SEAL.getType(), + new FileSealPayload(tsFile.getName(), tsFile.length()), + commitContext); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 97401c73808c..d6962d225436 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -421,7 +421,7 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re SubscriptionPrefetchingQueue.generatePrefetchingQueueId( commitContext.getConsumerGroupId(), commitContext.getTopicName()), size); - event.resetResponseByteBuffer(false); + event.invalidateCurrentResponseByteBuffer(); LOGGER.info( "Subscription: consumer {} poll {} successfully with request: {}", consumerConfig, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java index 492d53f2256c..f381d873cf31 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java @@ -21,7 +21,6 @@ import org.apache.iotdb.db.subscription.broker.SubscriptionStates; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; @@ -54,28 +53,24 @@ public void testFilter() { final SubscriptionEvent event1 = new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), new SubscriptionPollResponse( SubscriptionPollResponseType.TABLETS.getType(), payload, new SubscriptionCommitContext(-1, -1, "topic1", "cg1", 0))); final SubscriptionEvent event2 = new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), new SubscriptionPollResponse( SubscriptionPollResponseType.TABLETS.getType(), payload, new SubscriptionCommitContext(-1, -1, "topic2", "cg1", 0))); final SubscriptionEvent event3 = new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), new SubscriptionPollResponse( SubscriptionPollResponseType.TABLETS.getType(), payload, new SubscriptionCommitContext(-1, -1, "topic3", "cg1", 0))); final SubscriptionEvent event4 = new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), new SubscriptionPollResponse( SubscriptionPollResponseType.TABLETS.getType(), payload,