Skip to content

Commit

Permalink
Fix wrong deviceId after PartialPath.concatAsMeasurementPath (apache#…
Browse files Browse the repository at this point in the history
…14750)

* Fix incorrect DeviceId after concatenation

* add it

* revert config
  • Loading branch information
jt2594838 authored Jan 22, 2025
1 parent 1cfd139 commit a681582
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,11 @@ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
return this;
}

@Override
public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
  // Push the interval (milliseconds) into the node's property set so the compaction
  // scheduler picks it up when the cluster environment is (re)started.
  final String intervalMs = Long.toString(compactionScheduleInterval);
  properties.setProperty("compaction_schedule_interval_in_ms", intervalMs);
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
return this;
}

@Override
public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
  // Intentional no-op: this DataNodeConfig implementation ignores the requested
  // interval (presumably an environment whose configuration cannot be overridden
  // from the test -- TODO confirm against the class docs). Returns `this` to keep
  // the fluent chaining contract of the interface.
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface DataNodeConfig {

DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes);

DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval);
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static void setUpClass() {
.setPartitionInterval(1000)
.setMemtableSizeThreshold(10000);
// Adjust MemTable threshold size to make it flush automatically
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(5000);
EnvFactory.getEnv().initClusterEnvironment();
}

Expand Down Expand Up @@ -994,7 +995,13 @@ public void testConcurrentFlushAndSequentialDeletion()
threadPool.submit(
() ->
write(
writtenPointCounter, threadPool, fileNumMax, pointPerFile, deviceNum, testNum));
writtenPointCounter,
threadPool,
fileNumMax,
pointPerFile,
deviceNum,
testNum,
true));
int deletionRange = 150;
int deletionInterval = 1500;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1041,7 +1048,13 @@ public void testConcurrentFlushAndRandomDeletion()
threadPool.submit(
() ->
write(
writtenPointCounter, threadPool, fileNumMax, pointPerFile, deviceNum, testNum));
writtenPointCounter,
threadPool,
fileNumMax,
pointPerFile,
deviceNum,
testNum,
true));
int deletionRange = 100;
int minIntervalToRecord = 1000;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1094,7 +1107,8 @@ public void testConcurrentFlushAndRandomDeletionWithRestart()
fileNumMax,
pointPerFile,
deviceNum,
testNum));
testNum,
true));
int deletionRange = 100;
int minIntervalToRecord = 1000;
Future<Void> deletionThread =
Expand Down Expand Up @@ -1155,7 +1169,8 @@ private Void write(
int fileNumMax,
int pointPerFile,
int deviceNum,
int testNum)
int testNum,
boolean roundRobinDevice)
throws SQLException {

try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -1165,20 +1180,37 @@ private Void write(
statement.execute("use test");

statement.execute(
"create table if not exists table" + testNum + "(deviceId STRING TAG, s0 INT32 field)");
"create table if not exists table"
+ testNum
+ "(city TAG, deviceId STRING TAG, s0 INT32 field)");

for (int i = 1; i <= fileNumMax; i++) {
for (int j = 0; j < pointPerFile; j++) {
long time = writtenPointCounter.get() + 1;
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, deviceId, s0) VALUES(%d,'d"
+ (time % deviceNum)
+ "',%d)",
time,
time));
if (roundRobinDevice) {
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, city, deviceId, s0) VALUES(%d, 'bj', 'd"
+ (time % deviceNum)
+ "',%d)",
time,
time));
} else {
for (int d = 0; d < deviceNum; d++) {
statement.execute(
String.format(
"INSERT INTO test.table"
+ testNum
+ "(time, city, deviceId, s0) VALUES(%d, 'bj', 'd"
+ d
+ "',%d)",
time,
time));
}
}

