Skip to content
This repository has been archived by the owner on Apr 1, 2023. It is now read-only.

add logic to maintain cluster level information, namespace info, and table info in catalog manager #56

Merged
merged 10 commits into from
Dec 9, 2020
6 changes: 3 additions & 3 deletions src/k2/connector/yb/entities/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ namespace sql {
// the dependency list does not need to be cached in a member id list for fast access.
bool IndexInfo::CheckColumnDependency(ColumnId column_id) const {
for (const IndexColumn &index_col : columns_) {
// The protobuf data contains IDs of all columns that this index is referencing.
// The index data contains IDs of all columns that this index is referencing.
// Examples:
// 1. Index by column
// - INDEX ON tab (a_column)
// - The ID of "a_column" is included in protobuf data.
// - The ID of "a_column" is included in index data.
//
// 2. Index by expression of column:
// - INDEX ON tab (j_column->>'field')
// - The ID of "j_column" is included in protobuf data.
// - The ID of "j_column" is included in index data.
if (index_col.indexed_column_id == column_id) {
return true;
}
Expand Down
57 changes: 49 additions & 8 deletions src/k2/connector/yb/entities/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,32 @@ namespace k2pg {
ColumnId column_id; // Column id in the index table.
std::string column_name; // Column name in the index table - colexpr.MangledName().
ColumnId indexed_column_id; // Corresponding column id in indexed table.
PgExpr colexpr; // Index expression.
std::shared_ptr<PgExpr> colexpr = nullptr; // Index expression.

explicit IndexColumn(ColumnId in_column_id, std::string in_column_name,
ColumnId in_indexed_column_id, PgExpr in_colexpr)
ColumnId in_indexed_column_id, std::shared_ptr<PgExpr> in_colexpr)
: column_id(in_column_id), column_name(std::move(in_column_name)),
indexed_column_id(in_indexed_column_id), colexpr(std::move(colexpr)) {
indexed_column_id(in_indexed_column_id), colexpr(in_colexpr) {
}

explicit IndexColumn(ColumnId in_column_id, std::string in_column_name,
ColumnId in_indexed_column_id)
: column_id(in_column_id), column_name(std::move(in_column_name)),
indexed_column_id(in_indexed_column_id) {
}
};

class IndexInfo {
public:
explicit IndexInfo(TableId table_id, TableId indexed_table_id, uint32_t schema_version,
explicit IndexInfo(TableId table_id, std::string table_name, uint32_t pg_oid,
TableId indexed_table_id, uint32_t schema_version,
bool is_unique, std::vector<IndexColumn> columns, size_t hash_column_count,
size_t range_column_count, std::vector<ColumnId> indexed_hash_column_ids,
std::vector<ColumnId> indexed_range_column_ids, IndexPermissions index_permissions,
bool use_mangled_column_name)
: table_id_(table_id),
table_name_(table_name),
pg_oid_(pg_oid),
indexed_table_id_(indexed_table_id),
schema_version_(schema_version),
is_unique_(is_unique),
Expand All @@ -109,22 +118,52 @@ namespace k2pg {
index_permissions_(index_permissions) {
}

explicit IndexInfo(TableId table_id,
std::string table_name,
uint32_t pg_oid,
TableId indexed_table_id,
uint32_t schema_version,
bool is_unique,
std::vector<IndexColumn> columns,
IndexPermissions index_permissions)
: table_id_(std::move(table_id)),
table_name_(std::move(table_name)),
pg_oid_(pg_oid),
indexed_table_id_(indexed_table_id),
schema_version_(schema_version),
is_unique_(is_unique),
columns_(std::move(columns)),
// All the index columns are primary keys and we don't manage range keys in PG gate, as a result,
// we treat all primary keys as the (hash) partition keys.
hash_column_count_(columns_.size()),
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
range_column_count_(0),
index_permissions_(index_permissions) {
}

const TableId& table_id() const {
return table_id_;
}

const TableId& indexed_table_id() const {
return indexed_table_id_;
const std::string& table_name() const {
return table_name_;
}

uint32_t schema_version() const {
return schema_version_;
const uint32_t pg_oid() const {
return pg_oid_;
}

const TableId& indexed_table_id() const {
return indexed_table_id_;
}

bool is_unique() const {
return is_unique_;
}

const uint32_t version() const {
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
return schema_version_;
}

const std::vector<IndexColumn>& columns() const {
return columns_;
}
Expand Down Expand Up @@ -193,6 +232,8 @@ namespace k2pg {

private:
const TableId table_id_; // Index table id.
const std::string table_name_; // Index table name.
const uint32_t pg_oid_;
const TableId indexed_table_id_; // Indexed table id.
const uint32_t schema_version_ = 0; // Index table's schema version.
const bool is_unique_ = false; // Whether this is a unique index.
Expand Down
8 changes: 8 additions & 0 deletions src/k2/connector/yb/entities/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ namespace sql {
return ColumnId(column_id(column_index));
}

std::pair<bool, ColumnId> Schema::FindColumnIdByName(const std::string& column_name) const {
size_t column_index = find_column(column_name);
if (column_index == Schema::kColumnNotFound) {
return std::make_pair<bool, ColumnId>(false, -1);
}
return std::make_pair<bool, ColumnId>(true, ColumnId(column_id(column_index)));
}

ColumnId Schema::first_column_id() {
return kFirstColumnId;
}
Expand Down
13 changes: 9 additions & 4 deletions src/k2/connector/yb/entities/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ namespace sql {
SortingType sorting_type_;
};

class TableProperties{
class TableProperties {
public:
TableProperties() = default;
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
~TableProperties() = default;

bool operator==(const TableProperties& other) const {
return default_time_to_live_ == other.default_time_to_live_;
}
Expand Down Expand Up @@ -457,6 +460,8 @@ namespace sql {

Result<int> ColumnIndexByName(GStringPiece col_name) const;

std::pair<bool, ColumnId> FindColumnIdByName(const std::string& col_name) const;

// Returns true if the schema contains nullable columns
bool has_nullables() const {
return has_nullables_;
Expand Down Expand Up @@ -549,11 +554,11 @@ namespace sql {

static ColumnId first_column_id();

uint64_t version() const {
uint32_t version() const {
return version_;
}

void set_version(uint64_t version) {
void set_version(uint32_t version) {
version_ = version;
}

Expand Down Expand Up @@ -590,7 +595,7 @@ namespace sql {

TableProperties table_properties_;

uint64_t version_;
uint32_t version_;
};

// Helper used for schema creation/editing.
Expand Down
2 changes: 1 addition & 1 deletion src/k2/connector/yb/entities/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace k2pg {
namespace sql {

Result<const IndexInfo*> TableInfo::FindIndex(const TableId& index_id) const {
Result<const IndexInfo*> TableInfo::FindIndex(const std::string& index_id) const {
return index_map_.FindIndex(index_id);
}

Expand Down
68 changes: 57 additions & 11 deletions src/k2/connector/yb/entities/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
namespace k2pg {
namespace sql {
struct TableIdentifier {
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
NamespaceName namespace_name; // Can be empty, that means the namespace has not been set yet.
TableName table_name;
TableIdentifier(NamespaceName ns, TableName tn) : namespace_name(ns), table_name(tn) {
std::string namespace_id;
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
std::string namespace_name; // Can be empty, that means the namespace has not been set yet.
std::string table_id;
std::string table_name;
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
TableIdentifier(std::string ns_id, std::string ns_name, std::string tb_id, std::string tb_name) :
namespace_id(ns_id), namespace_name(ns_name), table_id(tb_id), table_name(tb_name) {
}
};

Expand All @@ -40,22 +43,46 @@ namespace sql {

typedef std::shared_ptr<TableInfo> SharedPtr;

TableInfo(NamespaceName namespace_name, TableName table_name, Schema schema) :
table_id_(namespace_name, table_name), schema_(std::move(schema)) {
TableInfo(std::string namespace_id, std::string namespace_name, std::string table_id, std::string table_name, Schema schema) :
table_id_(namespace_id, namespace_name, table_id, table_name), schema_(std::move(schema)) {
}

const NamespaceName& namespace_name() const {
const std::string& namespace_id() const {
return table_id_.namespace_id;
}

const std::string& namespace_name() const {
return table_id_.namespace_name;
}

const TableName& table_name() const {
const std::string& table_id() const {
return table_id_.table_id;
}

const std::string& table_name() const {
return table_id_.table_name;
}

const TableIdentifier& table_identifier() {
return table_id_;
}

void set_pg_oid(uint32_t pg_oid) {
pg_oid_ = pg_oid;
}

uint32_t pg_oid() {
return pg_oid_;
}

void set_next_column_id(int32_t next_column_id) {
next_column_id_ = next_column_id;
}

int32_t next_column_id() {
return next_column_id_;
}

const Schema& schema() const {
return schema_;
}
Expand Down Expand Up @@ -84,17 +111,36 @@ namespace sql {
return schema_.num_range_key_columns();
}

void add_secondary_index(const TableId& index_id, const IndexInfo& index_info) {
void add_secondary_index(const std::string& index_id, const IndexInfo& index_info) {
index_map_.emplace(index_id, index_info);
}

Result<const IndexInfo*> FindIndex(const TableId& index_id) const;
const IndexMap& secondary_indexes() {
return index_map_;
}

private:

void drop_index(const std::string& index_id) {
index_map_.erase(index_id);
}

Result<const IndexInfo*> FindIndex(const std::string& index_id) const;

void set_is_sys_table(bool is_sys_table) {
is_sys_table_ = is_sys_table;
}

bool is_sys_table() {
return is_sys_table_;
}

private:
TableIdentifier table_id_;
// PG internal object id
uint32_t pg_oid_;
Schema schema_;
IndexMap index_map_;
int32_t next_column_id_ = 0;
bool is_sys_table_ = false;
};

} // namespace sql
Expand Down
4 changes: 2 additions & 2 deletions src/k2/connector/yb/pggate/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
file(GLOB HEADERS "*.h")
file(GLOB SOURCES "*.cc")
file(GLOB HEADERS "*.h" "*/*.h")
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
johnfangAFW marked this conversation as resolved.
Show resolved Hide resolved
file(GLOB SOURCES "*.cc" "*/*.cc")

add_library(ybpggate STATIC ${HEADERS} ${SOURCES})

Expand Down
90 changes: 90 additions & 0 deletions src/k2/connector/yb/pggate/catalog/base_handler.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
MIT License

Copyright(c) 2020 Futurewei Cloud

Permission is hereby granted,
free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions :

The above copyright notice and this permission notice shall be included in all copies
or
substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS",
WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER
LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

#include "yb/pggate/catalog/base_handler.h"

#include <glog/logging.h>

namespace k2pg {
namespace sql {
namespace catalog {
BaseHandler::BaseHandler(std::shared_ptr<K2Adapter> k2_adapter) : k2_adapter_(k2_adapter) {
}

BaseHandler::~BaseHandler() {
}

RStatus BaseHandler::CreateSKVSchema(std::string collection_name, std::shared_ptr<k2::dto::Schema> schema) {
RStatus response;
auto result = k2_adapter_->CreateSchema(collection_name, schema).get();
if (!result.status.is2xxOK()) {
LOG(FATAL) << "Failed to create SKV schema for " << schema->name << "in" << collection_name
<< " due to error code " << result.status.code
<< " and message: " << result.status.message;
response.code = StatusCode::INTERNAL_ERROR;
response.errorMessage = std::move(result.status.message);
} else {
response.Succeed();
}
return response;
}

RStatus BaseHandler::PersistSKVRecord(std::shared_ptr<SessionTransactionContext> context, k2::dto::SKVRecord& record) {
return SaveOrUpdateSKVRecord(context, record, false);
}

RStatus BaseHandler::DeleteSKVRecord(std::shared_ptr<SessionTransactionContext> context, k2::dto::SKVRecord& record) {
return SaveOrUpdateSKVRecord(context, record, true);
}

RStatus BaseHandler::BatchDeleteSKVRecords(std::shared_ptr<SessionTransactionContext> context, std::vector<k2::dto::SKVRecord>& records) {
RStatus response;
for (auto& record : records) {
RStatus result = DeleteSKVRecord(context, record);
if (!result.IsSucceeded()) {
return result;
}
}
response.Succeed();
return response;
}

RStatus BaseHandler::SaveOrUpdateSKVRecord(std::shared_ptr<SessionTransactionContext> context, k2::dto::SKVRecord& record, bool isDelete) {
RStatus response;
auto result = context->GetTxn()->write(std::move(record), isDelete).get();
if (!result.status.is2xxOK()) {
LOG(FATAL) << "Failed to " << (isDelete ? "Delete" : "Save")
<<" SKV record "
<< " due to error code " << result.status.code
<< " and message: " << result.status.message;
response.code = StatusCode::INTERNAL_ERROR;
response.errorMessage = std::move(result.status.message);
} else {
response.Succeed();
}
return response;
}

} // namespace sql
} // namespace sql
} // namespace k2pg
Loading