Skip to content

Commit

Permalink
reserve before loop
Browse files Browse the repository at this point in the history
  • Loading branch information
lzyy2024 committed Jan 25, 2025
1 parent c5d6c70 commit b8b3bad
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 31 deletions.
56 changes: 27 additions & 29 deletions be/src/vec/functions/function_compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class FunctionCompress : public IFunction {
// This function is unary: it always reports exactly one argument.
size_t get_number_of_arguments() const override {
    return 1;
}

// Reports the result data type of this function; `arguments` is not consulted.
// NOTE(review): two `return` statements appear back to back below. This looks
// like a rendered diff in which the removed line (nullable string) and the
// added line (plain string) are shown together. As written, only the first
// `return` executes and the second is unreachable. Confirm which is intended.
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeString>());
return std::make_shared<DataTypeString>();
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
Expand All @@ -85,24 +85,20 @@ class FunctionCompress : public IFunction {
auto& col_offset = result_column->get_offsets();
col_offset.resize(input_rows_count);

auto null_column = ColumnUInt8::create(input_rows_count);
auto& null_map = null_column->get_data();

faststring compressed_str;
Slice data;

// Heuristic: for large inputs the output is assumed to be roughly 1/1000 of the total input size, so reserve that much up front
size_t total = arg_offset[input_rows_count - 1];
col_data.reserve(total / 1000);

for (size_t row = 0; row < input_rows_count; row++) {
null_map[row] = false;
size_t length = arg_offset[row] - arg_offset[row - 1];
data = Slice(arg_begin, length);

// Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK
auto st = compression_codec->compress(data, &compressed_str);

if (!st.ok()) { // Failed to compress. The data should be a valid string or value.
col_offset[row] = col_offset[row - 1];
null_map[row] = true;
continue;
}

size_t idx = col_data.size();
if (!length) { // data is ''
col_data.resize(col_data.size() + 2);
Expand All @@ -112,7 +108,9 @@ class FunctionCompress : public IFunction {
}

// The first ten characters ("0x" followed by eight hex digits) encode the length of the uncompressed string, least-significant byte first
col_data.resize(col_data.size() + 10);
if (col_data.size() + 10 > col_data.capacity()) {
col_data.resize(col_data.size() + 10);
}
col_data[idx] = '0', col_data[idx + 1] = 'x';
for (size_t i = 0; i < 4; i++) {
unsigned char byte = (length >> (i * 8)) & 0xFF;
Expand All @@ -121,7 +119,9 @@ class FunctionCompress : public IFunction {
}
idx += 10;

col_data.resize(col_data.size() + 2 * compressed_str.size());
if (col_data.size() + 2 * compressed_str.size() > col_data.capacity()) {
col_data.resize(col_data.size() + 2 * compressed_str.size());
}

unsigned char* src = compressed_str.data();
for (size_t i = 0; i < compressed_str.size(); i++) {
Expand All @@ -133,19 +133,12 @@ class FunctionCompress : public IFunction {
col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2;
}

block.replace_by_position(
result, ColumnNullable::create(std::move(result_column), std::move(null_column)));
block.replace_by_position(result, std::move(result_column));
return Status::OK();
}
};

class FunctionUncompress : public IFunction {
string hexadecimal = "0123456789ABCDEF";
std::map<char, int> hex_ctoi = {
{'0', 0}, {'1', 1}, {'2', 2}, {'3', 3}, {'4', 4}, {'5', 5}, {'6', 6}, {'7', 7},
{'8', 8}, {'9', 9}, {'A', 10}, {'B', 11}, {'C', 12}, {'D', 13}, {'E', 14}, {'F', 15},
{'a', 10}, {'b', 11}, {'c', 12}, {'d', 13}, {'e', 14}, {'f', 15}};

public:
static constexpr auto name = "uncompress";
static FunctionPtr create() { return std::make_shared<FunctionUncompress>(); }
Expand Down Expand Up @@ -183,11 +176,11 @@ class FunctionUncompress : public IFunction {
std::string uncompressed;
Slice data;
Slice uncompressed_slice;
for (size_t row = 0; row < input_rows_count; row++) {
std::function<bool(char)> check = [](char x) {
return ((x >= '0' && x <= '9') || (x >= 'a' && x <= 'f') || (x >= 'A' && x <= 'F'));
};

size_t total = arg_offset[input_rows_count - 1];
col_data.reserve(total * 1000);

for (size_t row = 0; row < input_rows_count; row++) {
null_map[row] = false;
data = Slice(arg_begin, arg_offset[row] - arg_offset[row - 1]);
size_t data_length = arg_offset[row] - arg_offset[row - 1];
Expand All @@ -201,7 +194,7 @@ class FunctionUncompress : public IFunction {
illegal = true;
}
for (size_t i = 2; i <= 9; i += 2) {
if (!check(data[i])) {
if (!std::isxdigit(data[i])) {
illegal = true;
}
}
Expand All @@ -215,7 +208,8 @@ class FunctionUncompress : public IFunction {

unsigned int length = 0;
for (size_t i = 2; i <= 9; i += 2) {
unsigned char byte = (hex_ctoi.at(data[i]) << 4) + hex_ctoi.at(data[i + 1]);
unsigned char byte;
std::from_chars(data.data + i, data.data + i + 2, byte, 16);
length += (byte << (8 * (i / 2 - 1))); //Little Endian : 0x01000000 -> 1
}

Expand All @@ -225,7 +219,9 @@ class FunctionUncompress : public IFunction {
//Converts a hexadecimal readable string to a compressed byte stream
std::string s(((int)data.size - 10) / 2, ' '); // byte stream data.size >= 10
for (size_t i = 10, j = 0; i < data.size; i += 2, j++) {
s[j] = (hex_ctoi.at(data[i]) << 4) + hex_ctoi.at(data[i + 1]);
unsigned char result;
std::from_chars(data.data + i, data.data + i + 2, result, 16);
s[j] = static_cast<char>(result);
}
Slice compressed_data(s);
auto st = compression_codec->decompress(compressed_data, &uncompressed_slice);
Expand All @@ -237,7 +233,9 @@ class FunctionUncompress : public IFunction {
}

int idx = col_data.size();
col_data.resize(col_data.size() + uncompressed_slice.size);
if (col_data.size() + uncompressed_slice.size > col_data.capacity()) {
col_data.resize(col_data.size() + uncompressed_slice.size);
}
col_offset[row] = col_offset[row - 1] + uncompressed_slice.size;
memcpy(col_data.data() + idx, uncompressed_slice.data, uncompressed_slice.size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.AlwaysNullable;
import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -34,9 +35,10 @@
* ScalarFunction 'compress'.
*/
public class Compress extends ScalarFunction
implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNullable {
implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(StringType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT),
FunctionSignature.ret(StringType.INSTANCE).args(StringType.INSTANCE));

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -37,6 +38,7 @@ public class Uncompress extends ScalarFunction
implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(StringType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT),
FunctionSignature.ret(StringType.INSTANCE).args(StringType.INSTANCE));

/**
Expand Down

0 comments on commit b8b3bad

Please sign in to comment.