Skip to content

Commit

Permalink
seal shard in precommit
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Mar 22, 2024
1 parent 34f7a1a commit 67c74bb
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.11"
version = "1.0.12"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
12 changes: 5 additions & 7 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ class HSHomeObject : public HomeObjectImpl {
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);

/**
* @brief Retrieves the chunk number associated with the given shard ID.
*
Expand All @@ -317,13 +322,6 @@ class HSHomeObject : public HomeObjectImpl {

cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }

bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
Expand Down
63 changes: 61 additions & 2 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,64 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
return req->result();
}

bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;

{
std::scoped_lock lock_guard(_shard_lock);
auto iter = _shard_map.find(shard_info.id);
RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
auto& state = (*iter->second)->info.state;
// we just change the state to SEALED, since it will be easy for rollback
// the update of superblk will be done in on_shard_message_commit;
if (state == ShardInfo::State::OPEN) {
state = ShardInfo::State::SEALED;
} else {
LOGW("try to seal an unopen shard, shard_id: {}", shard_info.id);
}
}
}
default: {
break;
}
}
return true;
}

void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;

{
std::scoped_lock lock_guard(_shard_lock);
auto iter = _shard_map.find(shard_info.id);
RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
auto& state = (*iter->second)->info.state;
// we just change the state to SEALED, since it will be easy for rollback
// the update of superblk will be done in on_shard_message_commit;
if (state == ShardInfo::State::SEALED) {
state = ShardInfo::State::OPEN;
} else {
LOGW("try to rollback seal_shard message , but the shard state is not sealed. shard_id: {}",
shard_info.id);
}
}
}
default: {
break;
}
}
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -259,12 +317,13 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
state = (*iter->second)->info.state;
}

if (state == ShardInfo::State::OPEN) {
if (state == ShardInfo::State::SEALED) {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
}
} else
LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
break;
}
Expand Down
27 changes: 23 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,37 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
}
}

bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("on_pre_commit with lsn:{}", lsn);
// For shard creation, since homestore repldev inside will write shard header to data service first before this
// function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard
// from the blkid in on_commit()
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
return home_object_->on_shard_message_pre_commit(lsn, header, key, ctx);
}
default: {
break;
}
}
return true;
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("on_rollback with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
home_object_->on_shard_message_rollback(lsn, header, key, ctx);
break;
}
default: {
break;
}
}
}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
Expand Down

0 comments on commit 67c74bb

Please sign in to comment.