Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/iotdb into table-t…
Browse files Browse the repository at this point in the history
…ry-fix
  • Loading branch information
Caideyipi committed Oct 28, 2024
2 parents b3a00e8 + ccc44e2 commit eebc6ed
Show file tree
Hide file tree
Showing 24 changed files with 1,024 additions and 509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Map<String, String> getAttributesWithTimeRange() {
}

public Map<String, String> getAttributesWithRealtimeMode() {
return REALTIME_STREAM_MODE_CONFIG;
return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid)
}

public Map<String, String> getAttributesWithSourceMode() {
Expand Down Expand Up @@ -136,9 +136,6 @@ public Map<String, String> getAttributesWithProcessorPrefix() {
}

public Map<String, String> getAttributesWithSinkFormat() {
return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE))
? Collections.emptyMap()
: SINK_TABLET_FORMAT_CONFIG;
return Collections.emptyMap(); // default to hybrid
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class SubscriptionPollResponse {

Expand Down Expand Up @@ -110,16 +112,18 @@ public static SubscriptionPollResponse deserialize(final ByteBuffer buffer) {
return new SubscriptionPollResponse(responseType, payload, commitContext);
}

/////////////////////////////// object ///////////////////////////////
/////////////////////////////// stringify ///////////////////////////////

@Override
public String toString() {
return "SubscriptionPollResponse{responseType="
+ SubscriptionPollResponseType.valueOf(responseType).toString()
+ ", payload="
+ payload
+ ", commitContext="
+ commitContext
+ "}";
return "SubscriptionPollResponse" + coreReportMessage();
}

protected Map<String, String> coreReportMessage() {
final Map<String, String> result = new HashMap<>();
result.put("responseType", SubscriptionPollResponseType.valueOf(responseType).toString());
result.put("payload", payload.toString());
result.put("commitContext", commitContext.toString());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;

Expand Down Expand Up @@ -103,16 +101,14 @@ public List<SubscriptionEvent> poll(
brokerId);
events.add(
new SubscriptionEvent(
new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.TERMINATION.getType(),
new TerminationPayload(),
new SubscriptionCommitContext(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
PipeDataNodeAgent.runtime().getRebootTimes(),
topicName,
brokerId,
INVALID_COMMIT_ID))));
SubscriptionPollResponseType.TERMINATION.getType(),
new TerminationPayload(),
new SubscriptionCommitContext(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
PipeDataNodeAgent.runtime().getRebootTimes(),
topicName,
brokerId,
INVALID_COMMIT_ID)));
continue;
}
// There are two reasons for not printing logs here:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -288,6 +286,7 @@ public void executePrefetchInternal() {
}

protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event) {
// TODO: consider memory usage
event.trySerializeCurrentResponse();
prefetchingQueue.add(event);
}
Expand Down Expand Up @@ -532,16 +531,14 @@ public SubscriptionCommitContext generateSubscriptionCommitContext() {
protected SubscriptionEvent generateSubscriptionPollErrorResponse(final String errorMessage) {
// consider non-critical by default, meaning the client can retry
return new SubscriptionEvent(
new SubscriptionPipeEmptyEvent(),
new SubscriptionPollResponse(
SubscriptionPollResponseType.ERROR.getType(),
new ErrorPayload(errorMessage, false),
new SubscriptionCommitContext(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
PipeDataNodeAgent.runtime().getRebootTimes(),
topicName,
brokerId,
INVALID_COMMIT_ID)));
SubscriptionPollResponseType.ERROR.getType(),
new ErrorPayload(errorMessage, false),
new SubscriptionCommitContext(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
PipeDataNodeAgent.runtime().getRebootTimes(),
topicName,
brokerId,
INVALID_COMMIT_ID));
}

//////////////////////////// APIs provided for metric framework ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.subscription.broker;

import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
Expand Down Expand Up @@ -162,11 +163,7 @@ public SubscriptionEvent pollTablets(

@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingTabletQueue {} ignore TsFileInsertionEvent {} when prefetching.",
this,
event);
return false;
return batches.onEvent((EnrichedEvent) event, this::enqueueEventToPrefetchingQueue);
}

/////////////////////////////// stringify ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,8 @@ protected boolean onEvent(final TsFileInsertionEvent event) {
final SubscriptionEvent ev =
new SubscriptionEvent(
new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent) event),
new SubscriptionPollResponse(
SubscriptionPollResponseType.FILE_INIT.getType(),
new FileInitPayload(((PipeTsFileInsertionEvent) event).getTsFile().getName()),
commitContext));
((PipeTsFileInsertionEvent) event).getTsFile(),
commitContext);
super.enqueueEventToPrefetchingQueue(ev);
return true;
}
Expand Down
Loading

0 comments on commit eebc6ed

Please sign in to comment.