Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: hoist key mutexes to ExecuteCommands #2620

Merged
merged 31 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
df58c47
Merge commit 'a29df097dd23c36a749e5c616f4d1cae1051e853' into unstable
PokIsemaine Sep 8, 2024
559f6a8
test: txn-context-enabled test cases
PokIsemaine Sep 8, 2024
adc497a
Merge branch 'apache:unstable' into unstable
PokIsemaine Sep 11, 2024
fcfeec5
Merge branch 'apache:unstable' into unstable
PokIsemaine Sep 17, 2024
603fe89
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 4, 2024
512f4a2
Merge commit 'f2c423348d7bb3b475d988559068ba8e2697d497' into unstable
PokIsemaine Oct 7, 2024
3d067c6
refactor: promote the engine::Context in the Execute
PokIsemaine Oct 7, 2024
476e08b
Merge branch 'unstable' into unstable
PragmaTwice Oct 9, 2024
d7cf182
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 11, 2024
4373fdd
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 13, 2024
5e2a4e8
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 16, 2024
9b20687
Merge branch 'apache:unstable' into unstable
PokIsemaine Oct 23, 2024
f439dbe
refactor: Hoist key mutexes
PokIsemaine Oct 23, 2024
ccf757d
chore: remove temporary comments
PokIsemaine Oct 24, 2024
19715ab
Merge branch 'unstable' into unstable
PragmaTwice Oct 24, 2024
0430acd
Merge branch 'unstable' into unstable
git-hulk Oct 28, 2024
1177698
fix: add MultiLockGuard of OnWrite
PokIsemaine Oct 30, 2024
765366f
fix: try to fix issue 2473
PokIsemaine Oct 31, 2024
b1af21f
Merge branch 'unstable' into unstable
PokIsemaine Oct 31, 2024
3715626
refactor: GetLockKeys=>GetLocks
PokIsemaine Oct 31, 2024
abd3826
fix: issues 2617
PokIsemaine Oct 31, 2024
ab3ce18
Merge branch 'unstable' into unstable
PokIsemaine Oct 31, 2024
23501da
chore: comments & guard
PokIsemaine Oct 31, 2024
6cc5351
Merge branch 'unstable' into unstable
PragmaTwice Nov 1, 2024
a521a7f
Merge branch 'unstable' into unstable
PragmaTwice Nov 1, 2024
02ca56a
Merge branch 'unstable' into unstable
PragmaTwice Nov 2, 2024
2672d72
Merge branch 'unstable' into unstable
PragmaTwice Nov 2, 2024
b8227a1
Merge branch 'unstable' into unstable
PragmaTwice Nov 3, 2024
8b8f1ca
Merge branch 'unstable' into unstable
PragmaTwice Nov 3, 2024
76a8b1d
Merge branch 'unstable' into unstable
PragmaTwice Nov 4, 2024
9af83cd
Merge branch 'unstable' into unstable
PragmaTwice Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,24 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

SetLastCmd(cmd_name);
{
std::optional<MultiLockGuard> guard;
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
if (cmd_flags & kCmdWrite) {
std::vector<std::string> lock_keys;
attributes->ForEachKeyRange(
[&, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved
key_range.ForEachKey(
[&, this](const std::string &key) {
auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
lock_keys.emplace_back(std::move(ns_key));
},
args);
},
cmd_tokens);

guard.emplace(srv_->storage->GetLockManager(), lock_keys);
}
engine::Context ctx(srv_->storage);

// TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
!config->cluster_enabled) {
Expand All @@ -521,7 +536,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}

s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
// TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
Expand Down
20 changes: 8 additions & 12 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui

std::string value;
Metadata metadata(kRedisNone, false);
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;

Expand Down Expand Up @@ -151,7 +151,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::string value;
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
Expand All @@ -166,13 +166,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const Slice &user_key) {
rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &keys, uint64_t *deleted_cnt) {
*deleted_cnt = 0;

std::vector<std::string> lock_keys;
lock_keys.reserve(keys.size());
std::vector<std::string> ns_keys;
ns_keys.reserve(keys.size());
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
lock_keys.emplace_back(std::move(ns_key));
ns_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
Expand All @@ -182,8 +181,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
}

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
for (const auto &ns_key : lock_keys) {
slice_keys.reserve(ns_keys.size());
for (const auto &ns_key : ns_keys) {
slice_keys.emplace_back(ns_key);
}

Expand All @@ -203,7 +202,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;

s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
s = batch->Delete(metadata_cf_handle_, ns_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}
Expand Down Expand Up @@ -646,9 +645,6 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R

rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, const std::string &new_key, bool nx,
bool delete_old, CopyResult *res) {
std::vector<std::string> lock_keys = {key, new_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

RedisType type = kRedisNone;
auto s = typeInternal(ctx, key, &type);
if (!s.ok()) return s;
Expand Down
10 changes: 1 addition & 9 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BitmapMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_bitmap_size = 0;
Expand Down Expand Up @@ -824,15 +822,9 @@ template <bool ReadOnly>
rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);

std::optional<LockGuard> guard;
if constexpr (!ReadOnly) {
guard = LockGuard(storage_->GetLockManager(), ns_key);
}

BitmapMetadata metadata;
std::string raw_value;
// TODO(mwish): maintain snapshot for read-only bitfield.
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved

auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx, const Slice &user_key,
uint16_t expansion) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata bloom_chain_metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &bloom_chain_metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);
Expand Down
5 changes: 1 addition & 4 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down Expand Up @@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
WriteBatchLogData log_data(kRedisHash);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
LockGuard guard(storage_->GetLockManager(), ns_key);

