Skip to content

Commit

Permalink
Pipe: incorporate batch data into TsFileInsertionEventScanParser memo…
Browse files Browse the repository at this point in the history
…ry control & Subscription: close data container in tsfile event & bind memory block for tablet response & generate empty tablet as initial response & offer subsequent tablet response before ack & expose prefetch and backdoor configs & best-effort disorder control (#14752)
  • Loading branch information
VGalaxies authored Jan 24, 2025
1 parent 7af1b52 commit 93e83c6
Show file tree
Hide file tree
Showing 27 changed files with 669 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ public Map<String, String> getAttributesWithSourceLooseRangeOrStrict() {
}
}

public Map<String, String> getAttributesWithSourcePrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
(key, value) -> {
if (key.toLowerCase().startsWith("source")) {
attributesWithProcessorPrefix.put(key, value);
}
});
return attributesWithProcessorPrefix;
}

/////////////////////////////// processor attributes mapping ///////////////////////////////

public Map<String, String> getAttributesWithProcessorPrefix() {
Expand All @@ -185,4 +196,15 @@ public Map<String, String> getAttributesWithProcessorPrefix() {
public Map<String, String> getAttributesWithSinkFormat() {
return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
}

public Map<String, String> getAttributesWithSinkPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
(key, value) -> {
if (key.toLowerCase().startsWith("sink")) {
attributesWithProcessorPrefix.put(key, value);
}
});
return attributesWithProcessorPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ public class TabletsPayload implements SubscriptionPollPayload {
private transient List<Tablet> tablets = new ArrayList<>();

/**
* The field to be filled in the next {@link PollTabletsPayload} request. If negative, it
* indicates all tablets have been fetched, and -nextOffset represents the total number of
* tablets.
* The field to be filled in the next {@link PollTabletsPayload} request.
*
* <ul>
* <li>If nextOffset is 1, it indicates that the current payload is the first payload (its
* tablets are empty) and the fetching should continue.
* <li>If nextOffset is negative, it indicates all tablets have been fetched, and -nextOffset
* represents the total number of tablets.
* </ul>
*/
private transient int nextOffset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {

private IChunkReader chunkReader;
private BatchData data;
private final PipeMemoryBlock allocatedMemoryBlockForBatchData;

private boolean currentIsMultiPage;
private IDeviceID currentDevice;
Expand Down Expand Up @@ -101,6 +103,10 @@ public TsFileInsertionEventScanParser(
this.endTime = endTime;
filter = Objects.nonNull(timeFilterExpression) ? timeFilterExpression.getFilter() : null;

// Allocate empty memory block, will be resized later.
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);

try {
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
tsFileSequenceReader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
Expand Down Expand Up @@ -265,6 +271,10 @@ private void prepareData() throws IOException {

do {
data = chunkReader.nextPageData();
PipeDataNodeResourceManager.memory()
.forceResize(
allocatedMemoryBlockForBatchData,
PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data));
} while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
} while (!data.hasCurrent());
}
Expand Down Expand Up @@ -484,4 +494,13 @@ private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final byte
}
return false;
}

@Override
public void close() {
super.close();

if (allocatedMemoryBlockForBatchData != null) {
allocatedMemoryBlockForBatchData.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,52 @@ public static long calculateTabletSizeInBytes(Tablet tablet) {

return totalSizeInBytes;
}

public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
int totalSizeInBytes = 0;

// timestamp
totalSizeInBytes += 8;

// values
final TSDataType type = batchData.getDataType();
if (type != null) {
if (type == TSDataType.VECTOR && batchData.getVector() != null) {
for (int i = 0; i < batchData.getVector().length; ++i) {
final TsPrimitiveType primitiveType = batchData.getVector()[i];
if (primitiveType == null || primitiveType.getDataType() == null) {
continue;
}
// consider variable references (plus 8) and memory alignment (round up to 8)
totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8, 8);
}
} else {
if (type.isBinary()) {
final Binary binary = batchData.getBinary();
// refer to org.apache.tsfile.utils.TsPrimitiveType.TsBinary.getSize
totalSizeInBytes +=
roundUpToMultiple((binary == null ? 8 : binary.getLength() + 8) + 8, 8);
} else {
totalSizeInBytes += roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8, 8);
}
}
}

return batchData.length() * totalSizeInBytes;
}

