Skip to content

Commit

Permalink
Let the DataPartitionTable be automatically cleanable (#14737)
Browse files Browse the repository at this point in the history
* seems finished

* Use periodic procedure 4 partition table cleaner

* Update ThreadName.java
  • Loading branch information
CRZbulabula authored Jan 23, 2025
1 parent 7f3b281 commit 3369e7a
Show file tree
Hide file tree
Showing 27 changed files with 542 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) {
return this;
}

@Override
public CommonConfig setTTLCheckInterval(long ttlCheckInterval) {
setProperty("ttl_check_interval", String.valueOf(ttlCheckInterval));
return this;
}

@Override
public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) {
setProperty("time_partition_origin", String.valueOf(timePartitionOrigin));
Expand Down Expand Up @@ -381,6 +387,12 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
return this;
}

@Override
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
setProperty("series_partition_executor_class", seriesPartitionExecutorClass);
return this;
}

@Override
public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) {
setProperty("schema_memory_proportion", String.valueOf(schemaMemoryAllocate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) {
return this;
}

@Override
public CommonConfig setTTLCheckInterval(long ttlCheckInterval) {
cnConfig.setTTLCheckInterval(ttlCheckInterval);
dnConfig.setTTLCheckInterval(ttlCheckInterval);
return this;
}

@Override
public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) {
cnConfig.setTimePartitionOrigin(timePartitionOrigin);
Expand Down Expand Up @@ -382,6 +389,13 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
return this;
}

@Override
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass);
dnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass);
return this;
}

@Override
public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) {
dnConfig.setSchemaMemoryAllocate(schemaMemoryAllocate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) {
return this;
}

@Override
public CommonConfig setTTLCheckInterval(long ttlCheckInterval) {
return this;
}

@Override
public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) {
return this;
Expand Down Expand Up @@ -269,6 +274,11 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
return this;
}

@Override
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
return this;
}

@Override
public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public interface CommonConfig {

CommonConfig setTimePartitionInterval(long timePartitionInterval);

CommonConfig setTTLCheckInterval(long ttlCheckInterval);

CommonConfig setTimePartitionOrigin(long timePartitionOrigin);

CommonConfig setTimestampPrecision(String timestampPrecision);
Expand Down Expand Up @@ -122,6 +124,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(

CommonConfig setSeriesSlotNum(int seriesSlotNum);

CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass);

CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);

CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.partition;

import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.Statement;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBPartitionTableAutoCleanTest {

private static final int TEST_REPLICATION_FACTOR = 1;
private static final long TEST_TIME_PARTITION_INTERVAL = 604800000;
private static final long TEST_TTL_CHECK_INTERVAL = 5_000;

private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT =
TimePartitionUtils.getCurrentTimePartitionSlot();
private static final long TEST_TTL = 7 * TEST_TIME_PARTITION_INTERVAL;

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setSchemaReplicationFactor(TEST_REPLICATION_FACTOR)
.setDataReplicationFactor(TEST_REPLICATION_FACTOR)
.setTimePartitionInterval(TEST_TIME_PARTITION_INTERVAL)
.setTTLCheckInterval(TEST_TTL_CHECK_INTERVAL);

// Init 1C1D environment
EnvFactory.getEnv().initClusterEnvironment(1, 1);
}

