diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 009bae8249eb..d13c7cc43626 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -254,6 +254,12 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) { return this; } + @Override + public CommonConfig setTTLCheckInterval(long ttlCheckInterval) { + setProperty("ttl_check_interval", String.valueOf(ttlCheckInterval)); + return this; + } + @Override public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) { setProperty("time_partition_origin", String.valueOf(timePartitionOrigin)); @@ -381,6 +387,12 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) { return this; } + @Override + public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { + setProperty("series_partition_executor_class", seriesPartitionExecutorClass); + return this; + } + @Override public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { setProperty("schema_memory_proportion", String.valueOf(schemaMemoryAllocate)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index e456fda73114..b92d9a7fd45d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -258,6 +258,13 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) { return this; } + @Override + public CommonConfig setTTLCheckInterval(long ttlCheckInterval) { + cnConfig.setTTLCheckInterval(ttlCheckInterval); + dnConfig.setTTLCheckInterval(ttlCheckInterval); + return this; + } + @Override public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) { cnConfig.setTimePartitionOrigin(timePartitionOrigin); @@ -382,6 +389,13 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) { return this; } + @Override + public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { + cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass); + dnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass); + return this; + } + @Override public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { dnConfig.setSchemaMemoryAllocate(schemaMemoryAllocate); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 45c15e4e7a31..afb398a66794 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -185,6 +185,11 @@ public CommonConfig setTimePartitionInterval(long timePartitionInterval) { return this; } + @Override + public CommonConfig setTTLCheckInterval(long ttlCheckInterval) { + return this; + } + @Override public CommonConfig setTimePartitionOrigin(long timePartitionOrigin) { return this; @@ -269,6 +274,11 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) { return this; } + @Override + public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { + return this; + } + @Override public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 57767167827c..0030d827b061 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -88,6 +88,8 @@ public interface CommonConfig { CommonConfig setTimePartitionInterval(long timePartitionInterval); + CommonConfig setTTLCheckInterval(long ttlCheckInterval); + CommonConfig setTimePartitionOrigin(long timePartitionOrigin); CommonConfig setTimestampPrecision(String timestampPrecision); @@ -122,6 +124,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setSeriesSlotNum(int seriesSlotNum); + CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass); + CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate); CommonConfig setWriteMemoryProportion(String writeMemoryProportion); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanTest.java new file mode 100644 index 000000000000..a958cb9b48c3 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.partition; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBPartitionTableAutoCleanTest { + + private static final int TEST_REPLICATION_FACTOR = 1; + private static final long TEST_TIME_PARTITION_INTERVAL = 604800000; + private static final long TEST_TTL_CHECK_INTERVAL = 5_000; + + private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT = + TimePartitionUtils.getCurrentTimePartitionSlot(); + private static final long TEST_TTL = 7 * TEST_TIME_PARTITION_INTERVAL; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR) + .setDataReplicationFactor(TEST_REPLICATION_FACTOR) + .setTimePartitionInterval(TEST_TIME_PARTITION_INTERVAL) + .setTTLCheckInterval(TEST_TTL_CHECK_INTERVAL); + + // Init 1C1D environment + EnvFactory.getEnv().initClusterEnvironment(1, 1); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testAutoCleanPartitionTable() throws Exception { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + // Create db1 + statement.execute("CREATE DATABASE root.db1"); + statement.execute("CREATE TIMESERIES root.db1.s WITH DATATYPE=INT64,ENCODING=PLAIN"); + // Insert expired data + statement.execute( + String.format( + "INSERT INTO root.db1(timestamp, s) VALUES (%d, %d)", + TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL * 2, -1)); + // Insert existed data + statement.execute( + String.format( + "INSERT INTO root.db1(timestamp, s) VALUES (%d, %d)", + TEST_CURRENT_TIME_SLOT.getStartTime(), 1)); + // Let db.TTL > device.TTL, the valid TTL should be the bigger one + statement.execute("SET TTL TO root.db1 " + TEST_TTL); + statement.execute("SET TTL TO root.db1.s " + 10); + // Create db2 + statement.execute("CREATE DATABASE root.db2"); + statement.execute("CREATE TIMESERIES root.db2.s WITH DATATYPE=INT64,ENCODING=PLAIN"); + // Insert expired data + statement.execute( + String.format( + "INSERT INTO root.db2(timestamp, s) VALUES (%d, %d)", + TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL * 2, -1)); + // Insert existed data + statement.execute( + String.format( + "INSERT INTO root.db2(timestamp, s) VALUES (%d, %d)", + TEST_CURRENT_TIME_SLOT.getStartTime(), 1)); + // Let db.TTL < device.TTL, the valid TTL should be the bigger one + statement.execute("SET TTL TO root.db2 " + 10); + statement.execute("SET TTL TO root.db2.s " + TEST_TTL); + } + + TDataPartitionReq req = new TDataPartitionReq(); + req.putToPartitionSlotsMap("root.db1", new TreeMap<>()); + req.putToPartitionSlotsMap("root.db2", new TreeMap<>()); + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + for (int retry = 0; retry < 120; retry++) { + boolean partitionTableAutoCleaned = true; + TDataPartitionTableResp resp = client.getDataPartitionTable(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) { + partitionTableAutoCleaned = + resp.getDataPartitionTable().entrySet().stream() + .flatMap(e1 -> e1.getValue().entrySet().stream()) + .allMatch(e2 -> e2.getValue().size() == 1); + } + if (partitionTableAutoCleaned) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + } + Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!"); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index d1de403421b6..787f196ff946 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -56,6 +56,7 @@ import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; @@ -246,6 +247,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case CreateDataPartition: plan = new CreateDataPartitionPlan(); break; + case AutoCleanPartitionTable: + plan = new AutoCleanPartitionTablePlan(); + break; case DeleteProcedure: plan = new DeleteProcedurePlan(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 0f09b7f3c326..6d19699fdf43 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -80,6 +80,7 @@ public enum ConfigPhysicalPlanType { CreateDataPartition((short) 404), GetOrCreateDataPartition((short) 405), GetNodePathsPartition((short) 406), + AutoCleanPartitionTable((short) 407), /** Procedure. */ UpdateProcedure((short) 500), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AutoCleanPartitionTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AutoCleanPartitionTablePlan.java new file mode 100644 index 000000000000..1b1bfbe38f02 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AutoCleanPartitionTablePlan.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.partition; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +public class AutoCleanPartitionTablePlan extends ConfigPhysicalPlan { + + Map databaseTTLMap; + TTimePartitionSlot currentTimeSlot; + + public AutoCleanPartitionTablePlan() { + super(ConfigPhysicalPlanType.AutoCleanPartitionTable); + } + + public AutoCleanPartitionTablePlan( + Map databaseTTLMap, TTimePartitionSlot currentTimeSlot) { + this(); + this.databaseTTLMap = databaseTTLMap; + this.currentTimeSlot = currentTimeSlot; + } + + public Map getDatabaseTTLMap() { + return databaseTTLMap; + } + + public TTimePartitionSlot getCurrentTimeSlot() { + return currentTimeSlot; + } + + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + stream.writeInt(databaseTTLMap.size()); + for (Map.Entry entry : databaseTTLMap.entrySet()) { + BasicStructureSerDeUtil.write(entry.getKey(), stream); + stream.writeLong(entry.getValue()); + } + ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlot, stream); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + int size = buffer.getInt(); + databaseTTLMap = new TreeMap<>(); + for (int i = 0; i < size; i++) { + String key = BasicStructureSerDeUtil.readString(buffer); + long value = buffer.getLong(); + databaseTTLMap.put(key, value); + } + currentTimeSlot = ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AutoCleanPartitionTablePlan that = (AutoCleanPartitionTablePlan) o; + return Objects.equals(databaseTTLMap, that.databaseTTLMap) + && Objects.equals(currentTimeSlot, that.currentTimeSlot); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlot); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 967d51fbf044..2b2acd14019b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; +import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.ProcedureExecutor; import org.apache.iotdb.confignode.procedure.ProcedureMetrics; @@ -186,6 +187,8 @@ public class ProcedureManager { private final long planSizeLimit; private ProcedureMetrics procedureMetrics; + private final PartitionTableAutoCleaner partitionTableCleaner; + private final ReentrantLock tableLock = new ReentrantLock(); public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) { @@ -200,6 +203,7 @@ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo .getConfigNodeRatisConsensusLogAppenderBufferSize() - IoTDBConstant.RAFT_LOG_BASIC_SIZE; this.procedureMetrics = new ProcedureMetrics(this); + this.partitionTableCleaner = new PartitionTableAutoCleaner<>(configManager); } public void startExecutor() { @@ -209,6 +213,7 @@ public void startExecutor() { executor.startCompletedCleaner( CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); + executor.addInternalProcedure(partitionTableCleaner); store.start(); LOGGER.info("ProcedureManager is started successfully."); } @@ -222,6 +227,7 @@ public void stopExecutor() { store.stop(); LOGGER.info("ProcedureManager is stopped successfully."); } + executor.removeInternalProcedure(partitionTableCleaner); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java index be33d2cb128e..110d68a02c6c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java @@ -129,6 +129,17 @@ public int getTTLCount() { return ttlInfo.getTTLCount(); } + /** + * Get the maximum ttl of the subtree of the corresponding database. + * + * @param database the path of the database. + * @return the maximum ttl of the subtree of the corresponding database. return NULL_TTL if the + * TTL is not set or the database does not exist. + */ + public long getDatabaseMaxTTL(String database) { + return ttlInfo.getDatabaseMaxTTL(database); + } + /** Only used for upgrading from old database-level ttl to device-level ttl. */ public void setTTL(Map databaseTTLMap) throws IllegalPathException { ttlInfo.setTTL(databaseTTLMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 0fcd8f773475..29afb2c2c1a7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -30,6 +30,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -69,6 +71,7 @@ import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; +import org.apache.iotdb.confignode.manager.TTLManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; @@ -124,6 +127,7 @@ public class PartitionManager { CONF.getSchemaRegionGroupExtensionPolicy(); private static final RegionGroupExtensionPolicy DATA_REGION_GROUP_EXTENSION_POLICY = CONF.getDataRegionGroupExtensionPolicy(); + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private final IManager configManager; private final PartitionInfo partitionInfo; @@ -133,15 +137,16 @@ public class PartitionManager { private static final String CONSENSUS_READ_ERROR = "Failed in the read API executing the consensus layer due to: "; - private static final String CONSENSUS_WRITE_ERROR = + public static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: "; - /** Region cleaner. */ // Monitor for leadership change private final Object scheduleMonitor = new Object(); + /** Region cleaner. */ // Try to delete Regions in every 10s private static final int REGION_MAINTAINER_WORK_INTERVAL = 10; + private final ScheduledExecutorService regionMaintainer; private Future currentRegionMaintainerFuture; @@ -1515,4 +1520,8 @@ private ProcedureManager getProcedureManager() { private NodeManager getNodeManager() { return configManager.getNodeManager(); } + + private TTLManager getTTLManager() { + return configManager.getTTLManager(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java index 5f84f08585f3..d4760439d206 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java @@ -158,6 +158,22 @@ public int getTTLCount() { } } + /** + * Get the maximum ttl of the subtree of the corresponding database. + * + * @param database the path of the database. + * @return the maximum ttl of the subtree of the corresponding database. return NULL_TTL if the + * TTL is not set or the database does not exist. + */ + public long getDatabaseMaxTTL(String database) { + lock.readLock().lock(); + try { + return ttlCache.getDatabaseMaxTTL(database); + } finally { + lock.readLock().unlock(); + } + } + @Override public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException { File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 0c3cc49bfea2..fb3a820d9414 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -90,6 +90,7 @@ import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; @@ -442,6 +443,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) physicalPlan); case CreateDataPartition: return partitionInfo.createDataPartition((CreateDataPartitionPlan) physicalPlan); + case AutoCleanPartitionTable: + return partitionInfo.autoCleanPartitionTable((AutoCleanPartitionTablePlan) physicalPlan); case UpdateProcedure: return procedureInfo.updateProcedure((UpdateProcedurePlan) physicalPlan); case DeleteProcedure: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index bb9e26e9dda3..fda7e2913153 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -610,6 +610,16 @@ public Map getLastDataAllotTable() { return dataPartitionTable.getLastDataAllotTable(); } + /** + * Remove PartitionTable where the TimeSlot is expired. + * + * @param TTL The Time To Live + * @param currentTimeSlot The current TimeSlot + */ + public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { + dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 492aceb2feca..3fd5adfea46e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; @@ -501,6 +502,24 @@ public TSStatus createDataPartition(CreateDataPartitionPlan plan) { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + /** + * Remove PartitionTable where the TimeSlot is expired. + * + * @param plan Including TTL and current TimeSlot + */ + public TSStatus autoCleanPartitionTable(AutoCleanPartitionTablePlan plan) { + plan.getDatabaseTTLMap() + .forEach( + (database, ttl) -> { + if (isDatabaseExisted(database) && 0 < ttl && ttl < Long.MAX_VALUE) { + databasePartitionTables + .get(database) + .autoCleanPartitionTable(ttl, plan.getCurrentTimeSlot()); + } + }); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + /** Get SchemaNodeManagementPartition through matched Database. */ public DataSet getSchemaNodeManagementPartition(List matchedDatabases) { SchemaNodeManagementResp schemaNodeManagementResp = new SchemaNodeManagementResp(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java new file mode 100644 index 000000000000..4363610f7ca7 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.consensus.exception.ConsensusException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.confignode.manager.partition.PartitionManager.CONSENSUS_WRITE_ERROR; + +/** A cleaner that automatically deletes the expired mapping within the partition table. */ +public class PartitionTableAutoCleaner extends InternalProcedure { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionTableAutoCleaner.class); + + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + private final ConfigManager configManager; + + public PartitionTableAutoCleaner(ConfigManager configManager) { + super(COMMON_CONFIG.getTTLCheckInterval()); + this.configManager = configManager; + } + + @Override + protected void periodicExecute(Env env) { + List databases = configManager.getClusterSchemaManager().getDatabaseNames(null); + Map databaseTTLMap = + configManager.getClusterSchemaManager().getTTLInfoForUpgrading(); + for (String database : databases) { + long subTreeMaxTTL = configManager.getTTLManager().getDatabaseMaxTTL(database); + databaseTTLMap.put( + database, Math.max(subTreeMaxTTL, databaseTTLMap.getOrDefault(database, -1L))); + long databaseTTL = databaseTTLMap.get(database); + if (!configManager.getPartitionManager().isDatabaseExist(database) + || databaseTTL < 0 + || databaseTTL == Long.MAX_VALUE) { + // Remove the entry if the database or the TTL does not exist + databaseTTLMap.remove(database); + } + } + if (!databaseTTLMap.isEmpty()) { + // Only clean the partition table when necessary + TTimePartitionSlot currentTimePartitionSlot = + TimePartitionUtils.getCurrentTimePartitionSlot(); + try { + configManager + .getConsensusManager() + .write(new AutoCleanPartitionTablePlan(databaseTTLMap, currentTimePartitionSlot)); + } catch (ConsensusException e) { + LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + } + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 84e2b7f3f3a3..4f2ffdec2ccd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -278,7 +278,7 @@ public void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) { new CompletedProcedureRecycler(store, completed, cleanTimeInterval, cleanEvictTTL)); } - private void addInternalProcedure(InternalProcedure interalProcedure) { + public void addInternalProcedure(InternalProcedure interalProcedure) { if (interalProcedure == null) { return; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index a19fce43e612..9561c7ccf8a0 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -62,6 +62,7 @@ import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.UDFType; +import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; @@ -85,6 +86,7 @@ import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; @@ -179,6 +181,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -446,6 +449,22 @@ public void CreateDataPartitionPlanTest() throws IOException { Assert.assertEquals(req0, req1); } + @Test + public void AutoCleanPartitionTablePlan() throws IOException { + Map databaseTTLMap = new TreeMap<>(); + databaseTTLMap.put("root.db1", -1L); // NULL_TTL + databaseTTLMap.put("root.db2", 3600L * 1000 * 24); // 1d_TTL + databaseTTLMap.put("root.db3", 3600L * 1000 * 24 * 30); // 1m_TTL + TTimePartitionSlot currentTimeSlot = + new TTimePartitionSlot(TimePartitionUtils.getTimePartitionSlot(System.currentTimeMillis())); + AutoCleanPartitionTablePlan req0 = + new AutoCleanPartitionTablePlan(databaseTTLMap, currentTimeSlot); + AutoCleanPartitionTablePlan req1 = + (AutoCleanPartitionTablePlan) + ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer()); + Assert.assertEquals(req0, req1); + } + @Test public void AuthorPlanTest() throws IOException, IllegalPathException { AuthorPlan req0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index ccf2adb4ca97..ec6884eeccfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -542,9 +542,6 @@ public class IoTDBConfig { /** The interval of compaction task schedulation in each virtual database. The unit is ms. */ private long compactionScheduleIntervalInMs = 60_000L; - /** The interval of ttl check task in each database. The unit is ms. Default is 2 hours. */ - private long ttlCheckInterval = 7_200_000L; - /** The number of threads to be set up to check ttl. */ private int ttlCheckerNum = 1; @@ -3061,18 +3058,10 @@ public void setCompactionScheduleIntervalInMs(long compactionScheduleIntervalInM this.compactionScheduleIntervalInMs = compactionScheduleIntervalInMs; } - public long getTTlCheckInterval() { - return ttlCheckInterval; - } - public int getTTlCheckerNum() { return ttlCheckerNum; } - public void setTtlCheckInterval(long ttlCheckInterval) { - this.ttlCheckInterval = ttlCheckInterval; - } - public long getMaxExpiredTime() { return maxExpiredTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 5982de8c9e83..2d567c401c30 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -734,11 +734,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException "inner_compaction_task_selection_mods_file_threshold", Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold())))); - conf.setTtlCheckInterval( - Long.parseLong( - properties.getProperty( - "ttl_check_interval", Long.toString(conf.getTTlCheckInterval())))); - conf.setMaxExpiredTime( Long.parseLong( properties.getProperty("max_expired_time", Long.toString(conf.getMaxExpiredTime())))); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index d7757bb3ff95..393a9f6d2dc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -19,8 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -39,7 +39,7 @@ public class TTLScheduleTask implements Callable { private final int workerNum; private final long ttlCheckInterval = - IoTDBDescriptor.getInstance().getConfig().getTTlCheckInterval(); + CommonDescriptor.getInstance().getConfig().getTTLCheckInterval(); public TTLScheduleTask(List dataRegionList, int workerId, int workerNum) { this.dataRegionList = dataRegionList; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 2ce7124917e2..bb0fb08c1b85 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -133,6 +133,9 @@ public class CommonConfig { /** The maximum number of TTL rules stored in the system, the default is 1000. */ private int ttlRuleCapacity = 1000; + /** The interval of ttl check task in each database. The unit is ms. Default is 2 hours. */ + private long ttlCheckInterval = 7_200_000L; + /** Thrift socket and connection timeout between data node and config node. */ private int cnConnectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(60); @@ -504,6 +507,14 @@ public void setTTlRuleCapacity(int ttlRuleCapacity) { this.ttlRuleCapacity = ttlRuleCapacity; } + public long getTTLCheckInterval() { + return ttlCheckInterval; + } + + public void setTTLCheckInterval(long ttlCheckInterval) { + this.ttlCheckInterval = ttlCheckInterval; + } + public int getCnConnectionTimeoutInMS() { return cnConnectionTimeoutInMS; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 1026ed6aa0ac..ee9516ee7c85 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -104,6 +104,11 @@ public void loadCommonProps(TrimProperties properties) throws IOException { } config.setTierTTLInMs(tierTTL); + config.setTTLCheckInterval( + Long.parseLong( + properties.getProperty( + "ttl_check_interval", Long.toString(config.getTTLCheckInterval())))); + config.setSyncDir(properties.getProperty("dn_sync_dir", config.getSyncDir()).trim()); config.setWalDirs( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 958f1432cc58..64e1233daec7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -264,6 +264,18 @@ public Map getLastDataAllotTable() { return result; } + /** + * Remove PartitionTable where the TimeSlot is expired. + * + * @param TTL The Time To Live + * @param currentTimeSlot The current TimeSlot + */ + public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { + dataPartitionMap.forEach( + (seriesPartitionSlot, seriesPartitionTable) -> + seriesPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot)); + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index 494da226e83c..579d2fa99eff 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -28,6 +28,8 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -45,6 +47,8 @@ public class SeriesPartitionTable { + private static final Logger LOGGER = LoggerFactory.getLogger(SeriesPartitionTable.class); + private final ConcurrentSkipListMap> seriesPartitionMap; @@ -235,6 +239,18 @@ public TConsensusGroupId getLastConsensusGroupId() { return lastEntry.getValue().get(lastEntry.getValue().size() - 1); } + /** + * Remove PartitionTable where the TimeSlot is expired. + * + * @param TTL The Time To Live + * @param currentTimeSlot The current TimeSlot + */ + public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { + seriesPartitionMap + .entrySet() + .removeIf(entry -> entry.getKey().getStartTime() + TTL < currentTimeSlot.getStartTime()); + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java index 468bfe60066e..0adc61ca9af8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java @@ -35,8 +35,10 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; /** TTL Cache Tree, which is a prefix B+ tree with each node storing TTL. */ @NotThreadSafe @@ -172,6 +174,31 @@ public long getLastNodeTTL(String[] nodes) { return node.ttl; } + /** + * Get the maximum ttl of the subtree of the corresponding database. + * + * @param database the path of the database. + * @return the maximum ttl of the subtree of the corresponding database. return NULL_TTL if the + * TTL is not set or the database does not exist. + */ + public long getDatabaseMaxTTL(String database) { + CacheNode node = ttlCacheTree.getChild(database); + if (node == null) { + return NULL_TTL; + } + Queue queue = new LinkedList<>(); + queue.add(node); + long maxTTL = node.ttl; + while (!queue.isEmpty()) { + CacheNode current = queue.poll(); + for (CacheNode child : current.getChildren().values()) { + queue.add(child); + maxTTL = Math.max(maxTTL, child.ttl); + } + } + return maxTTL; + } + /** * @return key is path contains wildcard between each node */ diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java index 7b331fddaac0..ed01e8a5f3f6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java @@ -34,6 +34,9 @@ public class TimePartitionUtils { private static long timePartitionOrigin = CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin(); + private static String timestampPrecision = + CommonDescriptor.getInstance().getConfig().getTimestampPrecision(); + /** Time range for dividing database, the time unit is the same with IoTDB's TimestampPrecision */ private static long timePartitionInterval = CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); @@ -71,6 +74,16 @@ public class TimePartitionUtils { } } + public static TTimePartitionSlot getCurrentTimePartitionSlot() { + if ("ms".equals(timestampPrecision)) { + return getTimePartitionSlot(System.currentTimeMillis()); + } else if ("us".equals(timestampPrecision)) { + return getTimePartitionSlot(System.currentTimeMillis() * 1000); + } else { + return getTimePartitionSlot(System.currentTimeMillis() * 1000_000); + } + } + public static TTimePartitionSlot getTimePartitionSlot(long time) { TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(); timePartitionSlot.setStartTime(getTimePartitionLowerBound(time));