Skip to content

Commit

Permalink
Rework auto begin/commit/rollback transaction during query lifetime (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 authored Jan 2, 2025
1 parent 672ae95 commit 5ec9fee
Show file tree
Hide file tree
Showing 97 changed files with 617 additions and 543 deletions.
12 changes: 6 additions & 6 deletions extension/fts/src/function/create_fts_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ static std::vector<std::string> bindProperties(const catalog::NodeTableCatalogEn

static void validateIndexNotExist(const ClientContext& context, table_id_t tableID,
const std::string& indexName) {
if (context.getCatalog()->containsIndex(context.getTx(), tableID, indexName)) {
if (context.getCatalog()->containsIndex(context.getTransaction(), tableID, indexName)) {
throw BinderException{stringFormat("Index: {} already exists in table: {}.", indexName,
context.getCatalog()->getTableName(context.getTx(), tableID))};
context.getCatalog()->getTableName(context.getTransaction(), tableID))};
}
}

Expand All @@ -79,7 +79,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(ClientContext* context,
static std::string createStopWordsTableIfNotExists(const ClientContext& context,
const std::string& stopWordsTableName) {
std::string query = "";
if (!context.getCatalog()->containsTable(context.getTx(), stopWordsTableName)) {
if (!context.getCatalog()->containsTable(context.getTransaction(), stopWordsTableName)) {
query += stringFormat("CREATE NODE TABLE `{}` (sw STRING, PRIMARY KEY(sw));",
stopWordsTableName);
for (auto i = 0u; i < FTSExtension::NUM_STOP_WORDS; i++) {
Expand All @@ -96,7 +96,7 @@ std::string createFTSIndexQuery(const ClientContext& context, const TableFuncBin
// statements in a single transaction there.
// Create the tokenize macro.
std::string query = "";
if (!context.getCatalog()->containsMacro(context.getTx(), "tokenize")) {
if (!context.getCatalog()->containsMacro(context.getTransaction(), "tokenize")) {
query += R"(CREATE MACRO tokenize(query) AS
string_split(lower(regexp_replace(
CAST(query as STRING),
Expand Down Expand Up @@ -208,7 +208,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& /*output
auto docTableName = FTSUtils::getDocsTableName(bindData.tableID, bindData.indexName);
;
auto docTableEntry = context.clientContext->getCatalog()->getTableCatalogEntry(
context.clientContext->getTx(), docTableName);
context.clientContext->getTransaction(), docTableName);
graph::GraphEntry entry{{docTableEntry}, {} /* relTableEntries */};
graph::OnDiskGraph graph(context.clientContext, entry);
auto sharedState = LenComputeSharedState{};
Expand All @@ -217,7 +217,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& /*output
std::vector<std::string>{CreateFTSFunction::DOC_LEN_PROP_NAME});
auto numDocs = sharedState.numDocs.load();
auto avgDocLen = numDocs == 0 ? 0 : (double)sharedState.totalLen.load() / numDocs;
context.clientContext->getCatalog()->createIndex(context.clientContext->getTx(),
context.clientContext->getCatalog()->createIndex(context.clientContext->getTransaction(),
std::make_unique<FTSIndexCatalogEntry>(bindData.tableID, bindData.indexName, numDocs,
avgDocLen, bindData.ftsConfig));
return 0;
Expand Down
2 changes: 1 addition & 1 deletion extension/fts/src/function/drop_fts_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ std::string dropFTSIndexQuery(const ClientContext& /*context*/, const TableFuncB
static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& /*output*/) {
auto& ftsBindData = *input.bindData->constPtrCast<FTSBindData>();
auto& context = *input.context;
context.clientContext->getCatalog()->dropIndex(input.context->clientContext->getTx(),
context.clientContext->getCatalog()->dropIndex(input.context->clientContext->getTransaction(),
ftsBindData.tableID, ftsBindData.indexName);
return 0;
}
Expand Down
9 changes: 5 additions & 4 deletions extension/fts/src/function/fts_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ namespace fts_extension {

catalog::NodeTableCatalogEntry& FTSUtils::bindTable(const std::string& tableName,
main::ClientContext* context, std::string indexName, IndexOperation operation) {
if (!context->getCatalog()->containsTable(context->getTx(), tableName)) {
if (!context->getCatalog()->containsTable(context->getTransaction(), tableName)) {
throw common::BinderException{common::stringFormat("Table {} does not exist.", tableName)};
}
auto tableEntry = context->getCatalog()->getTableCatalogEntry(context->getTx(), tableName);
auto tableEntry =
context->getCatalog()->getTableCatalogEntry(context->getTransaction(), tableName);
if (tableEntry->getTableType() != common::TableType::NODE) {
switch (operation) {
case IndexOperation::CREATE:
Expand All @@ -34,8 +35,8 @@ catalog::NodeTableCatalogEntry& FTSUtils::bindTable(const std::string& tableName

void FTSUtils::validateIndexExistence(const main::ClientContext& context,
common::table_id_t tableID, std::string indexName) {
if (!context.getCatalog()->containsIndex(context.getTx(), tableID, indexName)) {
auto tableName = context.getCatalog()->getTableName(context.getTx(), tableID);
if (!context.getCatalog()->containsIndex(context.getTransaction(), tableID, indexName)) {
auto tableName = context.getCatalog()->getTableName(context.getTransaction(), tableID);
throw common::BinderException{common::stringFormat(
"Table: {} doesn't have an index with name: {}.", tableName, indexName)};
}
Expand Down
20 changes: 11 additions & 9 deletions extension/fts/src/function/query_fts_gds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void QFTSAlgorithm::exec(processor::ExecutionContext* executionContext) {

// Do vertex compute to get the terms
auto termsTableEntry = executionContext->clientContext->getCatalog()->getTableCatalogEntry(
executionContext->clientContext->getTx(), termsTableID);
executionContext->clientContext->getTransaction(), termsTableID);
const graph::GraphEntry entry{{termsTableEntry}, {} /* relTableEntries */};
graph::OnDiskGraph graph(executionContext->clientContext, entry);
auto termsComputeSharedState =
Expand All @@ -315,7 +315,7 @@ void QFTSAlgorithm::exec(processor::ExecutionContext* executionContext) {
&termsComputeSharedState.dfs);

auto clientContext = executionContext->clientContext;
auto transaction = clientContext->getTx();
auto transaction = clientContext->getTransaction();
auto catalog = clientContext->getCatalog();
auto mm = clientContext->getMemoryManager();
QFTSState qFTSState = QFTSState{std::move(frontierPair), std::move(edgeCompute), termsTableID};
Expand Down Expand Up @@ -381,17 +381,19 @@ void QFTSAlgorithm::bind(const GDSBindInput& input, main::ClientContext& context
auto& tableEntry =
FTSUtils::bindTable(inputTableName, &context, indexName, FTSUtils::IndexOperation::QUERY);
FTSUtils::validateIndexExistence(context, tableEntry.getTableID(), indexName);
auto& ftsIndexEntry = context.getCatalog()
->getIndex(context.getTx(), tableEntry.getTableID(), indexName)
->constCast<FTSIndexCatalogEntry>();
auto entry = context.getCatalog()->getTableCatalogEntry(context.getTx(), inputTableName);
auto& ftsIndexEntry =
context.getCatalog()
->getIndex(context.getTransaction(), tableEntry.getTableID(), indexName)
->constCast<FTSIndexCatalogEntry>();
auto entry =
context.getCatalog()->getTableCatalogEntry(context.getTransaction(), inputTableName);
auto nodeOutput = bindNodeOutput(input.binder, {entry});

auto termsEntry = context.getCatalog()->getTableCatalogEntry(context.getTx(),
auto termsEntry = context.getCatalog()->getTableCatalogEntry(context.getTransaction(),
FTSUtils::getTermsTableName(tableEntry.getTableID(), indexName));
auto docsEntry = context.getCatalog()->getTableCatalogEntry(context.getTx(),
auto docsEntry = context.getCatalog()->getTableCatalogEntry(context.getTransaction(),
FTSUtils::getDocsTableName(tableEntry.getTableID(), indexName));
auto appearsInEntry = context.getCatalog()->getTableCatalogEntry(context.getTx(),
auto appearsInEntry = context.getCatalog()->getTableCatalogEntry(context.getTransaction(),
FTSUtils::getAppearsInTableName(tableEntry.getTableID(), indexName));
auto graphEntry = graph::GraphEntry({termsEntry, docsEntry}, {appearsInEntry});
bindData = std::make_unique<QFTSGDSBindData>(std::move(terms), std::move(graphEntry),
Expand Down
2 changes: 2 additions & 0 deletions extension/fts/test/test_files/error.test
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ Binder exception: Table: person doesn't have an index with name: personIdx.
Binder exception: CREATE_FTS_INDEX is only supported in auto transaction mode.

-LOG DropFTSInManualTrx
-STATEMENT BEGIN TRANSACTION
---- ok
-STATEMENT CALL DROP_FTS_INDEX('person1', 'contentIdx')
---- error
Binder exception: DROP_FTS_INDEX is only supported in auto transaction mode.
Expand Down
2 changes: 1 addition & 1 deletion extension/httpfs/src/cached_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ std::unique_ptr<FileInfo> CachedFileManager::getCachedFileInfo(HTTPFileInfo* htt
}

void CachedFileManager::cleanUP(main::ClientContext* context) {
auto cacheDirForTrx = getCachedDirForTrx(context->getTx()->getID());
auto cacheDirForTrx = getCachedDirForTrx(context->getTransaction()->getID());
vfs->removeFileIfExists(cacheDirForTrx);
}

Expand Down
2 changes: 1 addition & 1 deletion extension/httpfs/src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void HTTPFileInfo::initialize(main::ClientContext* context) {
if (httpConfig.cacheFile) {
auto hfs = fileSystem->ptrCast<HTTPFileSystem>();
cachedFileInfo =
hfs->getCachedFileManager().getCachedFileInfo(this, context->getTx()->getID());
hfs->getCachedFileManager().getCachedFileInfo(this, context->getTransaction()->getID());
return;
}
initMetadata();
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_create_macro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateMacro(const Statement& stateme
auto& createMacro = ku_dynamic_cast<const CreateMacro&>(statement);
auto macroName = createMacro.getMacroName();
StringUtils::toUpper(macroName);
if (clientContext->getCatalog()->containsMacro(clientContext->getTx(), macroName)) {
if (clientContext->getCatalog()->containsMacro(clientContext->getTransaction(), macroName)) {
throw BinderException{stringFormat("Macro {} already exists.", macroName)};
}
parser::default_macro_args defaultArgs;
Expand Down
52 changes: 38 additions & 14 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,36 @@ static void validateSerialNoDefault(const Expression& expr) {
}
}

// TODO(Ziyi/Xiyang/Guodong): Ideally this function can be removed, and we can make use of
// `implicitCastIfNecessary` to handle this. However, the current approach doesn't keep casted
// boundExpr, instead the ParsedExpression, which is not casted, in PropertyDefinition. We have to
// change PropertyDefinition to store boundExpr to get rid of this function.
static void tryResolvingDataTypeForDefaultNull(const ParsedPropertyDefinition& parsedDefinition,
const LogicalType& type) {
if (parsedDefinition.defaultExpr->getExpressionType() == ExpressionType::LITERAL) {
auto& literalVal =
common::ku_dynamic_cast<ParsedLiteralExpression*>(parsedDefinition.defaultExpr.get())
->getValueUnsafe();
if (literalVal.isNull() &&
literalVal.getDataType().getLogicalTypeID() == LogicalTypeID::ANY) {
literalVal.setDataType(type);
}
KU_ASSERT(literalVal.getDataType().getLogicalTypeID() != LogicalTypeID::ANY);
}
}

std::vector<PropertyDefinition> Binder::bindPropertyDefinitions(
const std::vector<ParsedPropertyDefinition>& parsedDefinitions, const std::string& tableName) {
std::vector<PropertyDefinition> definitions;
for (auto& parsedDefinition : parsedDefinitions) {
auto type = LogicalType::convertFromString(parsedDefinition.getType(), clientContext);
// For default null value, we may need to resolve its data type, as its type may not be
// resolved during parsing and thus may be ANY.
tryResolvingDataTypeForDefaultNull(parsedDefinition, type);
auto expr = parsedDefinition.defaultExpr->copy();
// This will check the type correctness of the default value expression
auto boundExpr = expressionBinder.implicitCastIfNecessary(
expressionBinder.bindExpression(*parsedDefinition.defaultExpr), type);
auto boundExpr =
expressionBinder.implicitCastIfNecessary(expressionBinder.bindExpression(*expr), type);
if (type.getLogicalTypeID() == LogicalTypeID::SERIAL) {
validateSerialNoDefault(*boundExpr);
expr = ParsedExpressionUtils::getSerialDefaultExpr(
Expand Down Expand Up @@ -187,7 +208,8 @@ std::unique_ptr<BoundStatement> Binder::bindCreateTable(const Statement& stateme
auto tableName = createTable->getInfo()->tableName;
switch (createTable->getInfo()->onConflict) {
case common::ConflictAction::ON_CONFLICT_THROW: {
if (clientContext->getCatalog()->containsTable(clientContext->getTx(), tableName)) {
if (clientContext->getCatalog()->containsTable(clientContext->getTransaction(),
tableName)) {
throw BinderException(tableName + " already exists in catalog.");
}
} break;
Expand All @@ -202,7 +224,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateType(const Statement& statemen
auto createType = statement.constPtrCast<CreateType>();
auto name = createType->getName();
LogicalType type = LogicalType::convertFromString(createType->getDataType(), clientContext);
if (clientContext->getCatalog()->containsType(clientContext->getTx(), name)) {
if (clientContext->getCatalog()->containsType(clientContext->getTransaction(), name)) {
throw BinderException{common::stringFormat("Duplicated type name: {}.", name)};
}
return std::make_unique<BoundCreateType>(std::move(name), std::move(type));
Expand All @@ -218,7 +240,8 @@ std::unique_ptr<BoundStatement> Binder::bindCreateSequence(const Statement& stat
int64_t maxValue = 0;
switch (info.onConflict) {
case common::ConflictAction::ON_CONFLICT_THROW: {
if (clientContext->getCatalog()->containsSequence(clientContext->getTx(), sequenceName)) {
if (clientContext->getCatalog()->containsSequence(clientContext->getTransaction(),
sequenceName)) {
throw BinderException(sequenceName + " already exists in catalog.");
}
} break;
Expand Down Expand Up @@ -274,7 +297,7 @@ void Binder::validateDropTable(const Statement& statement) {
auto& dropTable = statement.constCast<Drop>();
auto tableName = dropTable.getDropInfo().name;
auto catalog = clientContext->getCatalog();
auto validTable = catalog->containsTable(clientContext->getTx(), tableName);
auto validTable = catalog->containsTable(clientContext->getTransaction(), tableName);
if (!validTable) {
switch (dropTable.getDropInfo().conflictAction) {
case common::ConflictAction::ON_CONFLICT_THROW: {
Expand All @@ -287,11 +310,11 @@ void Binder::validateDropTable(const Statement& statement) {
KU_UNREACHABLE;
}
}
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTx(), tableName);
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTransaction(), tableName);
switch (tableEntry->getTableType()) {
case TableType::NODE: {
// Check node table is not referenced by rel table.
for (auto& relTableEntry : catalog->getRelTableEntries(clientContext->getTx())) {
for (auto& relTableEntry : catalog->getRelTableEntries(clientContext->getTransaction())) {
if (relTableEntry->isParent(tableEntry->getTableID())) {
throw BinderException(stringFormat("Cannot delete node table {} because it is "
"referenced by relationship table {}.",
Expand All @@ -301,7 +324,8 @@ void Binder::validateDropTable(const Statement& statement) {
} break;
case TableType::REL: {
// Check rel table is not referenced by rel group.
for (auto& relTableGroupEntry : catalog->getRelTableGroupEntries(clientContext->getTx())) {
for (auto& relTableGroupEntry :
catalog->getRelTableGroupEntries(clientContext->getTransaction())) {
if (relTableGroupEntry->isParent(tableEntry->getTableID())) {
throw BinderException(stringFormat("Cannot delete relationship table {} because it "
"is referenced by relationship group {}.",
Expand All @@ -316,7 +340,7 @@ void Binder::validateDropTable(const Statement& statement) {

void Binder::validateDropSequence(const parser::Statement& statement) {
auto& dropSequence = statement.constCast<Drop>();
if (!clientContext->getCatalog()->containsSequence(clientContext->getTx(),
if (!clientContext->getCatalog()->containsSequence(clientContext->getTransaction(),
dropSequence.getDropInfo().name)) {
switch (dropSequence.getDropInfo().conflictAction) {
case common::ConflictAction::ON_CONFLICT_THROW: {
Expand Down Expand Up @@ -379,7 +403,7 @@ std::unique_ptr<BoundStatement> Binder::bindRenameTable(const Statement& stateme
auto newName = extraInfo->newName;
validateTableExist(tableName);
auto catalog = clientContext->getCatalog();
if (catalog->containsTable(clientContext->getTx(), newName)) {
if (catalog->containsTable(clientContext->getTransaction(), newName)) {
throw BinderException("Table: " + newName + " already exists.");
}
auto boundExtraInfo = std::make_unique<BoundExtraRenameTableInfo>(newName);
Expand Down Expand Up @@ -448,7 +472,7 @@ std::unique_ptr<BoundStatement> Binder::bindAddProperty(const Statement& stateme
}
auto propertyName = extraInfo->propertyName;
validateTableExist(tableName);
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTx(), tableName);
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTransaction(), tableName);
validatePropertyDDLOnTable(tableEntry, "add");
validatePropertyNotExist(info->onConflict, tableEntry, propertyName);
auto defaultValue = std::move(extraInfo->defaultValue);
Expand Down Expand Up @@ -486,7 +510,7 @@ std::unique_ptr<BoundStatement> Binder::bindDropProperty(const Statement& statem
auto propertyName = extraInfo->propertyName;
validateTableExist(tableName);
auto catalog = clientContext->getCatalog();
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTx(), tableName);
auto tableEntry = catalog->getTableCatalogEntry(clientContext->getTransaction(), tableName);
validatePropertyDDLOnTable(tableEntry, "drop");
validatePropertyExist(info->onConflict, tableEntry, propertyName);
if (tableEntry->getTableType() == TableType::NODE &&
Expand All @@ -508,7 +532,7 @@ std::unique_ptr<BoundStatement> Binder::bindRenameProperty(const Statement& stat
auto newName = extraInfo->newName;
validateTableExist(tableName);
auto catalog = clientContext->getCatalog();
auto tableSchema = catalog->getTableCatalogEntry(clientContext->getTx(), tableName);
auto tableSchema = catalog->getTableCatalogEntry(clientContext->getTransaction(), tableName);
validatePropertyDDLOnTable(tableSchema, "rename");
validatePropertyExist(common::ConflictAction::ON_CONFLICT_THROW, tableSchema, propertyName);
validatePropertyNotExist(common::ConflictAction::ON_CONFLICT_THROW, tableSchema, newName);
Expand Down
5 changes: 3 additions & 2 deletions src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ bool Binder::bindExportTableData(ExportedTableData& tableData, const TableCatalo
if (!bindExportQuery(exportQuery, entry, catalog, tx)) {
return false;
}
auto parsedStatement = Parser::parseQuery(exportQuery, clientContext);
auto parsedStatement = Parser::parseQuery(exportQuery);
KU_ASSERT(parsedStatement.size() == 1);
auto parsedQuery = parsedStatement[0]->constPtrCast<RegularQuery>();
auto query = bindQuery(*parsedQuery);
Expand All @@ -108,7 +108,8 @@ std::unique_ptr<BoundStatement> Binder::bindExportDatabaseClause(const Statement
auto& exportDB = statement.constCast<ExportDB>();
auto boundFilePath =
clientContext->getVFSUnsafe()->expandPath(clientContext, exportDB.getFilePath());
auto exportData = getExportInfo(*clientContext->getCatalog(), clientContext->getTx(), this);
auto exportData =
getExportInfo(*clientContext->getCatalog(), clientContext->getTransaction(), this);
auto parsedOptions = bindParsingOptions(exportDB.getParsingOptionsRef());
auto fileTypeInfo = getFileType(parsedOptions);
switch (fileTypeInfo.fileType) {
Expand Down
Loading

0 comments on commit 5ec9fee

Please sign in to comment.