writtenPointCounter.incrementAndGet();
if (Thread.interrupted()) {
return null;
Expand Down Expand Up @@ -1393,6 +1425,144 @@ private Void randomDeletion(
return null;
}

/**
 * Concurrent-deletion driver: while a writer thread fills {@code test.table<testNum>},
 * repeatedly deletes a random time sub-range for one device at a time and, after each
 * deletion, verifies that the per-device row count reported by the server matches the
 * local bookkeeping ({@code writtenPointCounter} minus that device's deleted counter).
 *
 * @param writtenPointCounter highest timestamp written so far (shared with the writer)
 * @param deviceDeletedPointCounters per-device count of points deleted by this method
 * @param allThreads pool to shut down on a non-recoverable error
 * @param fileNumMax together with pointPerFile defines the final timestamp to reach
 * @param pointPerFile points per file; deletionEnd = fileNumMax * pointPerFile - 1
 * @param deletionRange maximum number of points removed per deletion statement
 * @param minIntervalToRecord minimum length a newly written interval must reach before
 *     it is recorded as a deletable range
 * @param testNum suffix of the table name under test
 * @throws SQLException on non-reconnect SQL failures (pool is shut down first)
 * @throws InterruptedException declared, but interruption is handled internally
 */
private Void randomDeviceDeletion(
    AtomicLong writtenPointCounter,
    List<AtomicLong> deviceDeletedPointCounters,
    ExecutorService allThreads,
    int fileNumMax,
    int pointPerFile,
    int deletionRange,
    int minIntervalToRecord,
    int testNum)
    throws SQLException, InterruptedException {
  // delete random 'deletionRange' points each time
  // One list of not-yet-deleted time ranges per device.
  List<List<TimeRange>> allDeviceUndeletedRanges = new ArrayList<>();
  for (int i = 0; i < deviceDeletedPointCounters.size(); i++) {
    allDeviceUndeletedRanges.add(new ArrayList<>());
  }
  // pointPerFile * fileNumMax
  long deletionEnd = (long) fileNumMax * pointPerFile - 1;
  // NOTE(review): nextRangeStart is shared across all devices, so each newly written
  // interval is recorded under only the device index being visited at that moment --
  // ranges are distributed over devices rather than duplicated. Looks intentional
  // (it only drives which ranges get deleted), but confirm with the test author.
  long nextRangeStart = 0;
  Random random = new Random();

  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {

    statement.execute("create database if not exists test");
    statement.execute("use test");
    // Keep going until the writer has reached deletionEnd AND every recorded range
    // has been fully deleted (or this thread is interrupted).
    while ((writtenPointCounter.get() < deletionEnd
            || allDeviceUndeletedRanges.stream().anyMatch(l -> !l.isEmpty()))
        && !Thread.interrupted()) {
      // record the newly inserted interval if it is long enough
      for (int i = 0; i < deviceDeletedPointCounters.size(); i++) {
        long currentWrittenTime = writtenPointCounter.get();
        List<TimeRange> deviceUndeletedRanges = allDeviceUndeletedRanges.get(i);

        if (currentWrittenTime - nextRangeStart >= minIntervalToRecord) {
          deviceUndeletedRanges.add(new TimeRange(nextRangeStart, currentWrittenTime));
          nextRangeStart = currentWrittenTime + 1;
        }
        if (deviceUndeletedRanges.isEmpty()) {
          // Nothing deletable for this device yet; let the writer make progress.
          Thread.sleep(10);
          continue;
        }
        // pick up a random range
        int rangeIndex = random.nextInt(deviceUndeletedRanges.size());
        TimeRange timeRange = deviceUndeletedRanges.get(rangeIndex);
        // delete a random part in the range
        LOGGER.debug("Pick up a range [{}, {}]", timeRange.getMin(), timeRange.getMax());
        long rangeDeletionStart;
        long timeRangeLength = timeRange.getMax() - timeRange.getMin() + 1;
        if (timeRangeLength == 1) {
          // Single-point range: nextInt(0) would throw, so take the point directly.
          rangeDeletionStart = timeRange.getMin();
        } else {
          rangeDeletionStart = random.nextInt((int) (timeRangeLength - 1)) + timeRange.getMin();
        }
        long rangeDeletionEnd = Math.min(rangeDeletionStart + deletionRange, timeRange.getMax());
        LOGGER.debug("Deletion range [{}, {}]", rangeDeletionStart, rangeDeletionEnd);

        // Device-scoped deletion: only rows of device 'd<i>' in the chosen window.
        statement.execute(
            "delete from test.table"
                + testNum
                + " where time >= "
                + rangeDeletionStart
                + " and time <= "
                + rangeDeletionEnd
                + " and deviceId = 'd"
                + i
                + "'");
        deviceDeletedPointCounters.get(i).addAndGet(rangeDeletionEnd - rangeDeletionStart + 1);
        LOGGER.debug(
            "Deleted range [{}, {}], written points: {}, deleted points: {}",
            timeRange.getMin(),
            timeRange.getMax(),
            currentWrittenTime + 1,
            deviceDeletedPointCounters.get(i).get());

        // update the range
        if (rangeDeletionStart == timeRange.getMin() && rangeDeletionEnd == timeRange.getMax()) {
          // range fully deleted
          deviceUndeletedRanges.remove(rangeIndex);
        } else if (rangeDeletionStart == timeRange.getMin()) {
          // prefix deleted
          timeRange.setMin(rangeDeletionEnd + 1);
        } else if (rangeDeletionEnd == timeRange.getMax()) {
          // suffix deleted
          timeRange.setMax(rangeDeletionStart - 1);
        } else {
          // split into two ranges
          deviceUndeletedRanges.add(new TimeRange(rangeDeletionEnd + 1, timeRange.getMax()));
          timeRange.setMax(rangeDeletionStart - 1);
        }

        // check the point count
        // NOTE(review): the expected count assumes every device received one point per
        // timestamp up to currentWrittenTime (i.e. the writer ran with
        // roundRobinDevice == false) -- confirm against the calling test.
        try (ResultSet set =
            statement.executeQuery(
                "select count(*) from table"
                    + testNum
                    + " where time <= "
                    + currentWrittenTime
                    + " AND deviceId = 'd"
                    + i
                    + "'")) {
          assertTrue(set.next());
          long expectedCnt = currentWrittenTime + 1 - deviceDeletedPointCounters.get(i).get();
          if (expectedCnt != set.getLong(1)) {
            // Mismatch: dump merged expected ranges vs. actual server-side ranges.
            // NOTE(review): collectDataRanges is not filtered by device, and the
            // "Expected ranges" log prints the pre-merge list -- debug output only.
            allDeviceUndeletedRanges.set(i, mergeRanges(deviceUndeletedRanges));
            List<TimeRange> remainingRanges =
                collectDataRanges(statement, currentWrittenTime, testNum);
            LOGGER.debug("Expected ranges: {}", deviceUndeletedRanges);
            LOGGER.debug("Remaining ranges: {}", remainingRanges);
            fail(
                String.format(
                    "Inconsistent number of points %d - %d", expectedCnt, set.getLong(1)));
          }
        }

        Thread.sleep(10);
      }
    }
  } catch (InterruptedException e) {
    // Cooperative shutdown: restore the interrupt flag and exit quietly.
    Thread.currentThread().interrupt();
    return null;
  } catch (SQLException e) {
    if (e.getMessage().contains("Fail to reconnect")) {
      // restart triggered, ignore
      return null;
    } else {
      // Unexpected SQL failure: stop the whole test's thread pool and surface it.
      allThreads.shutdownNow();
      throw e;
    }
  } catch (ParallelRequestTimeoutException ignored) {
    // restart triggered, ignore
    return null;
  } catch (Throwable e) {
    // Any other failure (including assertion errors) aborts all sibling threads.
    allThreads.shutdownNow();
    throw e;
  }
  return null;
}

private Void restart(
AtomicLong writtenPointCounter, long targetPointNum, ExecutorService threadPool)
throws InterruptedException, SQLException {
Expand Down Expand Up @@ -1522,6 +1692,68 @@ public void deleteTableOfTheSameNameTest()
}
}

/**
 * End-to-end test: one writer inserts one point per device per timestamp
 * (roundRobinDevice == false) while {@link #randomDeviceDeletion} concurrently issues
 * device-scoped deletions and verifies per-device counts after each deletion.
 */
@Test
public void testConcurrentFlushAndRandomDeviceDeletion()
    throws InterruptedException, ExecutionException, SQLException {
  int testNum = 25;
  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    statement.execute("drop database if exists test");
    // Force mods-file-threshold and FAST performer so compaction with deletions is
    // exercised; both settings are hot-reloadable.
    statement.execute(
        "SET CONFIGURATION inner_compaction_task_selection_mods_file_threshold='1024'");
    statement.execute("SET CONFIGURATION inner_seq_performer='FAST'");
  } catch (Exception ignored) {
    // remote mode cannot find the config file during SET CONFIGURATION
  }

  // -1 so the first written timestamp is 0.
  AtomicLong writtenPointCounter = new AtomicLong(-1);
  int fileNumMax = 100;
  int pointPerFile = 100;
  int deviceNum = 4;
  // One deletion counter per device, consumed by randomDeviceDeletion.
  List<AtomicLong> deviceDeletedPointCounters = new ArrayList<>(deviceNum);
  for (int i = 0; i < deviceNum; i++) {
    deviceDeletedPointCounters.add(new AtomicLong(0));
  }

  ExecutorService threadPool = Executors.newCachedThreadPool();
  Future<Void> writeThread =
      threadPool.submit(
          () ->
              write(
                  writtenPointCounter,
                  threadPool,
                  fileNumMax,
                  pointPerFile,
                  deviceNum,
                  testNum,
                  false));
  int deletionRange = 100;
  int minIntervalToRecord = 1000;
  Future<Void> deletionThread =
      threadPool.submit(
          () ->
              randomDeviceDeletion(
                  writtenPointCounter,
                  deviceDeletedPointCounters,
                  threadPool,
                  fileNumMax,
                  pointPerFile,
                  deletionRange,
                  minIntervalToRecord,
                  testNum));
  // NOTE(review): if writeThread.get() throws, the pool is never shut down and the
  // trailing performer reset is skipped -- consider try/finally; left as-is to match
  // the sibling tests in this file.
  writeThread.get();
  deletionThread.get();
  threadPool.shutdown();
  boolean success = threadPool.awaitTermination(1, TimeUnit.MINUTES);
  assertTrue(success);

  try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement statement = connection.createStatement()) {
    // Clean up and restore the default inner-seq performer for subsequent tests.
    statement.execute("drop database if exists test");
    statement.execute("SET CONFIGURATION inner_seq_performer='read_chunk'");
  }
}