@After
public void tearDown() {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void testAutoCleanPartitionTable() throws Exception {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
// Create db1
statement.execute("CREATE DATABASE root.db1");
statement.execute("CREATE TIMESERIES root.db1.s WITH DATATYPE=INT64,ENCODING=PLAIN");
// Insert expired data
statement.execute(
String.format(
"INSERT INTO root.db1(timestamp, s) VALUES (%d, %d)",
TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL * 2, -1));
// Insert existed data
statement.execute(
String.format(
"INSERT INTO root.db1(timestamp, s) VALUES (%d, %d)",
TEST_CURRENT_TIME_SLOT.getStartTime(), 1));
// Let db.TTL > device.TTL, the valid TTL should be the bigger one
statement.execute("SET TTL TO root.db1 " + TEST_TTL);
statement.execute("SET TTL TO root.db1.s " + 10);
// Create db2
statement.execute("CREATE DATABASE root.db2");
statement.execute("CREATE TIMESERIES root.db2.s WITH DATATYPE=INT64,ENCODING=PLAIN");
// Insert expired data
statement.execute(
String.format(
"INSERT INTO root.db2(timestamp, s) VALUES (%d, %d)",
TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL * 2, -1));
// Insert existed data
statement.execute(
String.format(
"INSERT INTO root.db2(timestamp, s) VALUES (%d, %d)",
TEST_CURRENT_TIME_SLOT.getStartTime(), 1));
// Let db.TTL < device.TTL, the valid TTL should be the bigger one
statement.execute("SET TTL TO root.db2 " + 10);
statement.execute("SET TTL TO root.db2.s " + TEST_TTL);
}

TDataPartitionReq req = new TDataPartitionReq();
req.putToPartitionSlotsMap("root.db1", new TreeMap<>());
req.putToPartitionSlotsMap("root.db2", new TreeMap<>());
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
for (int retry = 0; retry < 120; retry++) {
boolean partitionTableAutoCleaned = true;
TDataPartitionTableResp resp = client.getDataPartitionTable(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
partitionTableAutoCleaned =
resp.getDataPartitionTable().entrySet().stream()
.flatMap(e1 -> e1.getValue().entrySet().stream())
.allMatch(e2 -> e2.getValue().size() == 1);
}
if (partitionTableAutoCleaned) {
return;
}
TimeUnit.SECONDS.sleep(1);
}
}
Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
Expand Down Expand Up @@ -246,6 +247,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case CreateDataPartition:
plan = new CreateDataPartitionPlan();
break;
case AutoCleanPartitionTable:
plan = new AutoCleanPartitionTablePlan();
break;
case DeleteProcedure:
plan = new DeleteProcedurePlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public enum ConfigPhysicalPlanType {
CreateDataPartition((short) 404),
GetOrCreateDataPartition((short) 405),
GetNodePathsPartition((short) 406),
AutoCleanPartitionTable((short) 407),

/** Procedure. */
UpdateProcedure((short) 500),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.partition;

import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class AutoCleanPartitionTablePlan extends ConfigPhysicalPlan {

Map<String, Long> databaseTTLMap;
TTimePartitionSlot currentTimeSlot;

public AutoCleanPartitionTablePlan() {
super(ConfigPhysicalPlanType.AutoCleanPartitionTable);
}

public AutoCleanPartitionTablePlan(
Map<String, Long> databaseTTLMap, TTimePartitionSlot currentTimeSlot) {
this();
this.databaseTTLMap = databaseTTLMap;
this.currentTimeSlot = currentTimeSlot;
}

public Map<String, Long> getDatabaseTTLMap() {
return databaseTTLMap;
}

public TTimePartitionSlot getCurrentTimeSlot() {
return currentTimeSlot;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
stream.writeInt(databaseTTLMap.size());
for (Map.Entry<String, Long> entry : databaseTTLMap.entrySet()) {
BasicStructureSerDeUtil.write(entry.getKey(), stream);
stream.writeLong(entry.getValue());
}
ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlot, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int size = buffer.getInt();
databaseTTLMap = new TreeMap<>();
for (int i = 0; i < size; i++) {
String key = BasicStructureSerDeUtil.readString(buffer);
long value = buffer.getLong();
databaseTTLMap.put(key, value);
}
currentTimeSlot = ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
AutoCleanPartitionTablePlan that = (AutoCleanPartitionTablePlan) o;
return Objects.equals(databaseTTLMap, that.databaseTTLMap)
&& Objects.equals(currentTimeSlot, that.currentTimeSlot);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
Expand Down Expand Up @@ -186,6 +187,8 @@ public class ProcedureManager {
private final long planSizeLimit;
private ProcedureMetrics procedureMetrics;

private final PartitionTableAutoCleaner partitionTableCleaner;

private final ReentrantLock tableLock = new ReentrantLock();

public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) {
Expand All @@ -200,6 +203,7 @@ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo
.getConfigNodeRatisConsensusLogAppenderBufferSize()
- IoTDBConstant.RAFT_LOG_BASIC_SIZE;
this.procedureMetrics = new ProcedureMetrics(this);
this.partitionTableCleaner = new PartitionTableAutoCleaner<>(configManager);
}

public void startExecutor() {
Expand All @@ -209,6 +213,7 @@ public void startExecutor() {
executor.startCompletedCleaner(
CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
executor.addInternalProcedure(partitionTableCleaner);
store.start();
LOGGER.info("ProcedureManager is started successfully.");
}
Expand All @@ -222,6 +227,7 @@ public void stopExecutor() {
store.stop();
LOGGER.info("ProcedureManager is stopped successfully.");
}
executor.removeInternalProcedure(partitionTableCleaner);
}
}

Expand Down
Loading

0 comments on commit 3369e7a

Please sign in to comment.