Skip to content
This repository has been archived by the owner on Apr 1, 2023. It is now read-only.

add logic to maintain cluster level information, namespace info, and table info in catalog manager #56

Merged
merged 10 commits into from
Dec 9, 2020
25 changes: 25 additions & 0 deletions src/k2/connector/yb/pggate/catalog/sql_catalog_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Copyright(c) 2020 Futurewei Cloud

#include "yb/pggate/catalog/sql_catalog_entity.h"

#include <glog/logging.h>

namespace k2pg {
namespace sql {
namespace catalog {
Expand All @@ -37,6 +39,29 @@ ClusterInfo::ClusterInfo(string cluster_id, uint64_t catalog_version, bool initd
ClusterInfo::~ClusterInfo() {
};

SessionTransactionContext::SessionTransactionContext(std::shared_ptr<K23SITxn> txn) {
txn_ = txn;
finished = false;
}

SessionTransactionContext::~SessionTransactionContext() {
if (!finished) {
// abort the transaction if it has been committed or aborted
EndTransaction(false);
finished = true;
}
}

void SessionTransactionContext::EndTransaction(bool should_commit) {
std::future<k2::EndResult> txn_result_future = txn_->endTxn(should_commit);
k2::EndResult txn_result = txn_result_future.get();
if (!txn_result.status.is2xxOK()) {
LOG(FATAL) << "Failed to commit transaction due to error code " << txn_result.status.code
<< " and message: " << txn_result.status.message;
throw std::runtime_error("Failed to end transaction, should_commit: " + should_commit);
}
}

} // namespace catalog
} // namespace sql
} // namespace k2pg
23 changes: 16 additions & 7 deletions src/k2/connector/yb/pggate/catalog/sql_catalog_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,28 @@ class NamespaceInfo {

class SessionTransactionContext {
public:
SessionTransactionContext() = default;
~SessionTransactionContext() = default;

void SetTxn(std::shared_ptr<K23SITxn> txn) {
txn_ = txn;
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
}
SessionTransactionContext(std::shared_ptr<K23SITxn> txn);
~SessionTransactionContext();

std::shared_ptr<K23SITxn> GetTxn() {
return txn_;
}

void Commit() {
EndTransaction(true);
finished = true;
}

void Abort() {
EndTransaction(false);
finished = true;
}

private:
std::shared_ptr<K23SITxn> txn_;
void EndTransaction(bool should_commit);

std::shared_ptr<K23SITxn> txn_;
bool finished;
};

// mapping to the status code defined in yb's status.h (some are not applicable and thus, not included here)
Expand Down
59 changes: 24 additions & 35 deletions src/k2/connector/yb/pggate/catalog/sql_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,27 @@ namespace catalog {
if (clresp.status.IsSucceeded()) {
LOG(INFO) << "Created cluster info record succeeded";
} else {
EndTransactionContext(ci_context, false);
ci_context->Abort();
LOG(FATAL) << "Failed to create cluster info record due to " << clresp.status.errorMessage;
return STATUS_FORMAT(IOError, "Failed to create cluster info record to error code $0 and message $1",
clresp.status.code, clresp.status.errorMessage);
}
}
} else {
EndTransactionContext(ci_context, false);
ci_context->Abort();
LOG(FATAL) << "Failed to read cluster info record";
return STATUS_FORMAT(IOError, "Failed to read cluster info record to error code $0 and message $1",
ciresp.status.code, ciresp.status.errorMessage);
}
// end the current transaction so that we use a different one for later operations
EndTransactionContext(ci_context, true);
ci_context->Commit();

// load namespaces
CreateNamespaceTableResult cnresp = namespace_info_handler_->CreateNamespaceTableIfNecessary();
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
if (cnresp.status.IsSucceeded()) {
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult nsresp = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();

if (nsresp.status.IsSucceeded()) {
if (!nsresp.namespaceInfos.empty()) {
Expand Down Expand Up @@ -128,7 +128,7 @@ namespace catalog {
if (!init_db_done_) {
std::shared_ptr<SessionTransactionContext> context = NewTransactionContext();
GetClusterInfoResult result = cluster_info_handler_->ReadClusterInfo(context, cluster_id_);
EndTransactionContext(context, true);
context->Commit();
if (result.status.IsSucceeded() && result.clusterInfo != nullptr) {
if (result.clusterInfo->IsInitdbDone()) {
init_db_done_.store(result.clusterInfo->IsInitdbDone(), std::memory_order_relaxed);
Expand Down Expand Up @@ -162,7 +162,7 @@ namespace catalog {
} else {
response.status = std::move(result.status);
}
EndTransactionContext(context, true);
context->Commit();
return response;
}

Expand All @@ -174,7 +174,7 @@ namespace catalog {
ListNamespacesResponse response;
std::shared_ptr<SessionTransactionContext> context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(context);
EndTransactionContext(context, true);
context->Commit();

if (result.status.IsSucceeded()) {
response.status.Succeed();
Expand All @@ -200,7 +200,7 @@ namespace catalog {
std::shared_ptr<SessionTransactionContext> context = NewTransactionContext();
// TODO: use a background task to refresh the namespace caches to avoid fetching from SKV on each call
GetNamespaceResult result = namespace_info_handler_->GetNamespace(context, request.namespaceId);
EndTransactionContext(context, true);
context->Commit();
if (result.status.IsSucceeded()) {
if (result.namespaceInfo != nullptr) {
response.namespace_info = result.namespaceInfo;
Expand Down Expand Up @@ -233,7 +233,7 @@ namespace catalog {
// this could be avoided by use a single or a quorum of catalog managers
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();
if (result.status.IsSucceeded() && !result.namespaceInfos.empty()) {
// update namespace caches
UpdateNamespaceCache(result.namespaceInfos);
Expand Down Expand Up @@ -286,7 +286,7 @@ namespace catalog {
CreateUpdateTableResult result = table_info_handler_->CreateOrUpdateTable(context, new_table_info->namespace_id(), new_table_info);
if (result.status.IsSucceeded()) {
// commit transaction
EndTransactionContext(context, true);
context->Commit();
// update table caches
UpdateTableCache(new_table_info);
// increase catalog version
Expand All @@ -296,7 +296,7 @@ namespace catalog {
response.tableInfo = new_table_info;
} else {
// abort the transaction
EndTransactionContext(context, false);
context->Abort();
response.status = std::move(result.status);
}

Expand All @@ -311,7 +311,7 @@ namespace catalog {
// this could be avoided by use a single or a quorum of catalog managers
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();
if (result.status.IsSucceeded() && !result.namespaceInfos.empty()) {
// update namespace caches
UpdateNamespaceCache(result.namespaceInfos);
Expand Down Expand Up @@ -429,7 +429,7 @@ namespace catalog {
// this could be avoided by use a single or a quorum of catalog managers
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();
if (result.status.IsSucceeded() && !result.namespaceInfos.empty()) {
// update namespace caches
UpdateNamespaceCache(result.namespaceInfos);
Expand Down Expand Up @@ -459,11 +459,11 @@ namespace catalog {
response.status.errorMessage = "Cannot find table " + table_id;
response.tableInfo = nullptr;
}
EndTransactionContext(context, true);
context->Commit();
} else {
response.status = std::move(table_result.status);
response.tableInfo = nullptr;
EndTransactionContext(context, false);
context->Abort();
}
}

Expand All @@ -486,7 +486,7 @@ namespace catalog {
// this could be avoided by use a single or a quorum of catalog managers
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();
if (result.status.IsSucceeded() && !result.namespaceInfos.empty()) {
// update namespace caches
UpdateNamespaceCache(result.namespaceInfos);
Expand Down Expand Up @@ -530,7 +530,7 @@ namespace catalog {
} else {
response.status = std::move(table_result.status);
}
EndTransactionContext(context, true);
context->Commit();
}

return response;
Expand All @@ -547,7 +547,7 @@ namespace catalog {
// this could be avoided by use a single or a quorum of catalog managers
std::shared_ptr<SessionTransactionContext> ns_context = NewTransactionContext();
ListNamespacesResult result = namespace_info_handler_->ListNamespaces(ns_context);
EndTransactionContext(ns_context, true);
ns_context->Commit();
if (result.status.IsSucceeded() && !result.namespaceInfos.empty()) {
// update namespace caches
UpdateNamespaceCache(result.namespaceInfos);
Expand All @@ -569,7 +569,7 @@ namespace catalog {
GeBaseTableIdResult index_result = table_info_handler_->GeBaseTableId(context, namespace_id, table_id);
if (!index_result.status.IsSucceeded()) {
response.status = std::move(index_result.status);
EndTransactionContext(context, false);
context->Abort();
return response;
}
base_table_id = index_result.baseTableId;
Expand Down Expand Up @@ -622,7 +622,7 @@ namespace catalog {
response.status = std::move(delete_result.status);
}
}
EndTransactionContext(context, true);
context->Commit();

return response;
}
Expand All @@ -641,7 +641,7 @@ namespace catalog {
LOG(WARNING) << "No more object identifier is available for Postgres database " << request.namespaceId;
response.status.code = StatusCode::INVALID_ARGUMENT;
response.status.errorMessage = "No more object identifier is available for " + request.namespaceId;
EndTransactionContext(ns_context, false);
ns_context->Abort();
return response;
}

Expand Down Expand Up @@ -674,7 +674,7 @@ namespace catalog {
} else {
response.status = std::move(result.status);
}
EndTransactionContext(ns_context, true);
ns_context->Commit();

return response;
}
Expand Down Expand Up @@ -816,29 +816,18 @@ namespace catalog {
std::shared_ptr<SessionTransactionContext> SqlCatalogManager::NewTransactionContext() {
std::future<K23SITxn> txn_future = k2_adapter_->beginTransaction();
std::shared_ptr<K23SITxn> txn = std::make_shared<K23SITxn>(txn_future.get());
std::shared_ptr<SessionTransactionContext> context = std::make_shared<SessionTransactionContext>();
context->SetTxn(txn);
std::shared_ptr<SessionTransactionContext> context = std::make_shared<SessionTransactionContext>(txn);
return context;
}

void SqlCatalogManager::EndTransactionContext(std::shared_ptr<SessionTransactionContext> context, bool should_commit) {
std::future<k2::EndResult> txn_result_future = context->GetTxn()->endTxn(should_commit);
k2::EndResult txn_result = txn_result_future.get();
if (!txn_result.status.is2xxOK()) {
LOG(FATAL) << "Failed to commit transaction due to error code " << txn_result.status.code
<< " and message: " << txn_result.status.message;
throw std::runtime_error("Failed to end transaction, should_commit: " + should_commit);
}
}

void SqlCatalogManager::IncreaseCatalogVersion() {
catalog_version_++;
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
// need to update the catalog version on SKV
// the update frequency could be reduced once we have a single or a quorum of catalog managers
ClusterInfo cluster_info(cluster_id_, init_db_done_, catalog_version_);
std::shared_ptr<SessionTransactionContext> context = NewTransactionContext();
cluster_info_handler_->UpdateClusterInfo(context, cluster_info);
EndTransactionContext(context, true);
context->Commit();
}

IndexInfo SqlCatalogManager::BuildIndexInfo(std::shared_ptr<TableInfo> base_table_info, std::string index_id, std::string index_name, uint32_t pg_oid,
Expand Down
2 changes: 0 additions & 2 deletions src/k2/connector/yb/pggate/catalog/sql_catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ namespace catalog {

std::shared_ptr<SessionTransactionContext> NewTransactionContext();

void EndTransactionContext(std::shared_ptr<SessionTransactionContext> context, bool should_commit);

void IncreaseCatalogVersion();

IndexInfo BuildIndexInfo(std::shared_ptr<TableInfo> base_table_info, std::string index_id, std::string index_name, uint32_t pg_oid,
Expand Down