Skip to content
This repository has been archived by the owner on Apr 1, 2023. It is now read-only.

Commit

Permalink
Removing RStatus, consolidate K2Adapter APIs, some code cleanup (#221)
Browse files Browse the repository at this point in the history
* removing RStatus, consolidate K2Adapter APIs, some code cleanup.
* Replace SessionTransactionCContext with PgTxnHandler, and hiding K23SITxn
* CombineStartNewTransactionIfNotYet with GetTxnHandle
* Keep only asyncAPI on K2Adapter
fixes #58 

Co-authored-by: Jerry Feng <[email protected]>
  • Loading branch information
jerryhfeng and Jerry Feng authored Mar 30, 2021
1 parent 6432e3c commit eeba0e4
Show file tree
Hide file tree
Showing 32 changed files with 1,122 additions and 1,426 deletions.
2 changes: 2 additions & 0 deletions src/k2/connector/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

#include "common/strings/substitute.h"

// TODO: make Status K2Log compatible and indeed extended PGErrorCode at the same time

// Return the given status if it is not OK.
#define YB_RETURN_NOT_OK(s) do { \
auto&& _s = (s); \
Expand Down
86 changes: 0 additions & 86 deletions src/k2/connector/pggate/catalog/base_handler.cc

This file was deleted.

64 changes: 0 additions & 64 deletions src/k2/connector/pggate/catalog/base_handler.h

This file was deleted.

53 changes: 28 additions & 25 deletions src/k2/connector/pggate/catalog/cluster_info_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,64 +30,67 @@ namespace sql {
namespace catalog {

ClusterInfoHandler::ClusterInfoHandler(std::shared_ptr<K2Adapter> k2_adapter)
: BaseHandler(k2_adapter),
collection_name_(CatalogConsts::skv_collection_name_sql_primary),
: collection_name_(CatalogConsts::skv_collection_name_sql_primary),
schema_name_(CatalogConsts::skv_schema_name_cluster_info) {
schema_ptr_ = std::make_shared<k2::dto::Schema>(schema_);
k2_adapter_ = k2_adapter;
}

ClusterInfoHandler::~ClusterInfoHandler() {
}

// Called only once in sql_catalog_manager::InitPrimaryCluster()
InitClusterInfoResult ClusterInfoHandler::InitClusterInfo(std::shared_ptr<SessionTransactionContext> context, ClusterInfo& cluster_info) {
InitClusterInfoResult ClusterInfoHandler::InitClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, ClusterInfo& cluster_info) {
InitClusterInfoResult response;
RStatus schema_result = CreateSKVSchema(collection_name_, schema_ptr_);
if (!schema_result.IsSucceeded()) {
response.status = std::move(schema_result);
auto result = k2_adapter_->CreateSchema(collection_name_, schema_ptr_).get();
if (!result.status.is2xxOK()) {
K2LOG_E(log::catalog, "Failed to create schema for {} in {}, due to {}", schema_ptr_->name, collection_name_, result.status);
response.status = K2Adapter::K2StatusToYBStatus(result.status);
return response;
}
UpdateClusterInfoResult result = UpdateClusterInfo(context, cluster_info);
response.status = std::move(result.status);

UpdateClusterInfoResult updateResult = UpdateClusterInfo(txnHandler, cluster_info);
response.status = std::move(updateResult.status);
return response;
}

UpdateClusterInfoResult ClusterInfoHandler::UpdateClusterInfo(std::shared_ptr<SessionTransactionContext> context, ClusterInfo& cluster_info) {
UpdateClusterInfoResult ClusterInfoHandler::UpdateClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, ClusterInfo& cluster_info) {
UpdateClusterInfoResult response;
k2::dto::SKVRecord record(collection_name_, schema_ptr_);
record.serializeNext<k2::String>(cluster_info.GetClusterId());
// use signed integers for unsigned integers since SKV does not support them
record.serializeNext<int64_t>(cluster_info.GetCatalogVersion());
record.serializeNext<bool>(cluster_info.IsInitdbDone());
response.status = PersistSKVRecord(context, record);
auto upsertRes = k2_adapter_->UpsertRecord(txnHandler->GetTxn(), record).get();
if (!upsertRes.status.is2xxOK())
{
K2LOG_E(log::catalog, "Failed to upsert cluster info record due to {}", upsertRes.status);
response.status = K2Adapter::K2StatusToYBStatus(upsertRes.status);
return response;
}

response.status = Status(); // OK
return response;
}

GetClusterInfoResult ClusterInfoHandler::ReadClusterInfo(std::shared_ptr<SessionTransactionContext> context, const std::string& cluster_id) {
GetClusterInfoResult ClusterInfoHandler::GetClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, const std::string& cluster_id) {
GetClusterInfoResult response;
k2::dto::SKVRecord record(collection_name_, schema_ptr_);
record.serializeNext<k2::String>(cluster_id);
auto read_result = context->GetTxn()->read(std::move(record)).get();
if (read_result.status == k2::dto::K23SIStatus::KeyNotFound) {
K2LOG_D(log::catalog, "Cluster info record does not exist");
response.clusterInfo = nullptr;
response.status.Succeed();
return response;
}

k2::dto::SKVRecord recordKey(collection_name_, schema_ptr_);
recordKey.serializeNext<k2::String>(cluster_id);
auto read_result = k2_adapter_->ReadRecord(txnHandler->GetTxn(), recordKey).get();
if (!read_result.status.is2xxOK()) {
K2LOG_E(log::catalog, "Failed to read SKV record due to {}", read_result.status);
response.status.code = StatusCode::INTERNAL_ERROR;
response.status.errorMessage = read_result.status.message;
K2LOG_E(log::catalog, "Failed to read SKV record due to {}", read_result.status);
response.status = K2Adapter::K2StatusToYBStatus(read_result.status);
return response;
}

std::shared_ptr<ClusterInfo> cluster_info = std::make_shared<ClusterInfo>();
cluster_info->SetClusterId(read_result.value.deserializeNext<k2::String>().value());
// use signed integers for unsigned integers since SKV does not support them
cluster_info->SetCatalogVersion(read_result.value.deserializeNext<int64_t>().value());
cluster_info->SetInitdbDone(read_result.value.deserializeNext<bool>().value());
response.clusterInfo = cluster_info;
response.status.Succeed();
response.status = Status(); // OK
return response;
}

Expand Down
24 changes: 15 additions & 9 deletions src/k2/connector/pggate/catalog/cluster_info_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,33 @@ Copyright(c) 2020 Futurewei Cloud
#define CHOGORI_SQL_CLUSTER_INFO_HANDLER_H

#include <string>
#include "catalog_log.h"

#include "pggate/catalog/base_handler.h"
#include "pggate/catalog/sql_catalog_defaults.h"
#include "pggate/catalog/sql_catalog_entity.h"
#include "pggate/k2_adapter.h"
#include "catalog_log.h"

namespace k2pg {
namespace sql {
namespace catalog {

using k2pg::gate::K2Adapter;
using yb::Status;

struct InitClusterInfoResult {
RStatus status;
Status status;
};

struct UpdateClusterInfoResult {
RStatus status;
Status status;
};

struct GetClusterInfoResult {
RStatus status;
Status status;
std::shared_ptr<ClusterInfo> clusterInfo;
};

class ClusterInfoHandler : public BaseHandler {
class ClusterInfoHandler {
public:
typedef std::shared_ptr<ClusterInfoHandler> SharedPtr;

Expand All @@ -64,16 +69,17 @@ class ClusterInfoHandler : public BaseHandler {
ClusterInfoHandler(std::shared_ptr<K2Adapter> k2_adapter);
~ClusterInfoHandler();

InitClusterInfoResult InitClusterInfo(std::shared_ptr<SessionTransactionContext> context, ClusterInfo& cluster_info);
InitClusterInfoResult InitClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, ClusterInfo& cluster_info);

UpdateClusterInfoResult UpdateClusterInfo(std::shared_ptr<SessionTransactionContext> context, ClusterInfo& cluster_info);
UpdateClusterInfoResult UpdateClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, ClusterInfo& cluster_info);

GetClusterInfoResult ReadClusterInfo(std::shared_ptr<SessionTransactionContext> context, const std::string& cluster_id);
GetClusterInfoResult GetClusterInfo(std::shared_ptr<PgTxnHandler> txnHandler, const std::string& cluster_id);

private:
std::string collection_name_;
std::string schema_name_;
std::shared_ptr<k2::dto::Schema> schema_ptr_;
std::shared_ptr<K2Adapter> k2_adapter_;
};

} // namespace sql
Expand Down
Loading

0 comments on commit eeba0e4

Please sign in to comment.