Enhance MockS3Client to support real client delegation and configurable failures #2158

Merged: 8 commits, Feb 7, 2025
2 changes: 2 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -495,6 +495,8 @@ set(arcticdb_srcs
 storage/mock/s3_mock_client.cpp
 storage/s3/s3_storage.cpp
 storage/s3/s3_storage_tool.cpp
+storage/s3/s3_client_wrapper.cpp
+storage/s3/s3_client_wrapper.hpp
 storage/storage_factory.cpp
 storage/storage_utils.cpp
 stream/aggregator.cpp
28 changes: 19 additions & 9 deletions cpp/arcticdb/storage/python_bindings.cpp
@@ -105,23 +105,30 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept

 enum class S3SettingsPickleOrder : uint32_t {
 AWS_AUTH = 0,
-AWS_PROFILE = 1
+AWS_PROFILE = 1,
+USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2
 };

 py::class_<s3::S3Settings>(storage, "S3Settings")
-.def(py::init<s3::AWSAuthMethod, const std::string&>())
+.def(py::init<s3::AWSAuthMethod, const std::string&, bool>())
 .def(py::pickle(
 [](const s3::S3Settings &settings) {
-return py::make_tuple(settings.aws_auth(), settings.aws_profile());
+return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
 },
 [](py::tuple t) {
-util::check(t.size() == 2, "Invalid S3Settings pickle objects");
-s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
+util::check(t.size() == 3, "Invalid S3Settings pickle objects");
Collaborator

Sorry, I don't have much context on these S3Settings so this might be a dumb question. Should the pickling and unpickling be backwards compatible? Where are these pickled objects stored and will we ever try to unpickle the old version of the S3Settings?

Collaborator

I think this is safe, as pickled S3Settings are not persisted. Pickling support should be for forking (more specifically, Spark) only.
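
To make the backward-compatibility question above concrete, here is a minimal sketch of what a tolerant reader could look like. It is not the PR's code: the PR keeps a strict `t.size() == 3` check, which is reasonable if the pickles are never persisted. The sketch models the pickled tuple with a `std::vector` of `std::variant` as a stand-in for `py::tuple`, and the names `PickledField`, `SettingsSketch`, `PickleOrder`, `unpickle_tolerant` and `unpickle_examples` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-in for one element of a pickled py::tuple.
using PickledField = std::variant<int, std::string, bool>;

// Hypothetical, reduced model of S3Settings: only the three pickled fields.
struct SettingsSketch {
    int aws_auth;
    std::string aws_profile;
    bool use_internal_client_wrapper_for_testing;
};

// Mirrors the field order of S3SettingsPickleOrder in the diff.
enum class PickleOrder : std::size_t {
    AWS_AUTH = 0,
    AWS_PROFILE = 1,
    USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2
};

// Tolerant reader: accepts the old 2-field layout and the new 3-field layout.
// A missing trailing flag falls back to false, i.e. no test wrapper.
inline SettingsSketch unpickle_tolerant(const std::vector<PickledField>& t) {
    if (t.size() != 2 && t.size() != 3) {
        throw std::invalid_argument("Invalid S3Settings pickle objects");
    }
    auto at = [&t](PickleOrder o) -> const PickledField& {
        return t[static_cast<std::size_t>(o)];
    };
    SettingsSketch settings{
        std::get<int>(at(PickleOrder::AWS_AUTH)),
        std::get<std::string>(at(PickleOrder::AWS_PROFILE)),
        false
    };
    if (t.size() == 3) {
        settings.use_internal_client_wrapper_for_testing =
            std::get<bool>(at(PickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING));
    }
    return settings;
}

// Example usage; call from a test or from main.
inline void unpickle_examples() {
    // Old layout: the flag is absent and defaults to false.
    const std::vector<PickledField> old_layout{
        PickledField{1}, PickledField{std::string("profile")}};
    assert(!unpickle_tolerant(old_layout).use_internal_client_wrapper_for_testing);

    // New layout: the flag is read from the third slot.
    const std::vector<PickledField> new_layout{
        PickledField{1}, PickledField{std::string("profile")}, PickledField{true}};
    assert(unpickle_tolerant(new_layout).use_internal_client_wrapper_for_testing);
}
```

The trade-off is the usual one: the strict check fails loudly on any layout mismatch, while the tolerant reader silently accepts older payloads and is only worth having if old pickles can actually reach new code.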

+s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
+t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
+t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
+);
 return settings;
 }
 ))
 .def_property_readonly("aws_profile", [](const s3::S3Settings &settings) { return settings.aws_profile(); })
-.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); });
+.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); })
+.def_property_readonly("use_internal_client_wrapper_for_testing", [](const s3::S3Settings &settings) {
+return settings.use_internal_client_wrapper_for_testing();
+});

 py::class_<NativeVariantStorage>(storage, "NativeVariantStorage")
 .def(py::init<>())
@@ -130,7 +137,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
 [](const NativeVariantStorage &settings) {
 return util::variant_match(settings.variant(),
 [] (const s3::S3Settings& settings) {
-return py::make_tuple(settings.aws_auth(), settings.aws_profile());
+return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
 },
 [](const auto &) {
 util::raise_rte("Invalid native storage setting type");
@@ -139,8 +146,11 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
 );
 },
 [](py::tuple t) {
-util::check(t.size() == 2, "Invalid S3Settings pickle objects");
-s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
+util::check(t.size() == 3, "Invalid S3Settings pickle objects");
+s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
+t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
+t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
+);
 return NativeVariantStorage(std::move(settings));
 }
 ))
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
@@ -11,7 +11,7 @@
 #include <arcticdb/storage/s3/s3_client_impl.hpp>
 #include <arcticdb/storage/s3/s3_client_interface.hpp>
 #include <arcticdb/util/simple_string_hash.hpp>
-
+#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
 namespace arcticdb::storage::nfs_backed {

 std::string add_suffix_char(const std::string& str) {
145 changes: 145 additions & 0 deletions cpp/arcticdb/storage/s3/s3_client_wrapper.cpp
@@ -0,0 +1,145 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <arcticdb/storage/s3/s3_client_interface.hpp>
#include <arcticdb/log/log.hpp>
#include <arcticdb/util/buffer_pool.hpp>
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>

#include <aws/s3/S3Errors.h>

namespace arcticdb::storage{

using namespace object_store_utils;

namespace s3 {

std::optional<Aws::S3::S3Error> S3ClientTestWrapper::has_failure_trigger(const std::string& bucket_name) const {
bool static_failures_enabled = ConfigsMap::instance()->get_int("S3ClientTestWrapper.EnableFailures", 0) == 1;
// Check if mock failures are enabled
if (!static_failures_enabled) {
return std::nullopt;
}

// Get target buckets (if not set or "all", affects all buckets)
auto failure_buckets_str = ConfigsMap::instance()->get_string("S3ClientTestWrapper.FailureBucket", "all");

if (failure_buckets_str != "all") {
// Split the comma-separated bucket names and check if current bucket is in the list
std::istringstream bucket_stream(failure_buckets_str);
std::string target_bucket;
bool bucket_found = false;

while (std::getline(bucket_stream, target_bucket, ',')) {
// Trim whitespace
target_bucket.erase(0, target_bucket.find_first_not_of(" \t"));
target_bucket.erase(target_bucket.find_last_not_of(" \t") + 1);

if (target_bucket == bucket_name) {
bucket_found = true;
break;
}
}

if (!bucket_found) {
return std::nullopt;
}
}

// Get error configuration
auto error_code = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorCode", static_cast<int>(Aws::S3::S3Errors::NETWORK_CONNECTION));
auto retryable = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorRetryable", 0) == 1;

auto failure_error_ = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(
static_cast<Aws::S3::S3Errors>(error_code),
"SimulatedFailure",
"Simulated failure from environment variables",
retryable
));


return failure_error_;
}

S3Result<std::monostate> S3ClientTestWrapper::head_object(
const std::string& s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}


return actual_client_->head_object(s3_object_name, bucket_name);
}

S3Result<Segment> S3ClientTestWrapper::get_object(
const std::string &s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->get_object(s3_object_name, bucket_name);
}

folly::Future<S3Result<Segment>> S3ClientTestWrapper::get_object_async(
const std::string &s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return folly::makeFuture<S3Result<Segment>>({*maybe_error});
}

return actual_client_->get_object_async(s3_object_name, bucket_name);
}

S3Result<std::monostate> S3ClientTestWrapper::put_object(
const std::string &s3_object_name,
Segment &segment,
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->put_object(s3_object_name, segment, bucket_name, header);
}

S3Result<DeleteOutput> S3ClientTestWrapper::delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}


return actual_client_->delete_objects(s3_object_names, bucket_name);
}

// Using a fixed page size since it's only being used for simple tests.
// If we ever need to configure it we should move it to the s3 proto config instead.
constexpr auto page_size = 10;
S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
const std::string& name_prefix,
const std::string& bucket_name,
const std::optional<std::string>& continuation_token) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->list_objects(name_prefix, bucket_name, continuation_token);
}

}

}
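
As a reading aid for the bucket filter in `has_failure_trigger` above, the following is a minimal standalone sketch of the same matching rule: the value `"all"` targets every bucket, otherwise the value is split on commas, each entry is trimmed of spaces and tabs, and the current bucket must match one entry exactly. `bucket_is_targeted` and `bucket_filter_examples` are hypothetical names, not part of the PR; in the PR the list comes from the `S3ClientTestWrapper.FailureBucket` entry in `ConfigsMap`.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical free function mirroring the bucket filter in has_failure_trigger.
// Returns true if failures should apply to bucket_name.
inline bool bucket_is_targeted(const std::string& failure_buckets, const std::string& bucket_name) {
    if (failure_buckets == "all") {
        return true;  // the default value: every bucket is affected
    }
    std::istringstream stream(failure_buckets);
    std::string candidate;
    while (std::getline(stream, candidate, ',')) {
        // Trim leading and trailing spaces/tabs, as the wrapper does.
        candidate.erase(0, candidate.find_first_not_of(" \t"));
        candidate.erase(candidate.find_last_not_of(" \t") + 1);
        if (candidate == bucket_name) {
            return true;
        }
    }
    return false;
}

// Example usage; call from a test or from main.
inline void bucket_filter_examples() {
    assert(bucket_is_targeted("all", "any-bucket"));
    assert(bucket_is_targeted("bucket-a, bucket-b ,bucket-c", "bucket-b"));
    assert(!bucket_is_targeted("bucket-a,bucket-b", "bucket-c"));
}
```

Matching is exact and case-sensitive after trimming, so a prefix such as `bucket` does not match `bucket-a`.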
73 changes: 73 additions & 0 deletions cpp/arcticdb/storage/s3/s3_client_wrapper.hpp
@@ -0,0 +1,73 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <aws/s3/S3Client.h>

#include <arcticdb/storage/s3/s3_client_interface.hpp>
#include <arcticdb/storage/s3/s3_client_impl.hpp>

#include <arcticdb/util/preconditions.hpp>
#include <arcticdb/util/pb_util.hpp>
#include <arcticdb/log/log.hpp>
#include <arcticdb/util/buffer_pool.hpp>

#include <arcticdb/storage/object_store_utils.hpp>
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/entity/serialized_key.hpp>
#include <arcticdb/util/configs_map.hpp>
#include <arcticdb/util/composite.hpp>

namespace arcticdb::storage::s3 {

// A wrapper around the actual S3 client which can simulate failures based on the configuration.
// The S3ClientTestWrapper delegates to the real client by default, but can intercept operations
// to simulate failures or track operations for testing purposes.
class S3ClientTestWrapper : public S3ClientInterface {
public:
explicit S3ClientTestWrapper(std::unique_ptr<S3ClientInterface> actual_client) :
actual_client_(std::move(actual_client)) {
}

~S3ClientTestWrapper() override = default;

[[nodiscard]] S3Result<std::monostate> head_object(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

[[nodiscard]] S3Result<Segment> get_object(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

[[nodiscard]] folly::Future<S3Result<Segment>> get_object_async(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment& segment,
const std::string& bucket_name,
PutHeader header = PutHeader::NONE) override;

S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) override;

S3Result<ListObjectsOutput> list_objects(
const std::string& prefix,
const std::string& bucket_name,
const std::optional<std::string>& continuation_token) const override;

private:
// Returns error if failures are enabled for the given bucket
std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& bucket_name) const;

std::unique_ptr<S3ClientInterface> actual_client_;
};

}
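
The header above declares a decorator: the wrapper implements the same interface as the client it owns and either short-circuits with a simulated error or delegates unchanged. A reduced sketch of that pattern follows, with a single operation and a simplified result type. All names here (`ResultSketch`, `ClientSketch`, `InMemoryClient`, `FailureInjectingClient`, `wrapper_example`) are hypothetical, and the shared boolean flag is an assumption that stands in for the `ConfigsMap` lookup used by the PR.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, reduced result type: either a payload or an error message.
struct ResultSketch {
    std::optional<std::string> value;
    std::optional<std::string> error;
    bool is_success() const { return !error.has_value(); }
};

// Hypothetical stand-in for S3ClientInterface with a single operation.
class ClientSketch {
public:
    virtual ~ClientSketch() = default;
    virtual ResultSketch get_object(const std::string& name, const std::string& bucket) const = 0;
};

// Stand-in for a real or mock client: serves objects from an in-memory map.
class InMemoryClient : public ClientSketch {
public:
    explicit InMemoryClient(std::map<std::string, std::string> objects) : objects_(std::move(objects)) {}

    ResultSketch get_object(const std::string& name, const std::string&) const override {
        auto it = objects_.find(name);
        if (it == objects_.end()) {
            return {std::nullopt, "NoSuchKey"};
        }
        return {it->second, std::nullopt};
    }

private:
    std::map<std::string, std::string> objects_;
};

// Decorator: checks the failure switch first, otherwise delegates unchanged.
class FailureInjectingClient : public ClientSketch {
public:
    FailureInjectingClient(std::unique_ptr<ClientSketch> inner, std::shared_ptr<bool> fail)
        : inner_(std::move(inner)), fail_(std::move(fail)) {
        assert(inner_ && fail_);
    }

    ResultSketch get_object(const std::string& name, const std::string& bucket) const override {
        if (*fail_) {
            return {std::nullopt, "SimulatedFailure"};
        }
        return inner_->get_object(name, bucket);
    }

private:
    std::unique_ptr<ClientSketch> inner_;
    std::shared_ptr<bool> fail_;
};

// Example usage; call from a test or from main.
inline void wrapper_example() {
    auto fail = std::make_shared<bool>(false);
    std::map<std::string, std::string> objects{{"key", "payload"}};
    FailureInjectingClient client(std::make_unique<InMemoryClient>(objects), fail);

    assert(client.get_object("key", "bucket").is_success());   // delegated to the inner client
    *fail = true;
    assert(!client.get_object("key", "bucket").is_success());  // intercepted by the wrapper
}
```

Because the wrapper only depends on the interface, the inner client can itself be a mock, which allows both the mock's own failures and the wrapper's injected failures to be exercised in one test.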
14 changes: 11 additions & 3 deletions cpp/arcticdb/storage/s3/s3_settings.hpp
@@ -38,15 +38,19 @@ class S3Settings {
 bool use_raw_prefix_;
 AWSAuthMethod aws_auth_;
 std::string aws_profile_;
+bool use_internal_client_wrapper_for_testing_;

 public:
-explicit S3Settings(AWSAuthMethod aws_auth, const std::string& aws_profile) :
+explicit S3Settings(AWSAuthMethod aws_auth,
+const std::string& aws_profile,
+bool use_internal_client_wrapper_for_testing) :
 aws_auth_(aws_auth),
-aws_profile_(aws_profile) {
+aws_profile_(aws_profile),
+use_internal_client_wrapper_for_testing_(use_internal_client_wrapper_for_testing) {
 }

 explicit S3Settings(const arcticc::pb2::s3_storage_pb2::Config& config) :
-S3Settings(AWSAuthMethod::DISABLED, "")
+S3Settings(AWSAuthMethod::DISABLED, "", false)
 {
 update(config);
 }
@@ -135,6 +139,10 @@ class S3Settings {
 return aws_auth_;
 }

+bool use_internal_client_wrapper_for_testing() const {
+return use_internal_client_wrapper_for_testing_;
+}
+
 std::string aws_profile() const {
 return aws_profile_;
 }
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
@@ -14,6 +14,7 @@

 #include <arcticdb/log/log.hpp>
 #include <arcticdb/storage/s3/s3_api.hpp>
+#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
 #include <arcticdb/util/buffer_pool.hpp>
 #include <arcticdb/storage/object_store_utils.hpp>
 #include <arcticdb/storage/storage_options.hpp>
@@ -135,6 +136,11 @@ void S3Storage::create_s3_client(const S3Settings &conf, const Aws::Auth::AWSCre
 ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
 s3_client_ = std::make_unique<S3ClientImpl>(creds, get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
 }
+
+if (conf.use_internal_client_wrapper_for_testing()){
Collaborator

I don't see a similar change for the nfs_backed_storage. Should we add one?

Collaborator

Also, I like how this lets us use an S3ClientWrapper around a MockS3Client to simulate both types of failures. Nice :)

Collaborator Author

We don't have S3Settings for nfs_backed_storage, and at the moment it is not needed.
I think we should add something like this for it and all other storages, but I don't think that is in the scope of this change.

Collaborator

ah makes sense. Didn't realize S3Settings does not apply to nfs.

+ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using internal client wrapper for testing");
+s3_client_ = std::make_unique<S3ClientTestWrapper>(std::move(s3_client_));
+}
 }

 S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const S3Settings &conf) :