@Ignore("performance")
@Test
public void testDeletionWritePerformance() throws SQLException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,21 @@ private boolean loadCompactionTaskHotModifiedProps(TrimProperties properties) th
configModified |=
innerUnsequenceCompactionSelector != conf.getInnerUnsequenceCompactionSelector();

conf.setInnerSeqCompactionPerformer(
InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
properties.getProperty(
"inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));

conf.setInnerUnseqCompactionPerformer(
InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
properties.getProperty(
"inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));

conf.setCrossCompactionPerformer(
CrossCompactionPerformer.getCrossCompactionPerformer(
properties.getProperty(
"cross_performer", conf.getCrossCompactionPerformer().toString())));

// update inner_compaction_total_file_size_threshold
long innerCompactionFileSizeThresholdInByte =
conf.getInnerCompactionTotalFileSizeThresholdInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ enable_auto_repair_compaction=true
cross_selector=rewrite

# the compaction performer of cross space compaction task
# effectiveMode: restart
# effectiveMode: hot_reload
# Options: read_point, fast
cross_performer=fast

Expand All @@ -1224,7 +1224,7 @@ cross_performer=fast
inner_seq_selector=size_tiered_multi_target

# the performer of inner sequence space compaction task
# effectiveMode: restart
# effectiveMode: hot_reload
# Options: read_chunk, fast
inner_seq_performer=read_chunk

Expand All @@ -1234,7 +1234,7 @@ inner_seq_performer=read_chunk
inner_unseq_selector=size_tiered_multi_target

# the performer of inner unsequence space compaction task
# effectiveMode: restart
# effectiveMode: hot_reload
# Options: read_point, fast
inner_unseq_performer=fast

Expand Down
Loading

0 comments on commit a681582

Please sign in to comment.