/**
* Rounds up the given integer num to the nearest multiple of n.
*
* @param num The integer to be rounded up.
* @param n The specified multiple.
* @return The nearest multiple of n greater than or equal to num.
*/
private static int roundUpToMultiple(int num, int n) {
if (n == 0) {
throw new IllegalArgumentException("The multiple n must be greater than 0");
}
// Calculate the rounded up value to the nearest multiple of n
return ((num + n - 1) / n) * n;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -68,7 +68,7 @@ public abstract class SubscriptionPrefetchingQueue {
private final AtomicLong commitIdGenerator;

/** A queue containing a series of prefetched pollable {@link SubscriptionEvent}. */
protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;

/**
* A map that tracks in-flight {@link SubscriptionEvent}, keyed by consumer id and commit context.
Expand Down Expand Up @@ -112,7 +112,7 @@ public SubscriptionPrefetchingQueue(
this.inputPendingQueue = inputPendingQueue;
this.commitIdGenerator = commitIdGenerator;

this.prefetchingQueue = new LinkedBlockingQueue<>();
this.prefetchingQueue = new PriorityBlockingQueue<>();
this.inFlightEvents = new ConcurrentHashMap<>();
this.batches = new SubscriptionPipeEventBatches(this, maxDelayInMs, maxBatchSizeInBytes);

Expand Down Expand Up @@ -259,9 +259,11 @@ public boolean executePrefetch() {
if (isClosed()) {
return false;
}
// TODO: more refined behavior (prefetch/serialize/...) control
if (states.shouldPrefetch()) {
tryPrefetch();
remapInFlightEventsSnapshot(committedCleaner, pollableNacker, responsePrefetcher);
remapInFlightEventsSnapshot(
committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
return true;
} else {
remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
Expand All @@ -286,9 +288,18 @@ private final void remapInFlightEventsSnapshot(
}
}

protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event) {
event.trySerializeCurrentResponse();
prefetchingQueue.add(event);
public void prefetchEvent(@NonNull final SubscriptionEvent thisEvent) {
final SubscriptionEvent thatEvent = prefetchingQueue.peek();
if (Objects.nonNull(thatEvent)) {
if (thisEvent.compareTo(thatEvent) < 0) {
// disorder causes:
// 1. prefetch nacked event
// 2. late cross-event of dataset payload
states.markDisorderCause();
}
}

prefetchingQueue.add(thisEvent);
}

/**
Expand Down Expand Up @@ -376,14 +387,14 @@ private void tryPrefetch() {
* @return {@code true} if there are subscription events prefetched.
*/
protected boolean onEvent(final TabletInsertionEvent event) {
return batches.onEvent((EnrichedEvent) event, this::enqueueEventToPrefetchingQueue);
return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
}

/**
* @return {@code true} if there are subscription events prefetched.
*/
protected boolean onEvent() {
return batches.onEvent(this::enqueueEventToPrefetchingQueue);
return batches.onEvent(this::prefetchEvent);
}

/////////////////////////////// commit ///////////////////////////////
Expand Down Expand Up @@ -449,7 +460,7 @@ private boolean ackInternal(
this);
}

ev.ack(this::enqueueEventToPrefetchingQueue);
ev.ack();
ev.recordCommittedTimestamp(); // now committed
acked.set(true);

Expand Down Expand Up @@ -655,12 +666,12 @@ private interface RemappingFunction<V> {
(ev) -> {
if (ev.eagerlyPollable()) {
ev.nack(); // now pollable (the nack operation here is actually unnecessary)
enqueueEventToPrefetchingQueue(ev);
prefetchEvent(ev);
// no need to log warn for eagerly pollable event
return null; // remove this entry
} else if (ev.pollable()) {
ev.nack(); // now pollable
enqueueEventToPrefetchingQueue(ev);
prefetchEvent(ev);
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} recycle event {} from in flight events, nack and enqueue it to prefetching queue",
this,
Expand All @@ -672,9 +683,19 @@ private interface RemappingFunction<V> {

private final RemappingFunction<SubscriptionEvent> responsePrefetcher =
(ev) -> {
// prefetch and serialize the remaining responses
// prefetch the remaining responses
try {
ev.prefetchRemainingResponses();
} catch (final Exception ignored) {
}
return ev;
};

private final RemappingFunction<SubscriptionEvent> responseSerializer =
(ev) -> {
// serialize the responses
try {
ev.trySerializeCurrentResponse();
ev.trySerializeRemainingResponses();
} catch (final Exception ignored) {
}
Expand Down
Loading

0 comments on commit 93e83c6

Please sign in to comment.