From 2eb060d185441e8595998eb5c53d8ffc0b646fa2 Mon Sep 17 00:00:00 2001 From: Zhenyu Luo Date: Thu, 24 Oct 2024 12:06:08 +0800 Subject: [PATCH 1/3] Pipe: Fix the Pipe receiver's LoadFile and InsertRowsStatement NPE issues (#13871) --- .../request/PipeTransferTabletBinaryReqV2.java | 13 +++++++++++++ .../request/PipeTransferTabletInsertNodeReqV2.java | 13 +++++++++++++ .../iotdb/db/protocol/session/SessionManager.java | 4 ++-- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java index d3a241dd8806..8b23c3d93921 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java @@ -27,6 +27,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -34,6 +36,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Objects; public class PipeTransferTabletBinaryReqV2 extends PipeTransferTabletBinaryReq { @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() { // Table model statement.setWriteToTable(true); + if (statement instanceof InsertRowsStatement) { + List rowStatements = + ((InsertRowsStatement) statement).getInsertRowStatementList(); + if (rowStatements != null && !rowStatements.isEmpty()) { + for (InsertRowStatement insertRowStatement : rowStatements) { + insertRowStatement.setWriteToTable(true); + insertRowStatement.setDatabaseName(dataBaseName); + } + } + } statement.setDatabaseName(dataBaseName); return statement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java index 8a130cf4da6d..25c2fcaa844e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java @@ -28,6 +28,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -35,6 +37,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Objects; public class PipeTransferTabletInsertNodeReqV2 extends PipeTransferTabletInsertNodeReq { @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() { // Table model statement.setWriteToTable(true); + if (statement instanceof InsertRowsStatement) { + List rowStatements = + ((InsertRowsStatement) statement).getInsertRowStatementList(); + if (rowStatements != null && !rowStatements.isEmpty()) { + for (InsertRowStatement insertRowStatement : rowStatements) { + insertRowStatement.setWriteToTable(true); + insertRowStatement.setDatabaseName(dataBaseName); + } + } + } statement.setDatabaseName(dataBaseName); return statement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index e255b76621a6..5681affe8cf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -387,7 +387,7 @@ public SessionInfo getSessionInfoOfTableModel(IClientSession session) { return new SessionInfo( session.getId(), session.getUsername(), - session.getZoneId(), + ZoneId.systemDefault(), session.getClientVersion(), session.getDatabaseName(), IClientSession.SqlDialect.TABLE); @@ -397,7 +397,7 @@ public SessionInfo getSessionInfoOfPipeReceiver(IClientSession session, String d return new SessionInfo( session.getId(), session.getUsername(), - session.getZoneId(), + ZoneId.systemDefault(), session.getClientVersion(), databaseName, IClientSession.SqlDialect.TABLE); From dbb99bc88dea50c6effffae4081abbba1ea5f76a Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Thu, 24 Oct 2024 12:29:42 +0800 Subject: [PATCH 2/3] Load: Parallelly load files into different target data partitions (#13893) --- .../storageengine/load/LoadTsFileManager.java | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 775dad1df467..f785d0992ebd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -61,8 +61,11 @@ import java.nio.file.DirectoryNotEmptyException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -452,27 +455,47 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex) if (isClosed) { throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir)); } + for (Map.Entry entry : dataPartition2ModificationFile.entrySet()) { entry.getValue().close(); } - for (Map.Entry entry : dataPartition2Writer.entrySet()) { - TsFileIOWriter writer = entry.getValue(); - if (writer.isWritingChunkGroup()) { - writer.endChunkGroup(); - } - writer.endFile(); - - DataRegion dataRegion = entry.getKey().getDataRegion(); - dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe); - - // Metrics - dataRegion - .getNonSystemDatabaseName() - .ifPresent( - databaseName -> - updateWritePointCountMetrics( - dataRegion, databaseName, getTsFileWritePointCount(writer), false)); + + final List> dataPartition2WriterList = + new ArrayList<>(dataPartition2Writer.entrySet()); + Collections.shuffle(dataPartition2WriterList); + + final AtomicReference exception = new AtomicReference<>(); + dataPartition2WriterList.parallelStream() + .forEach( + entry -> { + try { + final TsFileIOWriter writer = entry.getValue(); + if (writer.isWritingChunkGroup()) { + writer.endChunkGroup(); + } + writer.endFile(); + + final DataRegion dataRegion = entry.getKey().getDataRegion(); + dataRegion.loadNewTsFile( + generateResource(writer, progressIndex), true, isGeneratedByPipe); + + // Metrics + dataRegion + .getNonSystemDatabaseName() + .ifPresent( + databaseName -> + updateWritePointCountMetrics( + dataRegion, + databaseName, + getTsFileWritePointCount(writer), + false)); + } catch (final Exception e) { + exception.set(e); + } + }); + if (exception.get() != null) { + throw new LoadFileException(exception.get()); } } From 4e792002b285bd0bd9d8386117c40df974a78afa Mon Sep 17 00:00:00 2001 From: Lin Xintao Date: Thu, 24 Oct 2024 15:35:49 +0800 Subject: [PATCH 3/3] fix replace function error message --- .../queryengine/plan/relational/metadata/TableMetadataImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index 7a366cf12c03..ea96580b5b61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -213,7 +213,7 @@ && isNumericType(argumentTypes.get(0)) throw new SemanticException( "Scalar function " + functionName.toLowerCase(Locale.ENGLISH) - + " only supports text or string data type."); + + " only accepts two or three arguments and they must be text or string data type."); } return STRING; } else if (TableBuiltinScalarFunction.SUBSTRING