Skip to content

Commit

Permalink
Merge branch 'master' into fast-iot-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Oct 24, 2024
2 parents 7bc70f1 + 4e79200 commit 3a14c1b
Show file tree
Hide file tree
Showing 65 changed files with 1,794 additions and 633 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -954,7 +955,7 @@ public void testInsertTabletWithTTL()
Assert.assertTrue(record.getFields().get(0).getLongV() > timeLowerBound);
count++;
}
Assert.assertTrue(count > 0 && count < 4);
Assert.assertEquals(2, count);
}
}

Expand Down Expand Up @@ -1041,6 +1042,39 @@ public void testInsertUnsequenceData()
}
}

@Test
public void testInsertAllNullRow() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement st1 = connection.createStatement()) {
st1.execute("use \"test\"");
st1.execute("create table table5(d1 string id, s1 int32 measurement, s2 int32 measurement)");

st1.execute("insert into table5(time, d1,s1,s2) values(1,'a',1,null)");
// insert all null row
st1.execute("insert into table5(time, d1,s1,s2) values(2,'a',null,null)");

ResultSet rs1 = st1.executeQuery("select * from table5");
assertTrue(rs1.next());
assertEquals("1", rs1.getString("s1"));
assertNull(rs1.getString("s2"));
assertTrue(rs1.next());
assertNull(rs1.getString("s1"));
assertNull(rs1.getString("s2"));
assertFalse(rs1.next());

st1.execute("flush");

rs1 = st1.executeQuery("select * from table5");
assertTrue(rs1.next());
assertEquals("1", rs1.getString("s1"));
assertNull(rs1.getString("s2"));
assertTrue(rs1.next());
assertNull(rs1.getString("s1"));
assertNull(rs1.getString("s2"));
assertFalse(rs1.next());
}
}

