Skip to content

Commit

Permalink
Implemented the V4 tree model and table model write processes
Browse files Browse the repository at this point in the history
* v4 writer

* tmp

* tmp

* write table

* fix device_id.h

* write table, split deviceid

* fix get_device_id

* fix tablet get_value

* fix device id

* complete the develop of the registration and writing process

* fix tablet

* fix v4 write

* fix device_id.h

* fix flush()

* Implement the Table Model Write Process

* Implement the Table Model Write Process

* Implement the Table Model Write Process

* fix mem leak

* try to fix mem leak

* try to fix mem leak

* try to fix mem leak

* try to fix mem leak

* try to fix mem leak

* try to fix men leak

* fix men leak

* Implement the v4 writing process

* tmp

* fix some issues

* fix some issues

* add String type

* fix mem leak

* fix format

* fix mingw compilation

* tmp

* try to fix the index area of generated file

* tmp

* tmp

* try to fix the index area of generated file

* tmp

* Implement read process of index area

* bug fix

* tmp

* tmp

* tmp

* tmp

* tmp

* tmp

* tmp

* fix mem leak

* fix mem leak

* fix compilation error

* fix compilation error

* fix compilation error

* fix compilation error

* bugfix

* bugfix

* fix LZ4 compressor

* tmp

* Implemented the V4 tree model and table model write processes; Support string data types.

* Implemented the V4 tree model and table model write processes; Support string data types.

* Implemented the V4 tree model and table model write processes; Support string data types.

* Implemented the V4 tree model and table model write processes; Support string data types.

* Fix some issues

* Fix some issues

* Fix some issues

* Fix some issues

* try to fix compilation error

* try to fix compilation error

* fix some issues

* fix some issues

---------

Co-authored-by: zwhzzz0821 <[email protected]>
  • Loading branch information
761417898 and zwhzzz0821 authored Jan 22, 2025
1 parent 02727c5 commit 00002d6
Show file tree
Hide file tree
Showing 70 changed files with 4,315 additions and 2,312 deletions.
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ project(TsFile_CPP)
cmake_policy(SET CMP0079 NEW)
set(TsFile_CPP_VERSION 2.0.0.dev)
set(CMAKE_CXX_FLAGS "$ENV{CXXFLAGS} -Wall -Werror")
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
endif()
message("cmake using: USE_CPP11=${USE_CPP11}")


set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")

if(DEFINED ENV{CXX})
Expand Down
14 changes: 13 additions & 1 deletion cpp/src/common/allocator/my_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ struct String {
}
}

FORCE_INLINE void max(const String &that, common::PageArena &pa) {
if (compare(that) < 0) {
this->dup_from(that, pa);
}
}

FORCE_INLINE void min(const String &that, common::PageArena &pa) {
if (compare(that) > 0) {
this->dup_from(that, pa);
}
}

bool operator<(const String &other) const {
if (this->is_null() && other.is_null()) {
return false;
Expand All @@ -147,7 +159,7 @@ struct String {

return this->len_ < other.len_;
}
std::string to_std_string() { return std::string(buf_, len_); }
std::string to_std_string() const { return std::string(buf_, len_); }

#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os, const String &s) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/common/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#ifndef COMMON_CONFIG_CONFIG_H
#define COMMON_CONFIG_CONFIG_H

#include <stdint.h>
#include <cstdint>

#include "common/mutex/mutex.h"
#include "utils/db_utils.h"
Expand All @@ -46,10 +46,12 @@ typedef struct ConfigValue {
CompressionType time_compress_type_;
int32_t chunk_group_size_threshold_;
int32_t record_count_for_next_mem_check_;
bool encrypt_flag_ = false;
} ConfigValue;

extern void init_config_value();

extern TSEncoding get_value_encoder(TSDataType data_type);
extern CompressionType get_default_compressor();
// In the future, configuration items need to be dynamically adjusted according
// to the level
extern void set_config_value();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/common/constant/tsfile_constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ namespace storage

static const std::regex IDENTIFIER_PATTERN("([a-zA-Z0-9_\\u2E80-\\u9FFF]+)");
static const std::regex NODE_NAME_PATTERN("(\\*{0,2}[a-zA-Z0-9_\\u2E80-\\u9FFF]+\\*{0,2})");
static const int DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME = 3;
} // namespace storage
16 changes: 11 additions & 5 deletions cpp/src/common/db_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,24 @@
#include <iostream>

#include "utils/util_define.h"
#include "common/allocator/my_string.h"

namespace common {

enum TSDataType {
enum TSDataType : uint8_t {
BOOLEAN = 0,
INT32 = 1,
INT64 = 2,
FLOAT = 3,
DOUBLE = 4,
TEXT = 5,
VECTOR = 6,
STRING = 11,
NULL_TYPE = 254,
INVALID_DATATYPE = 255
};

enum TSEncoding {
enum TSEncoding : uint8_t {
PLAIN = 0,
DICTIONARY = 1,
RLE = 2,
Expand All @@ -53,7 +55,7 @@ enum TSEncoding {
INVALID_ENCODING = 255
};

enum CompressionType {
enum CompressionType : uint8_t {
UNCOMPRESSED = 0,
SNAPPY = 1,
GZIP = 2,
Expand All @@ -65,12 +67,12 @@ enum CompressionType {
INVALID_COMPRESSION = 255
};

extern const char* s_data_type_names[7];
extern const char* s_data_type_names[8];
extern const char* s_encoding_names[12];
extern const char* s_compression_names[8];

FORCE_INLINE const char* get_data_type_name(TSDataType type) {
ASSERT(type >= BOOLEAN && type <= VECTOR);
ASSERT(type >= BOOLEAN && type <= STRING);
return s_data_type_names[type];
}

Expand Down Expand Up @@ -148,6 +150,10 @@ template <>
FORCE_INLINE common::TSDataType GetDataTypeFromTemplateType<double>() {
return common::DOUBLE;
}
template <>
FORCE_INLINE common::TSDataType GetDataTypeFromTemplateType<common::String>() {
return common::STRING;
}

FORCE_INLINE size_t get_data_type_size(TSDataType data_type) {
switch (data_type) {
Expand Down
207 changes: 207 additions & 0 deletions cpp/src/common/device_id.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef COMMON_DEVICE_ID_H
#define COMMON_DEVICE_ID_H

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <numeric>
#include <string>
#include <vector>

#include "common/allocator/byte_stream.h"
#include "constant/tsfile_constant.h"
#include "parser/path_nodes_generator.h"
#include "utils/errno_define.h"

namespace storage {
class IDeviceID {
public:
virtual ~IDeviceID() = default;
virtual int serialize(common::ByteStream& write_stream) { return 0; }
virtual int deserialize(common::ByteStream& read_stream) { return 0; }
virtual std::string get_table_name() { return ""; }
virtual int segment_num() { return 0; }
virtual const std::vector<std::string>& get_segments() const {
return empty_segments_;
}
virtual std::string get_device_name() const { return ""; };
virtual bool operator<(const IDeviceID& other) { return 0; }
virtual bool operator==(const IDeviceID& other) { return false; }
virtual bool operator!=(const IDeviceID& other) { return false; }

protected:
IDeviceID() : empty_segments_() {}

private:
const std::vector<std::string> empty_segments_;
};

struct IDeviceIDComparator {
bool operator()(const std::shared_ptr<IDeviceID>& lhs,
const std::shared_ptr<IDeviceID>& rhs) const {
return *lhs < *rhs;
}
};

class StringArrayDeviceID : public IDeviceID {
public:
explicit StringArrayDeviceID(const std::vector<std::string>& segments)
: segments_(formalize(segments)) {}

explicit StringArrayDeviceID(const std::string& device_id_string)
: segments_(split_device_id_string(device_id_string)) {}

~StringArrayDeviceID() override = default;

std::string get_device_name() const override {
return std::accumulate(std::next(segments_.begin()), segments_.end(),
segments_.front(),
[](std::string a, const std::string& b) {
return std::move(a) + "." + b;
});
};

int serialize(common::ByteStream& write_stream) override {
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(),
write_stream))) {
return ret;
}
for (const auto& segment : segments_) {
if (RET_FAIL(common::SerializationUtil::write_str(segment,
write_stream))) {
return ret;
}
}
return ret;
}

int deserialize(common::ByteStream& read_stream) override {
int ret = common::E_OK;
uint32_t num_segments;
if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, read_stream))) {
return ret;
}
segments_.clear();
for (uint32_t i = 0; i < num_segments; ++i) {
std::string segment;
if (RET_FAIL(common::SerializationUtil::read_str(segment, read_stream))) {
return ret;
}
segments_.push_back(segment);
}
return ret;
}

std::string get_table_name() override {
return segments_.empty() ? "" : segments_[0];
}

int segment_num() override { return static_cast<int>(segments_.size()); }

const std::vector<std::string>& get_segments() const override {
return segments_;
}

virtual bool operator<(const IDeviceID& other) override {
auto other_segments = other.get_segments();
return std::lexicographical_compare(segments_.begin(), segments_.end(),
other_segments.begin(),
other_segments.end());
}

bool operator==(const IDeviceID& other) override {
auto other_segments = other.get_segments();
return (segments_.size() == other_segments.size()) &&
std::equal(segments_.begin(), segments_.end(),
other_segments.begin());
}

bool operator!=(const IDeviceID& other) override {
return !(*this == other);
}

private:
std::vector<std::string> segments_;

std::vector<std::string> formalize(
const std::vector<std::string>& segments) {
auto it =
std::find_if(segments.rbegin(), segments.rend(),
[](const std::string& seg) { return !seg.empty(); });
return std::vector<std::string>(segments.begin(), it.base());
}

std::vector<std::string> split_device_id_string(
const std::string& device_id_string) {
auto splits =
storage::PathNodesGenerator::invokeParser(device_id_string);
return split_device_id_string(splits);
}

std::vector<std::string> split_device_id_string(
const std::vector<std::string>& splits) {
size_t segment_cnt = splits.size();
std::vector<std::string> final_segments;

if (segment_cnt == 0) {
return final_segments;
}

if (segment_cnt == 1) {
// "root" -> {"root"}
final_segments.push_back(splits[0]);
} else if (segment_cnt < static_cast<size_t>(
storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + 1)) {
// "root.a" -> {"root", "a"}
// "root.a.b" -> {"root.a", "b"}
std::string table_name = std::accumulate(
splits.begin(), splits.end() - 1, std::string(),
[](const std::string& a, const std::string& b) {
return a.empty() ? b : a + storage::PATH_SEPARATOR + b;
});
final_segments.push_back(table_name);
final_segments.push_back(splits.back());
} else {
// "root.a.b.c" -> {"root.a.b", "c"}
// "root.a.b.c.d" -> {"root.a.b", "c", "d"}
std::string table_name = std::accumulate(
splits.begin(),
splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME,
std::string(), [](const std::string& a, const std::string& b) {
return a.empty() ? b : a + storage::PATH_SEPARATOR + b;
});

final_segments.emplace_back(std::move(table_name));
final_segments.insert(
final_segments.end(),
splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME,
splits.end());
}

return final_segments;
}
};
}

#endif
37 changes: 34 additions & 3 deletions cpp/src/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,38 @@ void init_config_value() {
g_config_value_.time_compress_type_ = LZ4;
}

extern TSEncoding get_value_encoder(TSDataType data_type) {
switch (data_type) {
case BOOLEAN:
return TSEncoding::RLE;
case INT32:
return TSEncoding::TS_2DIFF;
case INT64:
return TSEncoding::TS_2DIFF;
case FLOAT:
return TSEncoding::GORILLA;
case DOUBLE:
return TSEncoding::GORILLA;
case TEXT:
return TSEncoding::PLAIN;
case STRING:
return TSEncoding::PLAIN;
case VECTOR:
break;
case NULL_TYPE:
break;
case INVALID_DATATYPE:
break;
default:
break;
}
return TSEncoding::INVALID_ENCODING;
}

extern CompressionType get_default_compressor() {
return LZ4;
}

void config_set_page_max_point_count(uint32_t page_max_ponint_count) {
g_config_value_.page_writer_max_point_num_ = page_max_ponint_count;
}
Expand All @@ -55,9 +87,8 @@ void config_set_max_degree_of_index_node(uint32_t max_degree_of_index_node) {
}

void set_config_value() {}

const char* s_data_type_names[7] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
"DOUBLE", "TEXT", "VECTOR"};
const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
"DOUBLE", "TEXT", "VECTOR", "STRING"};

const char* s_encoding_names[12] = {
"PLAIN", "DICTIONARY", "RLE", "DIFF", "TS_2DIFF", "BITMAP",
Expand Down
Loading

0 comments on commit 00002d6

Please sign in to comment.