Merge branch 'master' into feat-support-hive-datatype
WeisonWei authored Jan 26, 2025
2 parents 135a0c8 + 304443d commit b4bbbd6
Showing 13 changed files with 128 additions and 35 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
@@ -253,6 +253,10 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
DEFINE_mInt32(download_low_speed_time, "300");
// whether to download small files in batch
DEFINE_mBool(enable_batch_download, "true");
// whether to check md5sum when downloading
DEFINE_mBool(enable_download_md5sum_check, "true");
// download binlog meta timeout, default 30s
DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000");

DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
4 changes: 4 additions & 0 deletions be/src/common/config.h
@@ -300,6 +300,10 @@ DECLARE_mInt32(download_low_speed_limit_kbps);
DECLARE_mInt32(download_low_speed_time);
// whether to download small files in batch.
DECLARE_mBool(enable_batch_download);
// whether to check md5sum when downloading
DECLARE_mBool(enable_download_md5sum_check);
// download binlog meta timeout
DECLARE_mInt32(download_binlog_meta_timeout_ms);

// deprecated, use env var LOG_DIR in be.conf
DECLARE_String(sys_log_dir);
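A note on the two new knobs: the m in DEFINE_mBool/DEFINE_mInt32 conventionally marks a runtime-mutable BE config, so both should be adjustable without a restart. The sketch below (a hypothetical helper; the config names follow the patch) shows how the two are meant to combine when picking an HTTP timeout, mirroring the snapshot_loader change further down:

#include "common/config.h"

// Hypothetical helper mirroring the snapshot_loader logic: start from the
// configured binlog-meta timeout, and triple it when the remote side must
// also compute an md5sum, which is comparatively slow.
static int64_t pick_meta_timeout_ms() {
    int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
    if (config::enable_download_md5sum_check) {
        timeout_ms *= 3; // md5 over a large file takes noticeably longer
    }
    return timeout_ms;
}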
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.cpp
@@ -81,7 +81,6 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
local_state.reached_limit(output_block, eos);
if (!output_block->empty()) {
auto return_rows = output_block->rows();
local_state._num_rows_returned += return_rows;
COUNTER_UPDATE(local_state._filtered_rows_counter, output_rows - return_rows);
}
return Status::OK();
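For context, this one-line deletion reads like a double-counting fix: the removed increment sat directly after reached_limit(), which by all appearances already advances the same counter. A hedged reconstruction of the suspected bug (assumption only; reached_limit()'s body is not shown in this diff):

// Presumed behavior of local_state.reached_limit(output_block, eos):
//   _num_rows_returned += output_block->rows();    // rows counted once here
// The deleted line then ran:
//   local_state._num_rows_returned += return_rows; // ...and counted them again,
// inflating _num_rows_returned relative to the rows actually returned.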
19 changes: 13 additions & 6 deletions be/src/runtime/snapshot_loader.cpp
@@ -34,6 +34,7 @@
#include <unordered_map>
#include <utility>

#include "common/config.h"
#include "common/logging.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@@ -411,9 +412,7 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
constexpr uint32_t kListRemoteFileTimeout = 15;
constexpr uint32_t kDownloadFileMaxRetry = 3;
constexpr uint32_t kGetLengthTimeout = 10;

// check if job has already been cancelled
int tmp_counter = 1;
@@ -496,7 +495,7 @@ Status SnapshotLoader::remote_http_download(
string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_url_prefix));
client->set_timeout_ms(kListRemoteFileTimeout * 1000);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&file_list_str);
};
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
@@ -512,12 +511,20 @@
uint64_t file_size = 0;
std::string file_md5;
auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) {
std::string url = fmt::format("{}&acquire_md5=true", remote_file_url);
int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
std::string url = remote_file_url;
if (config::enable_download_md5sum_check) {
// computing the md5sum is time-consuming, so we set a longer timeout
timeout_ms = config::download_binlog_meta_timeout_ms * 3;
url = fmt::format("{}&acquire_md5=true", remote_file_url);
}
RETURN_IF_ERROR(client->init(url));
client->set_timeout_ms(kGetLengthTimeout * 1000);
client->set_timeout_ms(timeout_ms);
RETURN_IF_ERROR(client->head());
RETURN_IF_ERROR(client->get_content_length(&file_size));
RETURN_IF_ERROR(client->get_content_md5(&file_md5));
if (config::enable_download_md5sum_check) {
RETURN_IF_ERROR(client->get_content_md5(&file_md5));
}
return Status::OK();
};
RETURN_IF_ERROR(
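All of the callbacks in this file run through HttpClient::execute_with_retry(max_retry, sleep_seconds, callback). Judging purely from the call sites, its contract is roughly the following; this is a sketch under that assumption, not the real implementation from http/http_client.h:

#include <unistd.h>

// Assumed retry contract: run the callback against a fresh client each
// attempt, sleep a fixed interval between failures, surface the last error.
template <typename Callback>
Status execute_with_retry_sketch(int max_retry, int sleep_seconds, Callback callback) {
    Status status;
    for (int attempt = 0; attempt < max_retry; ++attempt) {
        HttpClient client;
        status = callback(&client);
        if (status.ok()) {
            return status;
        }
        sleep(sleep_seconds);
    }
    return status;
}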
42 changes: 22 additions & 20 deletions be/src/service/backend_service.cpp
@@ -90,7 +90,7 @@ class TTransportException;
namespace doris {

namespace {
constexpr uint64_t kMaxTimeoutMs = 3000; // 3s

struct IngestBinlogArg {
int64_t txn_id;
int64_t partition_id;
@@ -156,6 +156,14 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
tstatus.error_msgs.push_back(std::move(error_msg));
};

auto estimate_download_timeout = [](int64_t file_size) {
uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
if (estimate_timeout < config::download_low_speed_time) {
estimate_timeout = config::download_low_speed_time;
}
return estimate_timeout;
};

// Step 3: get binlog info
auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
request.remote_port);
@@ -167,7 +175,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
std::string binlog_info;
auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_binlog_info_url));
client->set_timeout_ms(kMaxTimeoutMs);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&binlog_info);
};
auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
@@ -206,7 +214,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
std::string rowset_meta_str;
auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_rowset_meta_url));
client->set_timeout_ms(kMaxTimeoutMs);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&rowset_meta_str);
};
status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
@@ -255,7 +263,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
auto get_segment_file_size_cb = [&get_segment_file_size_url,
&segment_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_file_size_url));
client->set_timeout_ms(kMaxTimeoutMs);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return client->get_content_length(&segment_file_size);
};
@@ -291,16 +299,11 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
auto get_segment_file_url =
fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]);

uint64_t estimate_timeout =
segment_file_size / config::download_low_speed_limit_kbps / 1024;
if (estimate_timeout < config::download_low_speed_time) {
estimate_timeout = config::download_low_speed_time;
}

auto segment_path = local_segment_path(local_tablet->tablet_path(),
rowset_meta->rowset_id().to_string(), segment_index);
LOG(INFO) << "download segment file from " << get_segment_file_url << " to "
<< segment_path;
uint64_t estimate_timeout = estimate_download_timeout(segment_file_size);
auto get_segment_file_cb = [&get_segment_file_url, &segment_path, segment_file_size,
estimate_timeout, &download_success_files](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_file_url));
@@ -309,7 +312,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
download_success_files.push_back(segment_path);

std::string remote_file_md5;
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
if (config::enable_download_md5sum_check) {
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
}
LOG(INFO) << "download segment file to " << segment_path
<< ", remote md5: " << remote_file_md5
<< ", remote size: " << segment_file_size;
@@ -381,7 +386,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
client->set_timeout_ms(kMaxTimeoutMs);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return client->get_content_length(&segment_index_file_size);
};
@@ -420,7 +425,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
client->set_timeout_ms(kMaxTimeoutMs);
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return client->get_content_length(&segment_index_file_size);
};
@@ -468,12 +473,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
auto get_segment_index_file_url =
fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);

uint64_t estimate_timeout =
segment_index_file_size / config::download_low_speed_limit_kbps / 1024;
if (estimate_timeout < config::download_low_speed_time) {
estimate_timeout = config::download_low_speed_time;
}

uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size);
auto local_segment_index_path = segment_index_file_names[i];
LOG(INFO) << fmt::format("download segment index file from {} to {}",
get_segment_index_file_url, local_segment_index_path);
@@ -486,7 +486,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
download_success_files.push_back(local_segment_index_path);

std::string remote_file_md5;
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
if (config::enable_download_md5sum_check) {
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
}

std::error_code ec;
// Check file length
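The factored-out estimate_download_timeout is worth a worked example. file_size is in bytes and download_low_speed_limit_kbps in KB/s, so file_size / limit / 1024 yields seconds, floored at download_low_speed_time. A standalone version with the defaults from config.cpp (50 KB/s, 300 s) baked in:

#include <cstdint>
#include <iostream>

// Standalone copy of the patch's lambda with the default config values inlined.
uint64_t estimate_download_timeout(uint64_t file_size_bytes) {
    const uint64_t low_speed_limit_kbps = 50; // config::download_low_speed_limit_kbps
    const uint64_t low_speed_time_sec = 300;  // config::download_low_speed_time
    uint64_t timeout_sec = file_size_bytes / low_speed_limit_kbps / 1024;
    return timeout_sec < low_speed_time_sec ? low_speed_time_sec : timeout_sec;
}

int main() {
    std::cout << estimate_download_timeout(1ULL << 30) << "\n"; // 1 GiB -> 20971 s
    std::cout << estimate_download_timeout(1ULL << 20) << "\n"; // 1 MiB -> 300 s (floor wins)
}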
@@ -144,6 +144,10 @@ public PartitionKeyValueType getPartitionType() {
return partitionKeyValueType;
}

public boolean hasInValues() {
return inValues != null;
}

public void analyze(int partColNum) throws AnalysisException {
if (isDummy()) {
return;
@@ -749,12 +749,18 @@ public void cancel(CancelBackupStmt stmt) throws DdlException {

public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
AbstractJob job = getCurrentJob(task.getDbId());

if (job == null) {
LOG.warn("failed to find backup or restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
return true;
}

if (job.getJobId() != task.getJobId()) {
LOG.warn("invalid snapshot task: {}, job id: {}, task job id: {}", task, job.getJobId(), task.getJobId());
// return true to remove this task from AgentTaskQueue
return true;
}

if (job instanceof BackupJob) {
if (task.isRestoreTask()) {
LOG.warn("expect finding restore job, but get backup job {} for task: {}", job, task);
@@ -779,13 +785,13 @@ public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequest
LOG.info("invalid upload task: {}, no backup job is found. db id: {}", task, task.getDbId());
return false;
}
BackupJob restoreJob = (BackupJob) job;
if (restoreJob.getJobId() != task.getJobId() || restoreJob.getState() != BackupJobState.UPLOADING) {
BackupJob backupJob = (BackupJob) job;
if (backupJob.getJobId() != task.getJobId() || backupJob.getState() != BackupJobState.UPLOADING) {
LOG.info("invalid upload task: {}, job id: {}, job state: {}",
task, restoreJob.getJobId(), restoreJob.getState().name());
task, backupJob.getJobId(), backupJob.getState().name());
return false;
}
return restoreJob.finishSnapshotUploadTask(task, request);
return backupJob.finishSnapshotUploadTask(task, request);
}

public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest request) {
@@ -796,6 +802,12 @@ public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest
return true;
}

if (job.getJobId() != task.getJobId()) {
LOG.warn("invalid download task: {}, job id: {}, task job id: {}", task, job.getJobId(), task.getJobId());
// return true to remove this task from AgentTaskQueue
return true;
}

return ((RestoreJob) job).finishTabletDownloadTask(task, request);
}

@@ -807,6 +819,12 @@ public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) {
return true;
}

if (job.getJobId() != task.getJobId()) {
LOG.warn("invalid dir move task: {}, job id: {}, task job id: {}", task, job.getJobId(), task.getJobId());
// return true to remove this task from AgentTaskQueue
return true;
}

return ((RestoreJob) job).finishDirMoveTask(task, request);
}
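The three checks added in this file share one guard-clause shape: a finished task whose job id no longer matches the job currently tracked for that database is stale, and the handler acknowledges it (returns true) so the AgentTaskQueue drops it rather than retrying it forever. Distilled into a minimal C++ sketch with hypothetical Task/Job types:

// Hypothetical rendering of the guard added to handleFinishedSnapshotTask,
// handleDownloadSnapshotTask and handleDirMoveTask:
bool handle_finished_task(const Task& task, const Job* current_job) {
    if (current_job == nullptr || current_job->job_id() != task.job_id()) {
        // No job, or the task belongs to an older job instance: return true
        // so the task is removed from the queue instead of re-queued.
        return true;
    }
    return current_job->finish(task);
}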

@@ -81,6 +81,10 @@ public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean
// get partition key
PartitionKeyDesc partitionKeyDesc = desc.getPartitionKeyDesc();

if (!partitionKeyDesc.hasInValues()) {
throw new DdlException("List partition expected 'VALUES [IN or ((\"xxx\", \"xxx\"), ...)]'");
}

// we might receive one whole empty values list, we should add default partition value for
// such occasion
for (List<PartitionValue> values : partitionKeyDesc.getInValues()) {
@@ -86,6 +86,9 @@ public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean
Range<PartitionKey> newRange = null;
PartitionKeyDesc partitionKeyDesc = desc.getPartitionKeyDesc();
// check range
if (partitionKeyDesc.hasInValues()) {
throw new DdlException("Range partition expected 'VALUES [LESS THAN or [(\"xxx\" ,...), (\"xxx\", ...))]'");
}
try {
newRange = createAndCheckNewRange(partitionKeyDesc, isTemp);
} catch (AnalysisException e) {
@@ -17,7 +17,7 @@

suite("test_backup_restore_colocate", "backup_restore") {
String suiteName = "test_backup_restore_colocate"
String repoName = "${suiteName}_repo"
String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
String dbName = "${suiteName}_db"
String newDbName = "${suiteName}_db_new"
String tableName1 = "${suiteName}_table1"
@@ -17,7 +17,7 @@

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_cumu_compaction_with_delete") {
suite("test_cumu_compaction_with_delete", "nonConcurrent") {
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_partition_add_mismatched") {
sql "drop table if exists test_partition_add_mismatched_list_tbl"
sql """
CREATE TABLE IF NOT EXISTS test_partition_add_mismatched_list_tbl (
k1 int NOT NULL,
k2 bigint NOT NULL
)
PARTITION BY LIST(k1,k2) ()
DISTRIBUTED BY HASH(k1) BUCKETS 5 properties("replication_num" = "1")
"""

sql "drop table if exists test_partition_add_mismatched_range_tbl"
sql """
CREATE TABLE IF NOT EXISTS test_partition_add_mismatched_range_tbl (
k1 int NOT NULL,
k2 bigint NOT NULL
)
PARTITION BY RANGE(k1,k2) ()
DISTRIBUTED BY HASH(k1) BUCKETS 5 properties("replication_num" = "1")
"""

test{
sql """ alter table test_partition_add_mismatched_list_tbl add partition p0 values [("0"), ("2")) """
exception "List partition expected 'VALUES [IN or ((\"xxx\", \"xxx\"), ...)]'"
}

test{
sql """ alter table test_partition_add_mismatched_range_tbl add partition p0 values in (("0", "2")) """
exception "Range partition expected 'VALUES [LESS THAN or [(\"xxx\" ,...), (\"xxx\", ...))]'"
}
}
@@ -20,7 +20,7 @@ suite("test_column_boundary") {
sql """
CREATE TABLE IF NOT EXISTS test_column_boundary (
u_id int NULL COMMENT "",
u_city varchar(20) NULL COMMENT ""
u_city varchar(40) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`u_id`, `u_city`)
DISTRIBUTED BY HASH(`u_id`, `u_city`) BUCKETS 1
