-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
v4-write-process #345
v4-write-process #345
Changes from 27 commits
e9fb2d2
2fe33ee
a6d212b
0c0e51e
94fa48e
b21d479
a92ae2c
1ffa310
67b4db9
dc8ba7d
87ca0cf
50fe3f4
f6764ee
349c129
2924a72
402bc9c
dc9b350
7ac7233
142217a
59095f3
358a71b
03d4b10
810bf12
64d04e4
77c4149
7038d0d
a9438c4
90485b1
386c675
5ebc731
df95653
acd110d
b3b3159
a80d498
052a24a
d33d13a
ded900e
797b562
0d2be78
636794e
3b9a3c5
a8e81c5
b23f819
0304010
e44e992
03e7078
fac57f4
fc93f42
bd8d016
0d54705
0296de3
eed8618
89771d4
2390bba
c7b5bbe
8098038
9882039
9986dbf
c602470
5bd2c0d
bd3e16e
36efa31
717cb2d
5faebb7
4284302
01cee15
f899bdd
79dc252
79e14e3
cefc80c
b01ffe6
54da3ab
9e6cd21
2ab81f0
2f80ca9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* License); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
#ifndef COMMON_DEVICE_ID_H | ||
#define COMMON_DEVICE_ID_H | ||
|
||
#include <algorithm> | ||
#include <cstdint> | ||
#include <cstring> | ||
#include <functional> | ||
#include <iostream> | ||
#include <memory> | ||
#include <numeric> | ||
#include <sstream> | ||
#include <stdexcept> | ||
#include <string> | ||
#include <vector> | ||
|
||
#include "common/allocator/byte_stream.h" | ||
#include "utils/errno_define.h" | ||
|
||
class IDeviceID { | ||
public: | ||
virtual ~IDeviceID() = default; | ||
virtual int serialize(common::ByteStream& write_stream) { return 0; } | ||
virtual std::string get_table_name() { return ""; } | ||
virtual int segment_num() { return 0; } | ||
virtual std::vector<std::string> get_segments() const { return {}; } | ||
virtual std::string get_device_name() const { return ""; }; | ||
virtual bool operator<(const IDeviceID& other) { return 0; } | ||
virtual bool operator==(const IDeviceID& other) { return false; } | ||
virtual bool operator!=(const IDeviceID& other) { return false; } | ||
}; | ||
|
||
struct IDeviceIDComparator { | ||
bool operator()(const std::shared_ptr<IDeviceID>& lhs, | ||
const std::shared_ptr<IDeviceID>& rhs) const { | ||
return *lhs < *rhs; | ||
} | ||
}; | ||
|
||
class StringArrayDeviceID : public IDeviceID { | ||
public: | ||
explicit StringArrayDeviceID(const std::vector<std::string>& segments) | ||
: segments_(formalize(segments)) {} | ||
|
||
explicit StringArrayDeviceID(const std::string& device_id_string) | ||
: segments_(split_device_id_string(device_id_string)) {} | ||
|
||
~StringArrayDeviceID() override = default; | ||
|
||
std::string get_device_name() const override { | ||
return std::accumulate(std::next(segments_.begin()), segments_.end(), | ||
segments_.front(), | ||
[](std::string a, const std::string& b) { | ||
return std::move(a) + "." + b; | ||
}); | ||
}; | ||
|
||
int serialize(common::ByteStream& write_stream) override { | ||
int ret = common::E_OK; | ||
if (RET_FAIL(common::SerializationUtil::write_var_int(segment_num(), | ||
write_stream))) { | ||
return ret; | ||
} | ||
for (const auto& segment : segments_) { | ||
if (RET_FAIL(common::SerializationUtil::write_var_int( | ||
segment.size(), write_stream))) { | ||
return ret; | ||
} else if (RET_FAIL(write_stream.write_buf(segment.c_str(), | ||
segment.size()))) { | ||
return ret; | ||
} | ||
} | ||
return ret; | ||
} | ||
|
||
std::string get_table_name() override { | ||
return segments_.empty() ? "" : segments_[0]; | ||
} | ||
|
||
int segment_num() override { return static_cast<int>(segments_.size()); } | ||
|
||
std::vector<std::string> get_segments() const override { return segments_; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If read only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
virtual bool operator<(const IDeviceID& other) override { | ||
auto other_segments = other.get_segments(); | ||
return std::lexicographical_compare(segments_.begin(), segments_.end(), | ||
other_segments.begin(), | ||
other_segments.end()); | ||
} | ||
|
||
bool operator==(const IDeviceID& other) override { | ||
auto other_segments = other.get_segments(); | ||
return (segments_.size() == other_segments.size()) && | ||
std::equal(segments_.begin(), segments_.end(), | ||
other_segments.begin()); | ||
} | ||
|
||
bool operator!=(const IDeviceID& other) override { | ||
return !(*this == other); | ||
} | ||
|
||
private: | ||
std::vector<std::string> segments_; | ||
|
||
static std::vector<std::string> formalize( | ||
const std::vector<std::string>& segments) { | ||
auto it = | ||
std::find_if(segments.rbegin(), segments.rend(), | ||
[](const std::string& seg) { return !seg.empty(); }); | ||
return std::vector<std::string>(segments.begin(), it.base()); | ||
} | ||
|
||
static std::vector<std::string> split_device_id_string( | ||
std::basic_string<char> device_id_string) { | ||
std::vector<std::string> splits; | ||
std::istringstream stream(device_id_string); | ||
std::string segment; | ||
while (std::getline(stream, segment, '.')) { | ||
splits.push_back(segment); | ||
} | ||
return splits; | ||
} | ||
jt2594838 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
|
||
class PlainDeviceID : public IDeviceID { | ||
jt2594838 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public: | ||
explicit PlainDeviceID(const std::string& deviceID) | ||
: device_id_(deviceID), tableName_(), segments_() {} | ||
|
||
~PlainDeviceID() override = default; | ||
|
||
bool operator==(const IDeviceID& other) override { | ||
return device_id_ == other.get_device_name(); | ||
} | ||
|
||
bool operator!=(const IDeviceID& other) override { | ||
return device_id_ != other.get_device_name(); | ||
} | ||
|
||
int serialize(common::ByteStream& write_stream) override { | ||
int ret = common::E_OK; | ||
if (RET_FAIL(common::SerializationUtil::write_var_int(device_id_.size(), | ||
write_stream))) { | ||
return ret; | ||
} else if (RET_FAIL(write_stream.write_buf(device_id_.c_str(), | ||
device_id_.size()))) { | ||
return ret; | ||
} | ||
return ret; | ||
} | ||
|
||
std::string get_device_name() const override { return device_id_; }; | ||
|
||
std::string get_table_name() override { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can verify that you need to return a copy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
if (!tableName_.empty()) { | ||
return tableName_; | ||
} | ||
|
||
size_t lastSeparatorPos = device_id_.find_last_of('.'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
if (lastSeparatorPos == std::string::npos) { | ||
tableName_ = device_id_; // Use entire deviceID as tableName | ||
} else { | ||
tableName_ = device_id_.substr(0, lastSeparatorPos); | ||
} | ||
return tableName_; | ||
} | ||
|
||
int segment_num() override { | ||
if (!segments_.empty()) { | ||
return static_cast<int>(segments_.size()); | ||
} | ||
split_segments(); | ||
return static_cast<int>(segments_.size()); | ||
} | ||
|
||
bool operator<(const IDeviceID& other) override { | ||
return device_id_ < other.get_device_name(); | ||
} | ||
|
||
private: | ||
std::string device_id_; | ||
std::string tableName_; | ||
std::vector<std::string> segments_; | ||
|
||
void split_segments() { | ||
std::istringstream stream(device_id_); | ||
std::string segment; | ||
while (std::getline(stream, segment, '.')) { | ||
segments_.push_back(segment); | ||
} | ||
} | ||
}; | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,32 @@ void init_config_value() { | |
g_config_value_.time_compress_type_ = LZ4; | ||
} | ||
|
||
extern TSEncoding get_value_encoder(TSDataType data_type) { | ||
switch (data_type) { | ||
case BOOLEAN: | ||
return TSEncoding::RLE; | ||
case INT32: | ||
return TSEncoding::TS_2DIFF; | ||
case INT64: | ||
return TSEncoding::TS_2DIFF; | ||
case FLOAT: | ||
return TSEncoding::GORILLA; | ||
case DOUBLE: | ||
return TSEncoding::GORILLA; | ||
case TEXT: | ||
return TSEncoding::PLAIN; | ||
case VECTOR: | ||
break; | ||
case NULL_TYPE: | ||
break; | ||
case INVALID_DATATYPE: | ||
break; | ||
default: | ||
break; | ||
} | ||
return TSEncoding::PLAIN; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that silently converting unsupported encoding formats to PLAIN may cause issues. Some form of feedback or notification is needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
} | ||
|
||
void config_set_page_max_point_count(uint32_t page_max_ponint_count) { | ||
g_config_value_.page_writer_max_point_num_ = page_max_ponint_count; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already one such method. You can view the develop branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed