Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(make_idempotent): support making incr request idempotent in pegasus_write_service::impl #2185

Merged
merged 11 commits into from
Feb 5, 2025
15 changes: 15 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,26 @@ enum mutate_operation
MO_DELETE
}

// Marks the origin of a single-put request (see the `type` field of `update_request`).
enum update_type
{
// A general single-put request.
UT_PUT,
// A put request translated from a non-idempotent incr request.
UT_INCR
}

// The single-put request, which just writes a key/value pair into storage and is therefore
// certainly idempotent.
struct update_request
{
// Key of the pair to be written.
1:dsn.blob key;
// Value of the pair to be written.
2:dsn.blob value;
// Expire timestamp in seconds for the pair.
// NOTE(review): 0 appears to mean "no TTL" -- confirm against the server-side write path.
3:i32 expire_ts_seconds;

// This field marks the type of a single-put request. It is mainly used to differentiate a
// general single-put request from one translated from a non-idempotent atomic write request:
// - a general single-put request, if `type` is UT_PUT or is not set (the field is optional);
// - a put request translated from a non-idempotent incr request, if `type` is UT_INCR.
4:optional update_type type;
}

struct update_response
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class pegasus_server_write : public dsn::replication::replica_base

friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;

std::unique_ptr<pegasus_write_service> _write_svc;
Expand Down
3 changes: 1 addition & 2 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ class pegasus_write_service : dsn::replication::replica_base
private:
void clear_up_batch_states();

private:
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;

Expand Down
178 changes: 173 additions & 5 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,99 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return resp.error;
}

// Translate an incr request, which is certainly non-idempotent, into a single-put request,
// which is certainly idempotent. Return current status for RocksDB.
//
// Parameters:
// - req: the incr request to be translated.
// - err_resp: filled only on failure, i.e. when reading the current value fails, the
//   current value is not a valid int64, or the incremented value would be out of range.
// - update: filled only on success with the single-put request carrying the final value.
int make_idempotent(const dsn::apps::incr_request &req,
                    dsn::apps::incr_response &err_resp,
                    dsn::apps::update_request &update)
{
    // Get current raw value for the provided key from the RocksDB instance.
    db_get_context get_ctx;
    const int err = _rocksdb_wrapper->get(req.key.to_string_view(), &get_ctx);
    if (dsn_unlikely(err != rocksdb::Status::kOk)) {
        return make_error_response(err, err_resp);
    }

    if (!get_ctx.found || get_ctx.expired) {
        // Once the provided key is not found or has been expired, we could assume that
        // its value is 0 before incr; thus the final result for incr could be set as
        // the value of the single-put request, i.e. req.increment.
        return make_idempotent_request_for_incr(
            req.key, req.increment, calc_expire_on_non_existent(req), update);
    }

    // Extract user data from raw value as base for increment.
    dsn::blob base_value;
    pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), base_value);

    int64_t new_int = 0;
    if (base_value.empty()) {
        // Old value is also considered as 0 before incr as above once it's empty, thus
        // set req.increment as the value for single put.
        new_int = req.increment;
    } else {
        int64_t base_int = 0;
        if (dsn_unlikely(!dsn::buf2int64(base_value.to_string_view(), base_int))) {
            // Old value is not valid int64.
            LOG_ERROR_PREFIX("incr failed: error = base value \"{}\" "
                             "is not an integer or out of range",
                             utils::c_escape_sensitive_string(base_value));
            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
        }

        // Signed overflow is undefined behavior in C++, so the compiler would be allowed to
        // assume it never happens and drop the range check below. Perform the addition on
        // unsigned operands instead, where wrap-around is well-defined, and convert back:
        // the result is the same wrapped value that the check below relies on.
        new_int = static_cast<int64_t>(static_cast<uint64_t>(base_int) +
                                       static_cast<uint64_t>(req.increment));
        if (dsn_unlikely((req.increment > 0 && new_int < base_int) ||
                         (req.increment < 0 && new_int > base_int))) {
            // New value overflows, just respond with the base value.
            LOG_ERROR_PREFIX("incr failed: error = new value is out of range, "
                             "base_value = {}, increment = {}, new_value = {}",
                             base_int,
                             req.increment,
                             new_int);
            return make_error_response(rocksdb::Status::kInvalidArgument, base_int, err_resp);
        }
    }

    // Only the expire timestamp of `get_ctx` is needed from here on; its raw value has
    // already been moved out above.
    return make_idempotent_request_for_incr(
        req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
}

// Write the single-put request that was translated from an incr request into RocksDB, and
// fill `resp` as the response for that incr. Return current status for RocksDB.
int put(const db_write_context &ctx,
        const dsn::apps::update_request &update,
        dsn::apps::incr_response &resp)
{
    const auto pid = get_gpid();
    resp.app_id = pid.get_app_id();
    resp.partition_index = pid.get_partition_index();
    resp.decree = ctx.decree;
    resp.server = _primary_host_port;

    // Registered through dsn::defer so that the write batch gets cleared up on the way out.
    auto batch_guard = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    // Stage the put into the write batch first; commit it only if staging succeeded.
    const auto expire_ts = static_cast<uint32_t>(update.expire_ts_seconds);
    resp.error = _rocksdb_wrapper->write_batch_put_ctx(
        ctx, update.key.to_string_view(), update.value.to_string_view(), expire_ts);
    if (resp.error == rocksdb::Status::kOk) {
        resp.error = _rocksdb_wrapper->write(ctx.decree);
    }

    if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
        return resp.error;
    }

    // The value was produced from an int64 while building the request, hence parsing it
    // back as the new value of incr is not expected to fail.
    CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
          "invalid int64 value for put idempotent incr: key={}, value={}",
          update.key,
          update.value);

    return resp.error;
}

int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp)
{
resp.app_id = get_gpid().get_app_id();
Expand Down Expand Up @@ -242,14 +335,15 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put(
decree, update.key.to_string_view(), std::to_string(new_value), new_expire_ts);
if (resp.error) {
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}

return resp.error;
}

Expand Down Expand Up @@ -569,6 +663,83 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return raw_key;
}

// For a key that is not contained in the storage, derive its expire timestamp in seconds
// from `req`: a positive `req.expire_ts_seconds` is taken as is, anything else becomes 0.
template <typename TRequest>
static inline int32_t calc_expire_on_non_existent(const TRequest &req)
{
    if (req.expire_ts_seconds <= 0) {
        return 0;
    }

    return req.expire_ts_seconds;
}

// Calculate new expire timestamp in seconds for the keys contained in the storage
// according to `req` and their current expire timestamp in `get_ctx`:
// - `req.expire_ts_seconds == 0`: keep the current expire timestamp of the existing key;
// - `req.expire_ts_seconds < 0`: reset the expire timestamp to 0;
// - `req.expire_ts_seconds > 0`: use `req.expire_ts_seconds` as the new expire timestamp.
template <typename TRequest>
static inline int32_t calc_expire_on_existing(const TRequest &req,
const db_get_context &get_ctx)
{
if (req.expire_ts_seconds == 0) {
// Still use current expire timestamp of the existing key as the new value.
return static_cast<int32_t>(get_ctx.expire_ts);
}

if (req.expire_ts_seconds < 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will expire_ts_seconds to be less than 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, -1 means clearing TTL as is described by Java client if ttlSeconds == -1, then update to no ttl if increment succeed. (see https://github.com/apache/incubator-pegasus/blob/master/java-client/src/main/java/org/apache/pegasus/client/PegasusClientInterface.java#L561); and Java client would throw exception once it is less than -1 (see https://github.com/apache/incubator-pegasus/blob/master/java-client/src/main/java/org/apache/pegasus/client/PegasusTable.java#L733).
On server-side, currently non-idempotent incr would also clear TTL once it is less than 0 (see https://github.com/apache/incubator-pegasus/blob/master/src/server/pegasus_write_service_impl.h#L235).

// Reset expire timestamp to 0.
return 0;
}

return req.expire_ts_seconds;
}

// Fill `update` as a single-put request of the given `type`, whose value is the provided
// int64 `value` turned into a blob, for `key` with `expire_ts_seconds`.
static inline void make_idempotent_request(const dsn::blob &key,
                                           int64_t value,
                                           int32_t expire_ts_seconds,
                                           dsn::apps::update_type::type type,
                                           dsn::apps::update_request &update)
{
    // Mark what kind of request this put has been translated from.
    update.__set_type(type);

    update.expire_ts_seconds = expire_ts_seconds;
    update.key = key;
    update.value = dsn::blob::create_from_numeric(value);
}

// Fill `update` as the single-put request equivalent to an incr request, i.e. marked as
// UT_INCR. Building the request itself cannot fail, thus the returned status for RocksDB
// is always kOk.
static inline int make_idempotent_request_for_incr(const dsn::blob &key,
                                                   int64_t value,
                                                   int32_t expire_ts_seconds,
                                                   dsn::apps::update_request &update)
{
    const auto incr_type = dsn::apps::update_type::UT_INCR;
    make_idempotent_request(key, value, expire_ts_seconds, incr_type, update);

    return rocksdb::Status::kOk;
}

// Fill `resp` as the incr response for a failure whose RocksDB status is `err`, which must
// not be kOk. Return `err` as it is.
inline int make_error_response(int err, dsn::apps::incr_response &resp)
{
    CHECK(err != rocksdb::Status::kOk, "this incr response is built only for error");

    // Identify the replica that produced this response.
    const auto gpid = get_gpid();
    resp.app_id = gpid.get_app_id();
    resp.partition_index = gpid.get_partition_index();
    resp.server = _primary_host_port;

    // The mutation has not been assigned with a valid decree at this point, thus -1.
    resp.decree = -1;

    resp.error = err;

    return err;
}

// Same as the two-argument overload for building an error-only incr response, except that
// `new_value` is also put into the response.
inline int make_error_response(int err, int64_t new_value, dsn::apps::incr_response &resp)
{
    resp.new_value = new_value;

    const int status = make_error_response(err, resp);
    return status;
}

// return true if the check type is supported
static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type)
{
Expand Down Expand Up @@ -678,13 +849,10 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return false;
}

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

const std::string _primary_host_port;
const uint32_t _pegasus_data_version;
Expand Down
13 changes: 9 additions & 4 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,25 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s =
const rocksdb::Status s =
_db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key), &ctx->raw_value);
if (dsn_likely(s.ok())) {
// success
// The key is found and its value is read successfully.
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
METRIC_VAR_INCREMENT(read_expired_values);
} else {
ctx->expired = false;
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
// NotFound is an acceptable error
}

if (s.IsNotFound()) {
// NotFound is considered normal since the key may not be present in DB now.
ctx->found = false;
ctx->expired = false;
return rocksdb::Status::kOk;
}

Expand Down
Loading
Loading