Skip to content

Commit

Permalink
[Fix](recycler) Further fix for #47475 (#47486)
Browse files Browse the repository at this point in the history
Related PR: #47475 
在pr#47475中,我们修复了潜在的少删数据的问题,是通过在delete rowset data函数中添加删除逻辑,删除recycle
tablet中漏删的文件,但是那个pr忽略了其中存在的一个判断条件,导致在recycle tablet中漏删的文件被跳过了,没有实际删除。

在此pr中,tmp rowset不再受上述判断条件约束,会尽可能删除每一个tmp rowset的数据(包括倒排索引v2的数据);
普通rowset则仍然保留该判断条件(即跳过所属tablet已被回收的rowset),因为普通rowset数量过大,若逐一重复删除,可能会影响删除效率。
  • Loading branch information
Yukang-Lian authored Feb 3, 2025
1 parent f4c220a commit 2870721
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
1 change: 1 addition & 0 deletions cloud/src/recycler/hdfs_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) {
}

int HdfsAccessor::init() {
TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1);
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
if (!fs_) {
Expand Down
24 changes: 15 additions & 9 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1464,15 +1464,18 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta
return accessor->delete_files(file_paths);
}

int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets) {
int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets,
RowsetRecyclingState type) {
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
// (resource_id, tablet_id, rowset_id)
std::vector<std::tuple<std::string, int64_t, std::string>> rowsets_delete_by_prefix;

for (const auto& rs : rowsets) {
{
// We have to treat tmp rowsets as "orphans" that may not be related to any existing tablet
// due to an aborted schema change.
if (type == RowsetRecyclingState::FORMAL_ROWSET) {
std::lock_guard lock(recycled_tablets_mtx_);
if (recycled_tablets_.count(rs.tablet_id())) {
continue; // Rowset data has already been deleted
Expand All @@ -1499,7 +1502,7 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
std::vector<std::pair<int64_t, std::string>> index_ids;
// default format as v1.
InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;

int inverted_index_get_ret = 0;
if (rs.has_tablet_schema()) {
for (const auto& index : rs.tablet_schema().index()) {
if (index.has_index_type() && index.index_type() == IndexType::INVERTED) {
Expand All @@ -1519,12 +1522,12 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
continue;
}
InvertedIndexInfo index_info;
int get_ret =
inverted_index_get_ret =
inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info);
if (get_ret == 0) {
if (inverted_index_get_ret == 0) {
index_format = index_info.first;
index_ids = index_info.second;
} else if (get_ret == 1) {
} else if (inverted_index_get_ret == 1) {
// 1. Schema kv not found means tablet has been recycled
// Maybe some tablet recycle failed by some bugs
// We need to delete again to double check
Expand Down Expand Up @@ -1562,7 +1565,10 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
index_id.first, index_id.second));
}
} else if (!index_ids.empty()) {
} else if (!index_ids.empty() || inverted_index_get_ret == 1) {
// Try to recycle inverted index v2 when inverted_index_get_ret == 1:
// we treat "schema not found" as if the rowset has a v2 format inverted index
// to reduce the chance of leaving data undeleted.
file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
}
}
Expand Down Expand Up @@ -2028,7 +2034,7 @@ int InstanceRecycler::recycle_rowsets() {
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete),
rowsets_to_delete = std::move(rowsets_to_delete)]() {
if (delete_rowset_data(rowsets_to_delete) != 0) {
if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_;
return;
}
Expand Down Expand Up @@ -2225,7 +2231,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
});
if (delete_rowset_data(tmp_rowsets) != 0) {
if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET) != 0) {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" << instance_id_;
return -1;
}
Expand Down
8 changes: 7 additions & 1 deletion cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class Recycler {
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
};

/// Tells InstanceRecycler::delete_rowset_data which kind of rowsets it was handed.
enum class RowsetRecyclingState {
    /// Passed by recycle_rowsets(). A rowset whose tablet id is already in
    /// recycled_tablets_ is skipped, since its data has already been deleted.
    FORMAL_ROWSET = 0,
    /// Passed by recycle_tmp_rowsets(). The recycled-tablet check is not applied,
    /// because a tmp rowset may not belong to any existing tablet.
    TMP_ROWSET = 1,
};

class InstanceRecycler {
public:
explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
Expand Down Expand Up @@ -222,7 +227,8 @@ class InstanceRecycler {
const std::string& rowset_id);

// return 0 for success otherwise error
int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets);
int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets,
RowsetRecyclingState type);

/**
* Get stage storage info from instance and init StorageVaultAccessor
Expand Down
10 changes: 8 additions & 2 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ TEST(RecyclerTest, recycle_indexes) {
j & 1);
auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5,
schemas[j % 5], txn_id_base + j);
tmp_rowset.set_resource_id("recycle_indexes");
create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1);
}
for (int j = 0; j < 10; ++j) {
Expand Down Expand Up @@ -3132,7 +3133,7 @@ TEST(RecyclerTest, delete_rowset_data) {

rowset_pbs.emplace_back(std::move(rowset));
}
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
Expand Down Expand Up @@ -3237,7 +3238,7 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {

rowset_pbs.emplace_back(std::move(rowset));
}
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
Expand Down Expand Up @@ -3352,6 +3353,11 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
rs = resp->add_rowset_meta();
rs->set_resource_id("success_vault");
});
sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
auto* ret = try_any_cast_ret<int>(args);
ret->first = -1;
ret->second = true;
});
sp->enable_processing();

// succeed to init MockAccessor
Expand Down

0 comments on commit 2870721

Please sign in to comment.