s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Expand Down Expand Up @@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
*ret = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
Expand Down Expand Up @@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
}

std::string dest_key = AppendNamespacePrefix(dest_user_key);
LockGuard guard(storage_->GetLockManager(), dest_key);
std::vector<std::string> registers;
HyperLogLogMetadata metadata;

Expand Down
21 changes: 1 addition & 20 deletions src/types/redis_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ rocksdb::Status Json::Set(engine::Context &ctx, const std::string &user_key, con
const std::string &value) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue origin;
auto s = read(ctx, ns_key, &metadata, &origin);
Expand Down Expand Up @@ -190,8 +188,6 @@ rocksdb::Status Json::ArrAppend(engine::Context &ctx, const std::string &user_ke
append_values.emplace_back(std::move(value.value));
}

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue value;
auto s = read(ctx, ns_key, &metadata, &value);
Expand Down Expand Up @@ -248,8 +244,6 @@ rocksdb::Status Json::Merge(engine::Context &ctx, const std::string &user_key, c
const std::string &merge_value, bool &result) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue json_val;

Expand Down Expand Up @@ -279,8 +273,6 @@ rocksdb::Status Json::Clear(engine::Context &ctx, const std::string &user_key, c
size_t *result) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonValue json_val;
JsonMetadata metadata;
auto s = read(ctx, ns_key, &metadata, &json_val);
Expand Down Expand Up @@ -327,8 +319,6 @@ rocksdb::Status Json::ArrInsert(engine::Context &ctx, const std::string &user_ke
insert_values.emplace_back(std::move(value.value));
}

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue value;
auto s = read(ctx, ns_key, &metadata, &value);
Expand All @@ -349,8 +339,6 @@ rocksdb::Status Json::Toggle(engine::Context &ctx, const std::string &user_key,
Optionals<bool> *results) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue origin;
auto s = read(ctx, ns_key, &metadata, &origin);
Expand All @@ -367,8 +355,6 @@ rocksdb::Status Json::ArrPop(engine::Context &ctx, const std::string &user_key,
std::vector<std::optional<JsonValue>> *results) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue json_val;
auto s = read(ctx, ns_key, &metadata, &json_val);
Expand Down Expand Up @@ -403,8 +389,6 @@ rocksdb::Status Json::ArrTrim(engine::Context &ctx, const std::string &user_key,
int64_t stop, Optionals<uint64_t> *results) {
auto ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);

JsonMetadata metadata;
JsonValue json_val;
auto s = read(ctx, ns_key, &metadata, &json_val);
Expand All @@ -424,7 +408,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con
*result = 0;

auto ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

JsonValue json_val;
JsonMetadata metadata;
auto s = read(ctx, ns_key, &metadata, &json_val);
Expand Down Expand Up @@ -473,8 +457,6 @@ rocksdb::Status Json::numop(engine::Context &ctx, JsonValue::NumOpEnum op, const
auto s = read(ctx, ns_key, &metadata, &json_val);
if (!s.ok()) return s;

LockGuard guard(storage_->GetLockManager(), ns_key);

auto res = json_val.NumOp(path, number, op, result);
if (!res) {
return rocksdb::Status::InvalidArgument(res.Msg());
Expand Down Expand Up @@ -571,7 +553,6 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const std::vector<std::string>
std::string ns_key = AppendNamespacePrefix(user_key);
ns_keys.emplace_back(std::move(ns_key));
}
MultiLockGuard guard(storage_->GetLockManager(), ns_keys);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisJson);
Expand Down
10 changes: 1 addition & 9 deletions src/types/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ rocksdb::Status List::push(engine::Context &ctx, const Slice &user_key, const st
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
LockGuard guard(storage_->GetLockManager(), ns_key);

s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !(create_if_missing && s.IsNotFound())) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
Expand Down Expand Up @@ -108,7 +108,6 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const rocksdb::Slice &user_

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
Expand Down Expand Up @@ -177,7 +176,6 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count

std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
Expand Down Expand Up @@ -271,7 +269,6 @@ rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const
*new_size = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
Expand Down Expand Up @@ -462,7 +459,6 @@ rocksdb::Status List::Pos(engine::Context &ctx, const Slice &user_key, const Sli
rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int index, Slice elem) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
Expand Down Expand Up @@ -501,7 +497,6 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context &ctx, const rocksdb::Sli
std::string *elem) {
std::string ns_key = AppendNamespacePrefix(src);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
Expand Down Expand Up @@ -567,8 +562,6 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx, const rocksdb::Slice
std::string src_ns_key = AppendNamespacePrefix(src);
std::string dst_ns_key = AppendNamespacePrefix(dst);

std::vector<std::string> lock_keys{src_ns_key, dst_ns_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
ListMetadata src_metadata(false);
auto s = GetMetadata(ctx, src_ns_key, &src_metadata);
if (!s.ok()) {
Expand Down Expand Up @@ -636,7 +629,6 @@ rocksdb::Status List::Trim(engine::Context &ctx, const Slice &user_key, int star
uint32_t trim_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
Expand Down
Loading
Loading