Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SealShard state change in pre-commit #120

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ class HSHomeObject : public HomeObjectImpl {
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);

/**
* @brief Retrieves the chunk number associated with the given shard ID.
*
Expand All @@ -369,14 +374,6 @@ class HSHomeObject : public HomeObjectImpl {

cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }

//////////// Called by internal classes. These are not Public APIs ///////////////////
bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
Expand Down
89 changes: 87 additions & 2 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,90 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
return req->result();
}

// Moving seal_shard to pre_commit cannot fundamentally solve the conflict between seal_shard and put_blob, since the
// put_blob handler only checks the shard state at the very beginning and does not check again before proposing to
// raft. We therefore need a callback that checks whether the request can be handled before appending the log entry,
// which happens prior to pre_commit.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prior to the pre_commit of put_blob

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out this misuse; I will be careful!


// FIXME: revisit this once the pre-append callback becomes available; it is coming in homestore.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please open an issue in HS and in HO respectively to track this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


// Pre-commit handler for shard replication messages.
// For SEAL_SHARD_MSG it flips the in-memory shard state OPEN -> SEALED so that any put_blob
// arriving after this point fails fast on the sealed shard and a rollback stays cheap; the
// superblk itself is persisted later, in on_shard_message_commit.
// Returns false only when the replication message header fails its CRC check; all other
// paths (including an unexpected shard state) return true.
bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                               cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
    // Only the proposer carries a result context through which failures are reported back.
    if (hs_ctx && hs_ctx->is_proposer) {
        ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
    }
    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
        return false;
    }
    switch (msg_header->msg_type) {
    case ReplicationMessageType::SEAL_SHARD_MSG: {
        // The shard_info_superblk is laid out immediately after the message header in the blob.
        auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
        auto const shard_info = sb->info;

        {
            std::scoped_lock lock_guard(_shard_lock);
            auto iter = _shard_map.find(shard_info.id);
            RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
            auto& state = (*iter->second)->info.state;
            // We just change the state to SEALED, so that it will fail the later coming put_blob on this
            // shard and will be easy for rollback.
            // The update of the superblk will be done in on_shard_message_commit.
            if (state == ShardInfo::State::OPEN) {
                state = ShardInfo::State::SEALED;
            } else {
                // NOTE(review): a seal of a non-OPEN shard still returns true here — confirm the
                // commit/rollback paths tolerate a repeated seal without double-releasing the chunk.
                LOGW("try to seal an unopen shard, shard_id: {}", shard_info.id);
            }
        }
        break; // was an implicit fallthrough into default; behavior unchanged, intent now explicit
    }
    default: {
        break;
    }
    }
    return true;
}

// Rollback handler for shard replication messages.
// For SEAL_SHARD_MSG it undoes the in-memory state change made by on_shard_message_pre_commit,
// reverting the shard state SEALED -> OPEN. No superblk update is needed here because
// pre-commit never persisted one; persistence only happens in on_shard_message_commit.
void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                             cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
    // Only the proposer carries a result context through which failures are reported back.
    if (hs_ctx && hs_ctx->is_proposer) {
        ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
    }
    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
        return;
    }

    switch (msg_header->msg_type) {
    case ReplicationMessageType::SEAL_SHARD_MSG: {
        // The shard_info_superblk is laid out immediately after the message header in the blob.
        auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
        auto const shard_info = sb->info;
        {
            std::scoped_lock lock_guard(_shard_lock);
            auto iter = _shard_map.find(shard_info.id);
            RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
            auto& state = (*iter->second)->info.state;
            // Revert the in-memory state back to OPEN (undoing pre-commit's OPEN -> SEALED flip).
            // The original comment said "change the state to SEALED", which described pre-commit,
            // not this rollback path.
            if (state == ShardInfo::State::SEALED) {
                state = ShardInfo::State::OPEN;
            } else {
                LOGW("try to rollback seal_shard message , but the shard state is not sealed. shard_id: {}",
                     shard_info.id);
            }
        }
        break; // was an implicit fallthrough into default; behavior unchanged, intent now explicit
    }
    default: {
        break;
    }
    }
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -259,12 +343,13 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
state = (*iter->second)->info.state;
}

if (state == ShardInfo::State::OPEN) {
if (state == ShardInfo::State::SEALED) {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
}
} else
LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
break;
}
Expand Down
35 changes: 31 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,45 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
}
}

bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
// Pre-commit hook of the replication state machine. Validates the message header CRC and
// forwards only SEAL_SHARD_MSG to the home object's shard pre-commit handler; every other
// message type is accepted as-is.
bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                            cintrusive< homestore::repl_req_ctx >& ctx) {
    LOGI("on_pre_commit with lsn:{}", lsn);
    // For shard creation, homestore's repldev has already written the shard header to the data
    // service before this callback fires, so nothing is needed here; the chunk bound to the new
    // shard is recovered from the blkid in on_commit().
    auto const* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGE("corrupted message in pre_commit, lsn:{}", lsn);
        return false;
    }
    if (msg_header->msg_type == ReplicationMessageType::SEAL_SHARD_MSG) {
        return home_object_->on_shard_message_pre_commit(lsn, header, key, ctx);
    }
    return true;
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
// Rollback hook of the replication state machine. Validates the message header CRC and
// forwards only SEAL_SHARD_MSG to the home object's shard rollback handler; every other
// message type requires no rollback work here.
void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                          cintrusive< homestore::repl_req_ctx >& ctx) {
    LOGI("on_rollback with lsn:{}", lsn);
    auto const* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGE("corrupted message in rollback, lsn:{}", lsn);
        return;
    }
    if (msg_header->msg_type == ReplicationMessageType::SEAL_SHARD_MSG) {
        home_object_->on_shard_message_rollback(lsn, header, key, ctx);
    }
}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
Expand Down
Loading