Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix-snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
luoluoyuyu authored Oct 24, 2024
2 parents 5e54f31 + 4e79200 commit 9530f55
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

public class PipeTransferTabletBinaryReqV2 extends PipeTransferTabletBinaryReq {
Expand Down Expand Up @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() {

// Table model
statement.setWriteToTable(true);
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDatabaseName(dataBaseName);
}
}
}
statement.setDatabaseName(dataBaseName);
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;

public class PipeTransferTabletInsertNodeReqV2 extends PipeTransferTabletInsertNodeReq {
Expand Down Expand Up @@ -71,6 +74,16 @@ public InsertBaseStatement constructStatement() {

// Table model
statement.setWriteToTable(true);
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
insertRowStatement.setWriteToTable(true);
insertRowStatement.setDatabaseName(dataBaseName);
}
}
}
statement.setDatabaseName(dataBaseName);
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public SessionInfo getSessionInfoOfTableModel(IClientSession session) {
return new SessionInfo(
session.getId(),
session.getUsername(),
session.getZoneId(),
ZoneId.systemDefault(),
session.getClientVersion(),
session.getDatabaseName(),
IClientSession.SqlDialect.TABLE);
Expand All @@ -397,7 +397,7 @@ public SessionInfo getSessionInfoOfPipeReceiver(IClientSession session, String d
return new SessionInfo(
session.getId(),
session.getUsername(),
session.getZoneId(),
ZoneId.systemDefault(),
session.getClientVersion(),
databaseName,
IClientSession.SqlDialect.TABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ && isNumericType(argumentTypes.get(0))
throw new SemanticException(
"Scalar function "
+ functionName.toLowerCase(Locale.ENGLISH)
+ " only supports text or string data type.");
+ " only accepts two or three arguments and they must be text or string data type.");
}
return STRING;
} else if (TableBuiltinScalarFunction.SUBSTRING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -452,27 +455,47 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
if (isClosed) {
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}

for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
dataPartition2ModificationFile.entrySet()) {
entry.getValue().close();
}
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();

DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
databaseName ->
updateWritePointCountMetrics(
dataRegion, databaseName, getTsFileWritePointCount(writer), false));

final List<Map.Entry<DataPartitionInfo, TsFileIOWriter>> dataPartition2WriterList =
new ArrayList<>(dataPartition2Writer.entrySet());
Collections.shuffle(dataPartition2WriterList);

final AtomicReference<Exception> exception = new AtomicReference<>();
dataPartition2WriterList.parallelStream()
.forEach(
entry -> {
try {
final TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();

final DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(
generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
databaseName ->
updateWritePointCountMetrics(
dataRegion,
databaseName,
getTsFileWritePointCount(writer),
false));
} catch (final Exception e) {
exception.set(e);
}
});
if (exception.get() != null) {
throw new LoadFileException(exception.get());
}
}

Expand Down

0 comments on commit 9530f55

Please sign in to comment.