private List<Integer> checkHeader(
ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[] expectedTypes)
throws SQLException {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand All @@ -49,6 +50,9 @@ public class PipeConfigNodeRuntimeAgent implements IService {
private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

private final PipePeriodicalPhantomReferenceCleaner pipePeriodicalPhantomReferenceCleaner =
new PipePeriodicalPhantomReferenceCleaner();

@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
Expand All @@ -65,6 +69,10 @@ public synchronized void start() {
// Start periodical job executor
pipePeriodicalJobExecutor.start();

if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
pipePeriodicalPhantomReferenceCleaner.start();
}

isShutdown.set(false);
LOGGER.info("PipeRuntimeConfigNodeAgent started");
}
Expand Down Expand Up @@ -159,4 +167,9 @@ private void report(
public void registerPeriodicalJob(String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
}

public void registerPhantomReferenceCleanJob(
String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob, intervalInSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public PipeConfigNodePhantomReferenceManager() {
super();

PipeConfigNodeAgent.runtime()
.registerPeriodicalJob(
.registerPhantomReferenceCleanJob(
"PipePhantomReferenceManager#gcHook()",
// NOTE: lambda CAN NOT be replaced with method reference
() -> super.gcHook(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

private final PipePeriodicalPhantomReferenceCleaner pipePeriodicalPhantomReferenceCleaner =
new PipePeriodicalPhantomReferenceCleaner();

//////////////////////////// System Service Interface ////////////////////////////

public synchronized void preparePipeResources(
Expand All @@ -87,6 +91,10 @@ public synchronized void start() throws StartupException {
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
pipePeriodicalJobExecutor.start();

if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
pipePeriodicalPhantomReferenceCleaner.start();
}

isShutdown.set(false);
}

Expand Down Expand Up @@ -225,4 +233,9 @@ public void stopPeriodicalJobExecutor() {
public void clearPeriodicalJobExecutor() {
pipePeriodicalJobExecutor.clear();
}

public void registerPhantomReferenceCleanJob(
String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob, intervalInSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ protected void createPipeTask(
throws IllegalPathException {
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(consensusGroupId))
&& DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters);
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
extractorParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
SchemaEngine.getInstance()
.getAllSchemaRegionIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ public Map<Integer, PipeTask> build() throws IllegalPathException {

if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
dataRegionIds.contains(new DataRegionId(consensusGroupId))
&& DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters);
dataRegionIds.contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
extractorParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
&& !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

public class PipeTransferTabletBinaryReqV2 extends PipeTransferTabletBinaryReq {
Expand Down Expand Up @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() {

// Table model
statement.setWriteToTable(true);
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDatabaseName(dataBaseName);
}
}
}
statement.setDatabaseName(dataBaseName);
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

public class PipeTransferTabletInsertNodeReqV2 extends PipeTransferTabletInsertNodeReq {
Expand Down Expand Up @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() {

// Table model
statement.setWriteToTable(true);
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDatabaseName(dataBaseName);
}
}
}
statement.setDatabaseName(dataBaseName);
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
.increaseTabletEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
Expand Down Expand Up @@ -196,7 +196,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
.decreaseTabletEventCount(pipeName + "_" + creationTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
.increaseTabletEventCount(pipeName + "_" + creationTime);
}
return true;
}
Expand All @@ -180,7 +180,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
.decreaseTabletEventCount(pipeName + "_" + creationTime);
}
allocatedMemoryBlock.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.iotdb.db.pipe.extractor.dataregion;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

Expand Down Expand Up @@ -57,12 +61,35 @@ public class DataRegionListeningFilter {
}
}

public static boolean shouldDataRegionBeListened(PipeParameters parameters)
throws IllegalPathException {
public static boolean shouldDataRegionBeListened(
PipeParameters parameters, DataRegionId dataRegionId) throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
parseInsertionDeletionListeningOptionPair(parameters);
return insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
final boolean hasSpecificListeningOption =
insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
if (!hasSpecificListeningOption) {
return false;
}

final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(dataRegionId);
if (dataRegion == null) {
return true;
}

final String databaseRawName = dataRegion.getDatabaseName();
final String databaseTreeModel =
databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName;
final String databaseTableModel =
databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName;

final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
final TablePattern tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);

return treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDb(databaseTreeModel)
|| tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.matchesDatabase(databaseTableModel);
}

public static Pair<Boolean, Boolean> parseInsertionDeletionListeningOptionPair(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ public void register(final IoTDBSchemaRegionExtractor extractor) {
}
}

public void increaseInsertionEventCount(final String pipeID) {
public void increaseTabletEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.increaseInsertionEventCount();
.increaseTabletEventCount();
}

public void decreaseInsertionEventCount(final String pipeID) {
public void decreaseTabletEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseInsertionEventCount();
.decreaseTabletEventCount();
}

public void increaseTsFileEventCount(final String pipeID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());

private final AtomicInteger insertionEventCount = new AtomicInteger(0);
private final AtomicInteger tabletEventCount = new AtomicInteger(0);
private final AtomicInteger tsfileEventCount = new AtomicInteger(0);
private final AtomicInteger heartbeatEventCount = new AtomicInteger(0);

Expand All @@ -58,12 +58,12 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {

//////////////////////////// Remaining event & time calculation ////////////////////////////

void increaseInsertionEventCount() {
tsfileEventCount.incrementAndGet();
void increaseTabletEventCount() {
tabletEventCount.incrementAndGet();
}

void decreaseInsertionEventCount() {
tsfileEventCount.decrementAndGet();
void decreaseTabletEventCount() {
tabletEventCount.decrementAndGet();
}

void increaseTsFileEventCount() {
Expand All @@ -84,6 +84,7 @@ void decreaseHeartbeatEventCount() {

long getRemainingEvents() {
return tsfileEventCount.get()
+ tabletEventCount.get()
+ heartbeatEventCount.get()
+ schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
Expand All @@ -105,7 +106,7 @@ long getRemainingEvents() {
final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
final double totalDataRegionWriteEventCount =
tsfileEventCount.get() * Math.max(invocationValue, 1) + insertionEventCount.get();
tsfileEventCount.get() * Math.max(invocationValue, 1) + tabletEventCount.get();

dataRegionCommitMeter.updateAndGet(
meter -> {
Expand Down
Loading

0 comments on commit 3a14c1b

Please sign in to comment.