From b23cd9f93822a3d07351e52faa8f0106feb61110 Mon Sep 17 00:00:00 2001 From: Pedro Nauck Date: Fri, 27 Dec 2024 01:20:16 -0300 Subject: [PATCH] fix(repo): S3 payload and encoding (#366) * refactor(repo): Change from S3 specific to a Storage trait * refactor(repo): use displaydoc instead of error * fix(data-parser): adjust encode to support internal messages * fix(repo): s3 being saved and parsed correctly * fix(repo): final adjustments * build(repo): fix lint * build(repo): fix chart * build(repo): fix tests * fix(storage): add retry mechanism * build(repo): fix lint * fix(repo): small adjustments --- .env.sample | 1 + .github/workflows/ci.yaml | 10 +- Cargo.lock | 52 +- Cargo.toml | 7 - benches/data-parser/Cargo.toml | 1 + .../benches/deserialize_decompress.rs | 5 +- benches/data-parser/src/lib.rs | 12 +- cluster/charts/fuel-streams/Chart.yaml | 2 +- .../templates/nats/accounts-secret.yaml | 15 - .../templates/nats/certificate.yaml | 57 -- cluster/charts/fuel-streams/values.yaml | 5 - cluster/docker/docker-compose.yml | 8 +- cluster/docker/init-localstack.sh | 2 +- crates/fuel-data-parser/Cargo.toml | 29 +- crates/fuel-data-parser/README.md | 6 +- .../src/compression_strategies.rs | 45 +- crates/fuel-data-parser/src/error.rs | 22 +- crates/fuel-data-parser/src/lib.rs | 212 +++++-- crates/fuel-streams-core/Cargo.toml | 5 +- crates/fuel-streams-core/README.md | 7 +- crates/fuel-streams-core/src/blocks/mod.rs | 14 +- crates/fuel-streams-core/src/inputs/mod.rs | 6 +- crates/fuel-streams-core/src/inputs/types.rs | 48 +- crates/fuel-streams-core/src/lib.rs | 12 +- crates/fuel-streams-core/src/logs/mod.rs | 6 +- crates/fuel-streams-core/src/outputs/mod.rs | 6 +- .../fuel-streams-core/src/primitive_types.rs | 234 ++++---- crates/fuel-streams-core/src/receipts/mod.rs | 6 +- .../fuel-streams-core/src/receipts/types.rs | 4 - crates/fuel-streams-core/src/stream/error.rs | 16 +- .../src/stream/fuel_streams.rs | 25 +- crates/fuel-streams-core/src/stream/mod.rs | 2 - .../src/stream/stream_encoding.rs | 83 --- .../src/stream/stream_impl.rs | 185 ++++-- .../fuel-streams-core/src/transactions/mod.rs | 6 +- .../src/transactions/types.rs | 37 +- crates/fuel-streams-core/src/utxos/mod.rs | 6 +- .../fuel-streams-core/src/utxos/subjects.rs | 14 +- crates/fuel-streams-core/src/utxos/types.rs | 4 +- crates/fuel-streams-executors/Cargo.toml | 4 +- crates/fuel-streams-executors/src/inputs.rs | 2 +- crates/fuel-streams-executors/src/lib.rs | 44 +- .../src/transactions.rs | 2 +- crates/fuel-streams-executors/src/utxos.rs | 6 +- crates/fuel-streams-storage/Cargo.toml | 6 +- crates/fuel-streams-storage/src/lib.rs | 6 + crates/fuel-streams-storage/src/retry.rs | 120 ++++ .../fuel-streams-storage/src/s3/s3_client.rs | 538 +++++++++++------- .../src/s3/s3_client_opts.rs | 173 +++--- crates/fuel-streams-storage/src/storage.rs | 47 ++ .../src/storage_config.rs | 46 ++ crates/sv-consumer/Cargo.toml | 2 + crates/sv-consumer/src/main.rs | 94 +-- crates/sv-publisher/Cargo.toml | 1 + crates/sv-publisher/src/main.rs | 43 +- crates/sv-webserver/Cargo.toml | 5 +- crates/sv-webserver/src/server/context.rs | 15 +- crates/sv-webserver/src/server/ws/errors.rs | 6 +- crates/sv-webserver/src/server/ws/socket.rs | 91 ++- crates/sv-webserver/src/telemetry/system.rs | 5 +- tests/src/lib.rs | 14 +- tests/tests/client.rs | 8 +- tests/tests/publisher.rs | 32 +- 63 files changed, 1487 insertions(+), 1040 deletions(-) delete mode 100644 cluster/charts/fuel-streams/templates/nats/accounts-secret.yaml delete mode 100644
cluster/charts/fuel-streams/templates/nats/certificate.yaml delete mode 100644 crates/fuel-streams-core/src/stream/stream_encoding.rs create mode 100644 crates/fuel-streams-storage/src/retry.rs create mode 100644 crates/fuel-streams-storage/src/storage.rs create mode 100644 crates/fuel-streams-storage/src/storage_config.rs diff --git a/.env.sample b/.env.sample index c111ace9..43760dff 100644 --- a/.env.sample +++ b/.env.sample @@ -9,6 +9,7 @@ AWS_ENDPOINT_URL=http://localhost:4566 AWS_REGION=us-east-1 AWS_S3_ENABLED=false AWS_S3_BUCKET_NAME=fuel-streams-local +STORAGE_MAX_RETRIES=5 # NATS Configuration NATS_URL=nats://localhost:4222 diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 590ef420..7245f0fa 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -222,7 +222,7 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_REGION: us-east-1 AWS_ENDPOINT_URL: http://localhost:4566 - AWS_S3_BUCKET_NAME: fuel-streams-test + AWS_S3_BUCKET_NAME: fuel-streams-local strategy: fail-fast: false matrix: @@ -247,16 +247,16 @@ jobs: tool: cargo-nextest locked: true - - name: Start Nats + - name: Start Docker run: | - make start-nats + make start-docker - name: Run tests run: make test PACKAGE=${{ matrix.package }} PROFILE=ci - - name: Stop Nats + - name: Stop Docker if: always() - run: make stop-nats + run: make stop-docker build: needs: install-deps diff --git a/Cargo.lock b/Cargo.lock index 8b55aaca..0cbb7f53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -490,10 +490,8 @@ checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ "brotli 7.0.0", "bzip2", - "deflate64", "flate2", "futures-core", - "futures-io", "memchr", "pin-project-lite", "tokio", @@ -2616,6 +2614,7 @@ dependencies = [ "fuel-core-types 0.40.2", "fuel-data-parser", "rand", + "serde", "strum 0.26.3", "tokio", ] @@ -2629,12 +2628,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "deflate64" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da692b8d1080ea3045efaab14434d40468c3d8657e42abddfffca87b428f4c1b" - [[package]] name = "der" version = "0.6.1" @@ -4133,7 +4126,6 @@ dependencies = [ "anyhow", "async-nats", "async-trait", - "chrono", "displaydoc", "fuel-core", "fuel-core-bin", @@ -4171,8 +4163,10 @@ name = "fuel-streams-executors" version = "0.0.13" dependencies = [ "anyhow", - "async-nats", + "displaydoc", + "dotenvy", "fuel-core", + "fuel-data-parser", "fuel-streams-core", "futures", "num_cpus", @@ -4211,10 +4205,10 @@ dependencies = [ name = "fuel-streams-storage" version = "0.0.13" dependencies = [ + "async-trait", "aws-config", "aws-sdk-s3", - "aws-smithy-runtime-api", - "aws-smithy-types", + "displaydoc", "dotenvy", "pretty_assertions", "rand", @@ -4222,6 +4216,7 @@ dependencies = [ "thiserror 2.0.9", "tokio", "tracing", + "tracing-test", ] [[package]] @@ -8719,9 +8714,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ "base64 0.22.1", "chrono", @@ -8737,9 +8732,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +checksum = 
"8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ "darling 0.20.10", "proc-macro2", @@ -9117,11 +9112,13 @@ dependencies = [ "anyhow", "async-nats", "clap 4.5.23", + "displaydoc", "dotenvy", "fuel-core", "fuel-streams-core", "fuel-streams-executors", "futures", + "hex", "num_cpus", "openssl", "serde_json", @@ -9140,6 +9137,7 @@ dependencies = [ "anyhow", "async-nats", "clap 4.5.23", + "displaydoc", "fuel-core", "fuel-core-bin", "fuel-core-types 0.40.2", @@ -9171,6 +9169,7 @@ dependencies = [ "displaydoc", "dotenvy", "elasticsearch", + "fuel-data-parser", "fuel-streams-core", "fuel-streams-nats", "fuel-streams-storage", @@ -10095,6 +10094,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.91", +] + [[package]] name = "triomphe" version = "0.1.14" diff --git a/Cargo.toml b/Cargo.toml index a12efbce..94da1905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,6 @@ actix-web = "4.9" anyhow = "1.0" async-nats = "0.38" async-trait = "0.1" -assert_matches = "1.5.0" -bytes = "1.9" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5", features = ["derive", "env"] } dotenvy = "0.15" @@ -49,9 +47,6 @@ fuel-core-storage = { version = "0.40.2" } fuel-core-types = { version = "0.40.2", default-features = false, features = ["std", "serde"] } fuel-core-services = { version = "0.40.2", default-features = false, features = ["test-helpers"] } futures-util = "0.3" -itertools = "0.13" -mockall = "0.13" -mockall_double = "0.3.1" hex = "0.4" pretty_assertions = "1.4" num_cpus = "1.16" @@ -63,7 +58,6 @@ sha2 = "0.10" strum = "0.26" strum_macros = "0.26" tokio = { version = "1.41", features = ["full"] } -tokio-stream = "0.1.16" tracing = "0.1" tracing-subscriber = "0.3" tracing-actix-web = "0.7" @@ -78,7 +72,6 @@ fuel-streams-storage = { version = "0.0.13", path = "crates/fuel-streams-storage fuel-streams-executors = { version = "0.0.13", path = "crates/fuel-streams-executors" } subject-derive = { version = "0.0.13", path = "crates/fuel-streams-macros/subject-derive" } sv-publisher = { version = "0.0.13", path = "crates/sv-publisher" } -sv-consumer = { version = "0.0.13", path = "crates/sv-consumer" } sv-webserver = { version = "0.0.13", path = "crates/sv-webserver" } # Workspace projects diff --git a/benches/data-parser/Cargo.toml b/benches/data-parser/Cargo.toml index c857c664..a56bf2d9 100644 --- a/benches/data-parser/Cargo.toml +++ b/benches/data-parser/Cargo.toml @@ -34,6 +34,7 @@ path = "benches/deserialize_decompress.rs" fuel-core-types = { workspace = true } fuel-data-parser = { workspace = true, features = ["test-helpers", "bench-helpers"] } rand = { workspace = true } +serde = { workspace = true } strum = { workspace = true } tokio = { workspace = true } diff --git a/benches/data-parser/benches/deserialize_decompress.rs b/benches/data-parser/benches/deserialize_decompress.rs index 91d019dc..1abc1978 100644 --- a/benches/data-parser/benches/deserialize_decompress.rs +++ b/benches/data-parser/benches/deserialize_decompress.rs @@ 
-1,6 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use data_parser::generate_test_block; -use fuel_core_types::{blockchain::block::Block, fuel_tx::Transaction}; +use data_parser::{generate_test_block, TestBlock}; use fuel_data_parser::{ DataParser, SerializationType, @@ -56,7 +55,7 @@ fn bench_decompress_deserialize(c: &mut Criterion) { b.to_async(&runtime).iter(|| async { let deserialized_and_decompressed = data_parser - .decode::<Block<Transaction>>(serialized_and_compressed) + .decode::<TestBlock>(serialized_and_compressed) .await .expect("decompression and deserialization"); diff --git a/benches/data-parser/src/lib.rs b/benches/data-parser/src/lib.rs index 1e5b9f47..281b5c86 100644 --- a/benches/data-parser/src/lib.rs +++ b/benches/data-parser/src/lib.rs @@ -8,9 +8,17 @@ use fuel_core_types::{ fuel_types::BlockHeight, tai64::Tai64, }; +use fuel_data_parser::{DataEncoder, DataParserError}; use rand::Rng; -pub fn generate_test_block() -> Block<Transaction> { +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TestBlock(Block<Transaction>); + +impl DataEncoder for TestBlock { + type Err = DataParserError; +} + +pub fn generate_test_block() -> TestBlock { let mut rng = rand::thread_rng(); let block_height: u32 = rng.gen_range(1..100); let block_txs: u32 = rng.gen_range(1..100); @@ -40,7 +48,7 @@ pub fn generate_test_block() -> Block<Transaction> { block .header_mut() .set_transaction_root(Bytes32::new(tx_root)); - block + TestBlock(block) } pub fn generate_test_tx() -> Transaction { diff --git a/cluster/charts/fuel-streams/Chart.yaml b/cluster/charts/fuel-streams/Chart.yaml index 228de229..50edd0a5 100755 --- a/cluster/charts/fuel-streams/Chart.yaml +++ b/cluster/charts/fuel-streams/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 appVersion: "1.0" description: A Helm chart for Kubernetes name: fuel-streams -version: 0.7.2 +version: 0.7.3 dependencies: - name: nats version: 1.2.8 diff --git a/cluster/charts/fuel-streams/templates/nats/accounts-secret.yaml b/cluster/charts/fuel-streams/templates/nats/accounts-secret.yaml deleted file mode 100644 index f5b8c743..00000000 --- a/cluster/charts/fuel-streams/templates/nats/accounts-secret.yaml +++ /dev/null @@ -1,15 +0,0 @@ -{{- $secret := .Values.natsAccountsSecret }} -{{- if $secret.enabled }} -apiVersion: v1 -kind: Secret -metadata: - {{- include "k8s.metadata" (dict "context" . "suffix" "-nats-accounts") | nindent 2 }} - labels: - {{- include "fuel-streams.labels" (dict "name" "nats-accounts" "context" .) | nindent 4 }} - app.kubernetes.io/component: nats -type: Opaque -data: - {{- if $secret.data }} - {{- toYaml $secret.data | nindent 2 }} - {{- end }} -{{- end }} diff --git a/cluster/charts/fuel-streams/templates/nats/certificate.yaml b/cluster/charts/fuel-streams/templates/nats/certificate.yaml deleted file mode 100644 index 46b00bba..00000000 --- a/cluster/charts/fuel-streams/templates/nats/certificate.yaml +++ /dev/null @@ -1,57 +0,0 @@ -{{- $cert := .Values.natsExternalService.certificate}} -{{- $service := .Values.natsExternalService.service }} -{{- if and .Values.natsExternalService.enabled $service.dns }} -apiVersion: cert-manager.io/v1 -kind: Certificate -metadata: - {{- include "k8s.metadata" (dict "context" . "suffix" "-nats-cert") | nindent 2 }} - annotations: - {{- include "set-value" (dict "context" $cert "path" "annotations") | nindent 4 }} - labels: - {{- include "fuel-streams.labels" (dict "name" "nats-client" "context" .)
| nindent 4 }} - {{- include "set-value" (dict "context" $cert "path" "labels") | nindent 4 }} - app.kubernetes.io/component: nats -spec: - secretName: {{ include "fuel-streams.fullname" . }}-nats-tls - duration: {{ $cert.duration }} - renewBefore: {{ $cert.renewBefore }} - dnsNames: - - {{ $service.dns }} - issuerRef: - name: {{ $cert.issuer }} - kind: ClusterIssuer ---- -apiVersion: networking.k8s.io/v1 -kind: Ingress -metadata: - {{- include "k8s.metadata" (dict "context" . "suffix" "-nats-cert-validator") | nindent 2 }} - labels: - {{- include "fuel-streams.labels" (dict "name" "nats-client" "context" .) | nindent 4 }} - {{- include "set-value" (dict "context" $cert "path" "labels") | nindent 4 }} - app.kubernetes.io/component: nats - annotations: - cert-manager.io/cluster-issuer: {{ $cert.issuer }} - kubernetes.io/ingress.class: nginx - acme.cert-manager.io/http01-ingress-class: nginx - nginx.ingress.kubernetes.io/ssl-redirect: "false" - nginx.ingress.kubernetes.io/force-ssl-redirect: "false" - cert-manager.io/common-name: {{ $service.dns }} - {{- include "set-value" (dict "context" $cert "path" "annotations") | nindent 4 }} -spec: - ingressClassName: nginx - tls: - - hosts: - - {{ $service.dns }} - secretName: {{ include "fuel-streams.fullname" . }}-nats-tls - rules: - - host: {{ $service.dns }} - http: - paths: - - path: /.well-known/acme-challenge/ - pathType: Prefix - backend: - service: - name: cm-acme-http-solver - port: - number: 8089 -{{- end }} diff --git a/cluster/charts/fuel-streams/values.yaml b/cluster/charts/fuel-streams/values.yaml index 3c44142d..7ca20973 100755 --- a/cluster/charts/fuel-streams/values.yaml +++ b/cluster/charts/fuel-streams/values.yaml @@ -221,11 +221,6 @@ consumer: podValue: 4 periodSeconds: 15 - env: - PORT: 8080 - PUBLISHER_MAX_THREADS: "32" - NATS_URL: "fuel-streams-nats-publisher:4222" - # ------------------------------------------------------------------------------------------------- # Consumer configuration # ------------------------------------------------------------------------------------------------- diff --git a/cluster/docker/docker-compose.yml b/cluster/docker/docker-compose.yml index 34b76756..eff95417 100644 --- a/cluster/docker/docker-compose.yml +++ b/cluster/docker/docker-compose.yml @@ -54,10 +54,10 @@ services: environment: - SERVICES=s3 # Enable just S3 service - DEBUG=1 - - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - - DEFAULT_REGION=${AWS_REGION} - - DEFAULT_BUCKETS=${AWS_S3_BUCKET_NAME} + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-test} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-test} + - DEFAULT_REGION=${AWS_REGION:-us-east-1} + - DEFAULT_BUCKETS=${AWS_S3_BUCKET_NAME:-fuel-streams-local} volumes: - ./localstack-data:/var/lib/localstack - /var/run/docker.sock:/var/run/docker.sock diff --git a/cluster/docker/init-localstack.sh b/cluster/docker/init-localstack.sh index befa0901..d88f344c 100755 --- a/cluster/docker/init-localstack.sh +++ b/cluster/docker/init-localstack.sh @@ -3,6 +3,6 @@ set -e echo "Creating S3 bucket in LocalStack..." 
-BUCKET_NAME=${AWS_S3_BUCKET_NAME:-fuel-streams-test} +BUCKET_NAME=${AWS_S3_BUCKET_NAME:-fuel-streams-local} awslocal s3 mb "s3://${BUCKET_NAME}" echo "Bucket created: ${BUCKET_NAME}" diff --git a/crates/fuel-data-parser/Cargo.toml b/crates/fuel-data-parser/Cargo.toml index ba9de63f..6140c616 100644 --- a/crates/fuel-data-parser/Cargo.toml +++ b/crates/fuel-data-parser/Cargo.toml @@ -11,21 +11,38 @@ version = { workspace = true } rust-version = { workspace = true } [dependencies] -async-compression = { version = "0.4", features = ["all"] } +async-compression = { version = "0.4", features = ["tokio"], optional = true } async-trait = { workspace = true } -bincode = "1.3" +bincode = { version = "1.3", optional = true } displaydoc = { workspace = true } lazy_static = "1.5" paste = "1.0" -postcard = { version = "1.0", features = ["alloc"] } +postcard = { version = "1.0", features = ["alloc"], optional = true } serde = { workspace = true } -serde_json = { workspace = true } +serde_json = { workspace = true, optional = true } strum = { workspace = true, features = ["derive"] } strum_macros = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } [features] -default = [] +default = ["json", "zstd"] +all = ["bincode", "postcard", "json", "zlib", "gzip", "brotli", "bzip2", "lzma", "deflate", "zstd"] + +# Serialization formats +bincode = ["dep:bincode"] +postcard = ["dep:postcard"] +json = ["dep:serde_json"] + +# Compression strategies +zlib = ["dep:async-compression", "async-compression/zlib"] +gzip = ["dep:async-compression", "async-compression/gzip"] +brotli = ["dep:async-compression", "async-compression/brotli"] +bzip2 = ["dep:async-compression", "async-compression/bzip2"] +lzma = ["dep:async-compression", "async-compression/lzma"] +deflate = ["dep:async-compression", "async-compression/deflate"] +zstd = ["dep:async-compression", "async-compression/zstd"] + +# Helper features test-helpers = [] -bench-helpers = [] +bench-helpers = ["all"] diff --git a/crates/fuel-data-parser/README.md b/crates/fuel-data-parser/README.md index a6fb58de..0eaebbc8 100644 --- a/crates/fuel-data-parser/README.md +++ b/crates/fuel-data-parser/README.md @@ -42,7 +42,7 @@ The `DataParser` struct provides functionality for encoding and decoding data th This library is intended for internal use within the Fuel Data Systems project. This is an example of usage outside of this crate within the project: ```rust -use fuel_data_parser::{DataParser, SerializationType, DataParseable}; +use fuel_data_parser::{DataEncoder, DataParser, SerializationType, DataParserError}; use serde::{Serialize, Deserialize}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -50,6 +50,10 @@ struct YourDataType { // Your data fields here } +impl DataEncoder for YourDataType { + type Err = DataParserError; +} + async fn example_usage() -> Result<(), Box<dyn std::error::Error>> { let parser = DataParser::default() .with_serialization_type(SerializationType::Bincode); diff --git a/crates/fuel-data-parser/src/compression_strategies.rs b/crates/fuel-data-parser/src/compression_strategies.rs index c2d86a00..78df0202 100644 --- a/crates/fuel-data-parser/src/compression_strategies.rs +++ b/crates/fuel-data-parser/src/compression_strategies.rs @@ -111,68 +111,70 @@ macro_rules!
define_compression_strategy { }; } +#[cfg(feature = "zlib")] #[derive(Clone)] -pub struct ZLibCompressionStrategy; +pub struct ZlibCompressionStrategy; +#[cfg(feature = "zlib")] define_compression_strategy!( - ZLibCompressionStrategy, + ZlibCompressionStrategy, Zlib, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "gzip")] #[derive(Clone)] pub struct GzipCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "gzip")] define_compression_strategy!( GzipCompressionStrategy, Gzip, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "brotli")] #[derive(Clone)] pub struct BrotliCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "brotli")] define_compression_strategy!( BrotliCompressionStrategy, Brotli, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "bzip2")] #[derive(Clone)] pub struct BzCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "bzip2")] define_compression_strategy!( BzCompressionStrategy, Bz, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "lzma")] #[derive(Clone)] pub struct LzmaCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "lzma")] define_compression_strategy!( LzmaCompressionStrategy, Lzma, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "deflate")] #[derive(Clone)] pub struct DeflateCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "deflate")] define_compression_strategy!( DeflateCompressionStrategy, Deflate, CompressionLevel::Fastest ); -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "zstd")] #[derive(Clone)] pub struct ZstdCompressionStrategy; -#[cfg(feature = "bench-helpers")] +#[cfg(feature = "zstd")] define_compression_strategy!( ZstdCompressionStrategy, Zstd, @@ -182,19 +184,24 @@ define_compression_strategy!( use std::sync::Arc; lazy_static::lazy_static! { - pub static ref DEFAULT_COMPRESSION_STRATEGY: Arc = Arc::new(ZLibCompressionStrategy); + pub static ref DEFAULT_COMPRESSION_STRATEGY: Arc = Arc::new(ZstdCompressionStrategy); } -#[cfg(feature = "bench-helpers")] lazy_static::lazy_static! 
{ - pub static ref ALL_COMPRESSION_STRATEGIES: [Arc<dyn CompressionStrategy>; 7] = [ - Arc::new(ZLibCompressionStrategy), + pub static ref ALL_COMPRESSION_STRATEGIES: Vec<Arc<dyn CompressionStrategy>> = vec![ + #[cfg(feature = "zlib")] + Arc::new(ZlibCompressionStrategy), + #[cfg(feature = "gzip")] Arc::new(GzipCompressionStrategy), + #[cfg(feature = "brotli")] Arc::new(BrotliCompressionStrategy), + #[cfg(feature = "bzip2")] Arc::new(BzCompressionStrategy), + #[cfg(feature = "lzma")] Arc::new(LzmaCompressionStrategy), + #[cfg(feature = "deflate")] Arc::new(DeflateCompressionStrategy), + #[cfg(feature = "zstd")] Arc::new(ZstdCompressionStrategy), ]; - } diff --git a/crates/fuel-data-parser/src/error.rs b/crates/fuel-data-parser/src/error.rs index 6fe06639..e6564a7f 100644 --- a/crates/fuel-data-parser/src/error.rs +++ b/crates/fuel-data-parser/src/error.rs @@ -6,18 +6,25 @@ use thiserror::Error; /// Compression error types #[derive(Debug, DisplayDoc, Error)] pub enum CompressionError { + #[cfg(feature = "zlib")] /// Failed to compress or decompress data using zlib: {0} Zlib(std::io::Error), + #[cfg(feature = "gzip")] /// Failed to compress or decompress data using gzip: {0} Gzip(std::io::Error), + #[cfg(feature = "brotli")] /// Failed to compress or decompress data using brotli: {0} Brotli(std::io::Error), + #[cfg(feature = "bzip2")] /// Failed to compress or decompress data using bzip2: {0} Bz(std::io::Error), + #[cfg(feature = "lzma")] /// Failed to compress or decompress data using lzma: {0} Lzma(std::io::Error), + #[cfg(feature = "deflate")] /// Failed to compress or decompress data using deflate: {0} Deflate(std::io::Error), + #[cfg(feature = "zstd")] /// Failed to compress or decompress data using zstd: {0} Zstd(std::io::Error), } @@ -25,19 +32,32 @@ pub enum CompressionError { /// Serialization/Deserialization error types. #[derive(Debug, DisplayDoc, Error)] pub enum SerdeError { + #[cfg(feature = "bincode")] /// Failed to serialize or deserialize data using bincode: {0} Bincode(#[from] bincode::ErrorKind), + #[cfg(feature = "postcard")] /// Failed to serialize or deserialize data using postcard: {0} Postcard(#[from] postcard::Error), + #[cfg(feature = "json")] /// Failed to serialize or deserialize data using JSON: {0} Json(#[from] serde_json::Error), } /// Data parser error types.
#[derive(Debug, DisplayDoc, Error)] -pub enum Error { +pub enum DataParserError { /// An error occurred during data compression or decompression: {0} Compression(#[from] CompressionError), /// An error occurred during data serialization or deserialization: {0} Serde(#[from] SerdeError), + /// An error occurred during data encoding: {0} + Encode(#[source] SerdeError), + /// An error occurred during data decoding: {0} + Decode(#[source] SerdeError), + #[cfg(feature = "json")] + /// An error occurred during data encoding to JSON: {0} + EncodeJson(#[source] SerdeError), + #[cfg(feature = "json")] + /// An error occurred during data decoding from JSON: {0} + DecodeJson(#[source] SerdeError), } diff --git a/crates/fuel-data-parser/src/lib.rs b/crates/fuel-data-parser/src/lib.rs index 78bc227f..5de469c5 100644 --- a/crates/fuel-data-parser/src/lib.rs +++ b/crates/fuel-data-parser/src/lib.rs @@ -6,25 +6,30 @@ mod error; use std::{fmt::Debug, sync::Arc}; pub use compression_strategies::*; +#[cfg(feature = "json")] +use serde::de::DeserializeOwned; -pub use crate::error::{CompressionError, Error, SerdeError}; +pub use crate::error::{CompressionError, DataParserError, SerdeError}; /// Serialization types supported for data parsing #[derive(Debug, Clone, strum::EnumIter, strum_macros::Display)] pub enum SerializationType { /// Bincode serialization + #[cfg(feature = "bincode")] #[strum(serialize = "bincode")] Bincode, - /// Postcard serialization - #[strum(serialize = "postcard")] - Postcard, /// json serialization + #[cfg(feature = "json")] #[strum(serialize = "json")] Json, + /// Postcard serialization + #[cfg(feature = "postcard")] + #[strum(serialize = "postcard")] + Postcard, } -/// Traits required for a data type to be parseable -pub trait DataParseable: +#[async_trait::async_trait] +pub trait DataEncoder: serde::Serialize + serde::de::DeserializeOwned + Clone @@ -33,17 +38,48 @@ pub trait DataParseable: + Debug + std::marker::Sized { -} + type Err: std::error::Error + From<DataParserError>; -impl< - T: serde::Serialize - + serde::de::DeserializeOwned - + Clone - + Send - + Sync - + Debug, - > DataParseable for T -{ + fn data_parser() -> DataParser { + DataParser::default() + } + + async fn encode(&self) -> Result<Vec<u8>, Self::Err> { + Self::data_parser().encode(self).await.map_err(Into::into) + } + + #[cfg(feature = "json")] + fn encode_json(&self) -> Result<Vec<u8>, Self::Err> { + Self::data_parser().encode_json(self).map_err(Into::into) + } + + #[cfg(feature = "json")] + fn encode_json_value(&self) -> Result<serde_json::Value, Self::Err> { + Self::data_parser() + .encode_json_value(self) + .map_err(Into::into) + } + + async fn decode(encoded: &[u8]) -> Result<Self, Self::Err> { + Self::data_parser() + .decode(encoded) + .await + .map_err(Into::into) + } + + #[cfg(feature = "json")] + fn decode_json(encoded: &[u8]) -> Result<Self, Self::Err> { + Self::data_parser().decode_json(encoded).map_err(Into::into) + } + + #[cfg(feature = "json")] + fn decode_json_value( + encoded: &serde_json::Value, + ) -> Result<Self, Self::Err> { + Self::data_parser() + .decode_json_value(encoded) + .map_err(Into::into) + } } /// `DataParser` is a utility struct for encoding (serializing and optionally compressing) /// and decoding (deserializing and optionally decompressing) data. /// # Examples /// /// ``` -/// use fuel_data_parser::{DataParser, SerializationType}; +/// use fuel_data_parser::*; /// use std::sync::Arc; /// /// #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] /// struct TestData { /// field: String, /// } /// +/// impl DataEncoder for TestData { +/// type Err = DataParserError; +/// } +/// /// #[tokio::main] /// async fn
main() -> Result<(), Box<dyn std::error::Error>> { /// let parser = DataParser::default(); @@ -101,7 +141,7 @@ impl Default for DataParser { /// ``` fn default() -> Self { Self { - compression_strategy: None, + compression_strategy: Some(DEFAULT_COMPRESSION_STRATEGY.clone()), serialization_type: SerializationType::Json, } } @@ -121,7 +161,7 @@ impl DataParser { /// # Examples /// /// ``` - /// use fuel_data_parser::{DataParser, DEFAULT_COMPRESSION_STRATEGY}; + /// use fuel_data_parser::*; /// use std::sync::Arc; /// /// let parser = DataParser::default() @@ -148,10 +188,10 @@ impl DataParser { /// # Examples /// /// ``` - /// use fuel_data_parser::{DataParser, SerializationType}; + /// use fuel_data_parser::*; /// /// let parser = DataParser::default() - /// .with_serialization_type(SerializationType::Json); + /// .with_serialization_type(SerializationType::Postcard); /// ``` pub fn with_serialization_type( mut self, @@ -175,13 +215,17 @@ impl DataParser { /// # Examples /// /// ``` - /// use fuel_data_parser::DataParser; + /// use fuel_data_parser::*; /// /// #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] /// struct TestData { /// field: String, /// } /// + /// impl DataEncoder for TestData { + /// type Err = DataParserError; + /// } + /// /// #[tokio::main] /// async fn main() -> Result<(), Box<dyn std::error::Error>> { /// let parser = DataParser::default(); @@ -191,10 +235,10 @@ impl DataParser { /// Ok(()) /// } /// ``` - pub async fn encode<T: DataParseable>( + pub async fn encode<T: DataEncoder>( &self, data: &T, - ) -> Result<Vec<u8>, Error> { + ) -> Result<Vec<u8>, DataParserError> { let serialized_data = self.serialize(data).await?; Ok(match &self.compression_strategy { Some(strategy) => strategy.compress(&serialized_data[..]).await?, @@ -202,13 +246,22 @@ impl DataParser { }) } - pub fn encode_json<T: DataParseable>( + #[cfg(feature = "json")] + pub fn encode_json<T: DataEncoder>( &self, data: &T, - ) -> Result<Vec<u8>, Error> { + ) -> Result<Vec<u8>, DataParserError> { self.serialize_json(data) } + #[cfg(feature = "json")] + pub fn encode_json_value<T: DataEncoder>( + &self, + data: &T, + ) -> Result<serde_json::Value, DataParserError> { + self.serialize_json_value(data) + } + /// Serializes the provided data according to the selected `SerializationType`. /// /// # Arguments /// /// # Returns /// /// A `Result` containing either a `Vec<u8>` of the serialized data, /// or an `Error` if serialization fails.
- pub async fn serialize<T: DataParseable>( + pub async fn serialize<T: DataEncoder>( &self, raw_data: &T, - ) -> Result<Vec<u8>, Error> { + ) -> Result<Vec<u8>, DataParserError> { match self.serialization_type { + #[cfg(feature = "bincode")] SerializationType::Bincode => bincode::serialize(&raw_data) - .map_err(|e| Error::Serde(SerdeError::Bincode(*e))), - SerializationType::Postcard => postcard::to_allocvec(&raw_data) - .map_err(|e| Error::Serde(SerdeError::Postcard(e))), + .map_err(|e| DataParserError::Encode(SerdeError::Bincode(*e))), + #[cfg(feature = "json")] SerializationType::Json => serde_json::to_vec(&raw_data) - .map_err(|e| Error::Serde(SerdeError::Json(e))), + .map_err(|e| DataParserError::EncodeJson(SerdeError::Json(e))), + #[cfg(feature = "postcard")] + SerializationType::Postcard => postcard::to_allocvec(&raw_data) + .map_err(|e| DataParserError::Encode(SerdeError::Postcard(e))), } } - fn serialize_json<T: DataParseable>( + #[cfg(feature = "json")] + fn serialize_json<T: DataEncoder>( &self, raw_data: &T, - ) -> Result<Vec<u8>, Error> { + ) -> Result<Vec<u8>, DataParserError> { serde_json::to_vec(&raw_data) - .map_err(|e| Error::Serde(SerdeError::Json(e))) + .map_err(|e| DataParserError::EncodeJson(SerdeError::Json(e))) + } + + #[cfg(feature = "json")] + fn serialize_json_value<T: DataEncoder>( + &self, + raw_data: &T, + ) -> Result<serde_json::Value, DataParserError> { + serde_json::to_value(raw_data) + .map_err(|e| DataParserError::EncodeJson(SerdeError::Json(e))) } /// Decodes the provided data by deserializing and optionally decompressing it. /// /// # Arguments /// /// # Returns /// /// # Examples /// /// ``` - /// use fuel_data_parser::DataParser; + /// use fuel_data_parser::*; /// /// #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] /// struct TestData { /// field: String, /// } /// + /// impl DataEncoder for TestData { + /// type Err = DataParserError; + /// } + /// /// #[tokio::main] /// async fn main() -> Result<(), Box<dyn std::error::Error>> { /// let parser = DataParser::default(); @@ -272,10 +342,10 @@ impl DataParser { /// Ok(()) /// } /// ``` - pub async fn decode<T: DataParseable>( + pub async fn decode<T: DataEncoder>( &self, data: &[u8], - ) -> Result<T, Error> { + ) -> Result<T, DataParserError> { let data = match &self.compression_strategy { Some(strategy) => strategy.decompress(data).await?, None => data.to_vec(), }; @@ -284,13 +354,22 @@ impl DataParser { Ok(decoded_data) } - pub fn decode_json<T: DataParseable>( + #[cfg(feature = "json")] + pub fn decode_json<T: DataEncoder>( &self, data: &[u8], - ) -> Result<T, Error> { + ) -> Result<T, DataParserError> { self.deserialize_json(data) } + #[cfg(feature = "json")] + pub fn decode_json_value<T: DataEncoder>( + &self, + data: &serde_json::Value, + ) -> Result<T, DataParserError> { + self.deserialize_json_value(data) + } + /// Deserializes the provided data according to the selected `SerializationType`. /// /// # Arguments /// /// # Returns /// /// A `Result` containing either the deserialized data structure, /// or an `Error` if deserialization fails.
- pub fn deserialize<'a, T: serde::Deserialize<'a>>( + pub fn deserialize<T: DataEncoder>( &self, - raw_data: &'a [u8], - ) -> Result<T, Error> { + raw_data: &[u8], + ) -> Result<T, DataParserError> { match self.serialization_type { + #[cfg(feature = "bincode")] SerializationType::Bincode => bincode::deserialize(raw_data) - .map_err(|e| Error::Serde(SerdeError::Bincode(*e))), - SerializationType::Postcard => postcard::from_bytes(raw_data) - .map_err(|e| Error::Serde(SerdeError::Postcard(e))), + .map_err(|e| DataParserError::Decode(SerdeError::Bincode(*e))), + #[cfg(feature = "json")] SerializationType::Json => self.deserialize_json(raw_data), + #[cfg(feature = "postcard")] + SerializationType::Postcard => postcard::from_bytes(raw_data) + .map_err(|e| DataParserError::Decode(SerdeError::Postcard(e))), } } - pub fn deserialize_json<'a, T: serde::Deserialize<'a>>( + #[cfg(feature = "json")] + fn deserialize_json<T: DeserializeOwned>( &self, - raw_data: &'a [u8], - ) -> Result<T, Error> { + raw_data: &[u8], + ) -> Result<T, DataParserError> { serde_json::from_slice(raw_data) - .map_err(|e| Error::Serde(SerdeError::Json(e))) + .map_err(|e| DataParserError::DecodeJson(SerdeError::Json(e))) + } + + #[cfg(feature = "json")] + fn deserialize_json_value<T: DeserializeOwned>( + &self, + raw_data: &serde_json::Value, + ) -> Result<T, DataParserError> { + serde_json::from_value(raw_data.clone()) + .map_err(|e| DataParserError::DecodeJson(SerdeError::Json(e))) } } @@ -332,6 +424,10 @@ mod tests { field: String, } + impl DataEncoder for TestData { + type Err = DataParserError; + } + #[tokio::test] async fn test_encode_decode() { let parser = DataParser::default(); @@ -350,8 +446,11 @@ mod tests { }; for serialization_type in [ + #[cfg(feature = "bincode")] SerializationType::Bincode, + #[cfg(feature = "postcard")] SerializationType::Postcard, + #[cfg(feature = "json")] SerializationType::Json, ] { let parser = DataParser::default() @@ -367,17 +466,10 @@ mod tests { let data = TestData { field: "test".to_string(), }; - let compression_strategies: Vec<Arc<dyn CompressionStrategy>> = vec![ - Arc::new(ZLibCompressionStrategy), - #[cfg(feature = "bench-helpers")] - Arc::new(GzipCompressionStrategy), - #[cfg(feature = "bench-helpers")] - Arc::new(BrotliCompressionStrategy), - ]; - - for strategy in compression_strategies { + + for strategy in ALL_COMPRESSION_STRATEGIES.iter() { let parser = - DataParser::default().with_compression_strategy(&strategy); + DataParser::default().with_compression_strategy(strategy); let encoded = parser.encode(&data).await.unwrap(); let decoded: TestData = parser.decode(&encoded).await.unwrap(); assert_eq!(data, decoded); diff --git a/crates/fuel-streams-core/Cargo.toml b/crates/fuel-streams-core/Cargo.toml index 2087376f..1bb2426b 100644 --- a/crates/fuel-streams-core/Cargo.toml +++ b/crates/fuel-streams-core/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } anyhow = { workspace = true } async-nats = { workspace = true } async-trait = { workspace = true } -chrono = { workspace = true } displaydoc = { workspace = true } fuel-core = { workspace = true, default-features = false, features = [ "p2p", ] } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true, default-features = false, features = ["std", "serde"] } fuel-data-parser = { workspace = true } fuel-streams-macros = { workspace = true } -fuel-streams-nats = { workspace = true } -fuel-streams-storage = { workspace = true } +fuel-streams-nats = { workspace = true, features = ["test-helpers"] } +fuel-streams-storage = { workspace = true, features = ["test-helpers"] } futures = { workspace = true } hex = { workspace = true } pretty_assertions
= { workspace = true, optional = true } diff --git a/crates/fuel-streams-core/README.md b/crates/fuel-streams-core/README.md index 69684062..5bdf9516 100644 --- a/crates/fuel-streams-core/README.md +++ b/crates/fuel-streams-core/README.md @@ -63,12 +63,11 @@ async fn main() -> BoxedResult<()> { // Connect to NATS server let nats_opts = NatsClientOpts::admin_opts(); let nats_client = NatsClient::connect(&nats_opts).await?; - - let s3_opts = S3ClientOpts::new(S3Env::Local, S3Role::Admin); - let s3_client = Arc::new(S3Client::new(&s3_opts).await?); + let storage_opts = S3StorageOpts::new(StorageEnv::Local, StorageRole::Admin); + let storage = Arc::new(S3Storage::new(storage_opts).await?); // Create a stream for blocks - let stream = Stream::<Block>::new(&nats_client, &s3_client).await; + let stream = Stream::<Block>::new(&nats_client, &storage).await; // Subscribe to the stream let wildcard = BlocksSubject::wildcard(None, None); // blocks.*.* diff --git a/crates/fuel-streams-core/src/blocks/mod.rs b/crates/fuel-streams-core/src/blocks/mod.rs index 2be88d23..6d8b5eef 100644 --- a/crates/fuel-streams-core/src/blocks/mod.rs +++ b/crates/fuel-streams-core/src/blocks/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Block {} +impl DataEncoder for Block { + type Err = StreamError; +} impl Streamable for Block { const NAME: &'static str = "blocks"; const WILDCARD_LIST: &'static [&'static str] = &[BlocksSubject::WILDCARD]; @@ -18,6 +20,14 @@ mod tests { use super::*; + #[tokio::test] + async fn test_block_encode() { + let block = MockBlock::build(42); + let encoded = block.encode().await.unwrap(); + let decoded = Block::decode(&encoded).await.unwrap(); + assert_eq!(decoded, block, "Decoded block should match original"); + } + #[tokio::test] async fn test_serialization() { let header = BlockHeader { diff --git a/crates/fuel-streams-core/src/inputs/mod.rs b/crates/fuel-streams-core/src/inputs/mod.rs index 4e2e7db9..87e7e369 100644 --- a/crates/fuel-streams-core/src/inputs/mod.rs +++ b/crates/fuel-streams-core/src/inputs/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Input {} +impl DataEncoder for Input { + type Err = StreamError; +} impl Streamable for Input { const NAME: &'static str = "inputs"; const WILDCARD_LIST: &'static [&'static str] = &[ diff --git a/crates/fuel-streams-core/src/inputs/types.rs b/crates/fuel-streams-core/src/inputs/types.rs index 91ff8c1b..1781e773 100644 --- a/crates/fuel-streams-core/src/inputs/types.rs +++ b/crates/fuel-streams-core/src/inputs/types.rs @@ -25,8 +25,8 @@ impl From<&FuelCoreInput> for Input { amount: input.amount, asset_id: input.asset_id.into(), owner: input.owner.into(), - predicate: HexString::default(), - predicate_data: HexString::default(), + predicate: HexData::default(), + predicate_data: HexData::default(), predicate_gas_used: 0, tx_pointer: input.tx_pointer.into(), utxo_id: input.utxo_id.into(), @@ -36,8 +36,8 @@ impl From<&FuelCoreInput> for Input { amount: input.amount, asset_id: input.asset_id.into(), owner: input.owner.into(), - predicate: input.predicate.as_slice().into(), - predicate_data: input.predicate_data.as_slice().into(), + predicate: HexData(input.predicate.as_slice().into()), + predicate_data: HexData(input.predicate_data.as_slice().into()),
predicate_gas_used: input.predicate_gas_used, tx_pointer: input.tx_pointer.into(), utxo_id: input.utxo_id.into(), @@ -46,11 +46,11 @@ impl From<&FuelCoreInput> for Input { FuelCoreInput::MessageCoinSigned(input) => { Input::Message(InputMessage { amount: input.amount, - data: HexString::default(), + data: HexData::default(), nonce: input.nonce.into(), - predicate: HexString::default(), + predicate: HexData::default(), predicate_length: 0, - predicate_data: HexString::default(), + predicate_data: HexData::default(), predicate_data_length: 0, predicate_gas_used: 0, recipient: input.recipient.into(), @@ -61,11 +61,13 @@ impl From<&FuelCoreInput> for Input { FuelCoreInput::MessageCoinPredicate(input) => { Input::Message(InputMessage { amount: input.amount, - data: HexString::default(), + data: HexData::default(), nonce: input.nonce.into(), - predicate: input.predicate.as_slice().into(), + predicate: HexData(input.predicate.as_slice().into()), predicate_length: input.predicate.as_slice().len(), - predicate_data: input.predicate_data.as_slice().into(), + predicate_data: HexData( + input.predicate_data.as_slice().into(), + ), predicate_data_length: input .predicate_data .as_slice() @@ -79,11 +81,11 @@ impl From<&FuelCoreInput> for Input { FuelCoreInput::MessageDataSigned(input) => { Input::Message(InputMessage { amount: input.amount, - data: input.data.as_slice().into(), + data: HexData(input.data.as_slice().into()), nonce: input.nonce.into(), - predicate: HexString::default(), + predicate: HexData::default(), predicate_length: 0, - predicate_data: HexString::default(), + predicate_data: HexData::default(), predicate_data_length: 0, predicate_gas_used: 0, recipient: input.recipient.into(), @@ -94,11 +96,13 @@ impl From<&FuelCoreInput> for Input { FuelCoreInput::MessageDataPredicate(input) => { Input::Message(InputMessage { amount: input.amount, - data: input.data.as_slice().into(), + data: HexData(input.data.as_slice().into()), nonce: input.nonce.into(), - predicate: input.predicate.as_slice().into(), + predicate: HexData(input.predicate.as_slice().into()), predicate_length: input.predicate.as_slice().len(), - predicate_data: input.predicate_data.as_slice().into(), + predicate_data: HexData( + input.predicate_data.as_slice().into(), + ), predicate_data_length: input .predicate_data .as_slice() @@ -126,8 +130,8 @@ pub struct InputCoin { pub amount: u64, pub asset_id: AssetId, pub owner: Address, - pub predicate: HexString, - pub predicate_data: HexString, + pub predicate: HexData, + pub predicate_data: HexData, pub predicate_gas_used: u64, pub tx_pointer: TxPointer, pub utxo_id: UtxoId, @@ -162,11 +166,11 @@ impl From<&FuelCoreInputContract> for InputContract { #[serde(rename_all = "camelCase")] pub struct InputMessage { pub amount: u64, - pub data: HexString, + pub data: HexData, pub nonce: Nonce, - pub predicate: HexString, + pub predicate: HexData, pub predicate_length: usize, - pub predicate_data: HexString, + pub predicate_data: HexData, pub predicate_gas_used: u64, pub predicate_data_length: usize, pub recipient: Address, @@ -181,7 +185,7 @@ impl InputMessage { .chain(self.recipient.as_ref()) .chain(self.nonce.as_ref()) .chain(self.amount.to_be_bytes()) - .chain(self.data.as_ref()); + .chain(self.data.0.as_ref()); (*hasher.finalize()).into() } diff --git a/crates/fuel-streams-core/src/lib.rs b/crates/fuel-streams-core/src/lib.rs index 219576fc..c680eacc 100644 --- a/crates/fuel-streams-core/src/lib.rs +++ b/crates/fuel-streams-core/src/lib.rs @@ -12,8 +12,12 @@ pub mod nats { pub use 
fuel_streams_nats::*; } -pub mod s3 { - pub use fuel_streams_storage::s3::*; +pub mod storage { + pub use fuel_streams_storage::*; +} + +pub(crate) mod data_parser { + pub use fuel_data_parser::*; } pub mod stream; @@ -24,6 +28,7 @@ mod fuel_core_types; mod primitive_types; pub mod types; +pub(crate) use data_parser::*; pub use stream::*; pub mod prelude { pub use fuel_streams_macros::subject::*; pub use crate::{ + data_parser::*, fuel_core_like::*, nats::*, - s3::*, + storage::*, stream::*, subjects::*, types::*, diff --git a/crates/fuel-streams-core/src/logs/mod.rs b/crates/fuel-streams-core/src/logs/mod.rs index 08960304..49459fcf 100644 --- a/crates/fuel-streams-core/src/logs/mod.rs +++ b/crates/fuel-streams-core/src/logs/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Log {} +impl DataEncoder for Log { + type Err = StreamError; +} impl Streamable for Log { const NAME: &'static str = "logs"; const WILDCARD_LIST: &'static [&'static str] = &[LogsSubject::WILDCARD]; diff --git a/crates/fuel-streams-core/src/outputs/mod.rs b/crates/fuel-streams-core/src/outputs/mod.rs index 568a215d..160f656a 100644 --- a/crates/fuel-streams-core/src/outputs/mod.rs +++ b/crates/fuel-streams-core/src/outputs/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Output {} +impl DataEncoder for Output { + type Err = StreamError; +} impl Streamable for Output { const NAME: &'static str = "outputs"; const WILDCARD_LIST: &'static [&'static str] = &[ diff --git a/crates/fuel-streams-core/src/primitive_types.rs b/crates/fuel-streams-core/src/primitive_types.rs index 158f23af..c03696e6 100644 --- a/crates/fuel-streams-core/src/primitive_types.rs +++ b/crates/fuel-streams-core/src/primitive_types.rs @@ -7,57 +7,75 @@ pub use serde::{Deserialize, Serialize}; use crate::fuel_core_types::*; -/// Implements hex-formatted serialization and deserialization for a type -/// that implements Display and FromStr -macro_rules! impl_hex_serde { - ($type:ty) => { - impl Serialize for $type { +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default, +)] +pub struct LongBytes(pub Vec<u8>); + +impl LongBytes { + pub fn zeroed() -> Self { + Self(vec![0; 32]) + } +} +impl AsRef<[u8]> for LongBytes { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} +impl AsMut<[u8]> for LongBytes { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.0 + } +} +impl From<Vec<u8>> for LongBytes { + fn from(value: Vec<u8>) -> Self { + Self(value) + } +} +impl std::fmt::Display for LongBytes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +} +impl From<&[u8]> for LongBytes { + fn from(value: &[u8]) -> Self { + Self(value.to_vec()) + } +} + +macro_rules!
common_wrapper_type { + ($wrapper_type:ident, $inner_type:ty) => { + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + pub struct $wrapper_type(pub $inner_type); + + // Custom serialization + impl Serialize for $wrapper_type { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer, { - serializer.serialize_str(&self.to_string()) + if serializer.is_human_readable() { + serializer.serialize_str(&format!("0x{}", self.0)) + } else { + self.0.serialize(serializer) + } } } - impl<'de> Deserialize<'de> for $type { + // Custom deserialization using FromStr + impl<'de> Deserialize<'de> for $wrapper_type { fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer<'de>, { - let s = String::deserialize(deserializer)?; - s.parse().map_err(serde::de::Error::custom) + if deserializer.is_human_readable() { + let s = String::deserialize(deserializer)?; + s.parse().map_err(serde::de::Error::custom) + } else { + Ok($wrapper_type(<$inner_type>::deserialize(deserializer)?)) + } } } - }; -} - -/// Macro to generate a wrapper type for different byte-based types (including Address type). -/// -/// This macro creates a new struct that wraps the specified inner type, -/// typically used for various byte-based identifiers in the Fuel ecosystem. -/// It automatically implements: -/// -/// - `From` for easy conversion from the inner type -/// - `Display` for formatting (prefixes the output with "0x") -/// - `PartialEq` for equality comparison -/// - A `zeroed()` method to create an instance filled with zeros -/// -/// # Usage -/// -/// ```no_compile -/// # use fuel_streams_core::generate_byte_type_wrapper; -/// generate_byte_type_wrapper!(AddressWrapped, fuel_core_types::fuel_tx::Address); -/// ``` -/// -/// Where `WrapperType` is the name of the new wrapper struct to be created, -/// and `InnerType` is the type being wrapped. -macro_rules! generate_byte_type_wrapper { - ($wrapper_type:ident, $inner_type:ty, $byte_size:expr) => { - #[derive(Debug, Clone, PartialEq, Eq, Hash)] - pub struct $wrapper_type(pub $inner_type); - - impl_hex_serde!($wrapper_type); impl From<$inner_type> for $wrapper_type { fn from(value: $inner_type) -> Self { @@ -71,15 +89,9 @@ macro_rules! generate_byte_type_wrapper { } } - impl From<[u8; $byte_size]> for $wrapper_type { - fn from(value: [u8; $byte_size]) -> Self { - $wrapper_type(<$inner_type>::from(value)) - } - } - impl From<&$inner_type> for $wrapper_type { fn from(value: &$inner_type) -> Self { - $wrapper_type(*value) + $wrapper_type(value.clone()) } } @@ -89,27 +101,6 @@ macro_rules! generate_byte_type_wrapper { } } - impl std::str::FromStr for $wrapper_type { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - let s = s.strip_prefix("0x").unwrap_or(s); - if s.len() != std::mem::size_of::<$inner_type>() * 2 { - panic!("Invalid length for {}", stringify!($wrapper_type)); - } - let mut inner = <$inner_type>::zeroed(); - for (i, chunk) in s.as_bytes().chunks(2).enumerate() { - let byte = u8::from_str_radix( - std::str::from_utf8(chunk).unwrap(), - 16, - ) - .unwrap(); - inner.as_mut()[i] = byte; - } - Ok($wrapper_type(inner)) - } - } - impl From<&str> for $wrapper_type { fn from(s: &str) -> Self { s.parse().unwrap_or_else(|e| { @@ -152,6 +143,61 @@ macro_rules! generate_byte_type_wrapper { }; } macro_rules!
generate_byte_type_wrapper { // Pattern with byte_size specified + ($wrapper_type:ident, $inner_type:ty, $byte_size:expr) => { + common_wrapper_type!($wrapper_type, $inner_type); + + impl From<[u8; $byte_size]> for $wrapper_type { + fn from(value: [u8; $byte_size]) -> Self { + $wrapper_type(<$inner_type>::from(value)) + } + } + + impl std::str::FromStr for $wrapper_type { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + let s = s.strip_prefix("0x").unwrap_or(s); + if s.len() != std::mem::size_of::<$inner_type>() * 2 { + return Err(format!( + "Invalid length for {}, expected {} characters", + stringify!($wrapper_type), + std::mem::size_of::<$inner_type>() * 2 + )); + } + let bytes = hex::decode(s).map_err(|e| { + format!("Failed to decode hex string: {}", e) + })?; + let array: [u8; $byte_size] = bytes + .try_into() + .map_err(|_| "Invalid byte length".to_string())?; + Ok($wrapper_type(<$inner_type>::from(array))) + } + } + }; + + ($wrapper_type:ident, $inner_type:ty) => { + common_wrapper_type!($wrapper_type, $inner_type); + + impl From<Vec<u8>> for $wrapper_type { + fn from(value: Vec<u8>) -> Self { + $wrapper_type(<$inner_type>::from(value)) + } + } + impl std::str::FromStr for $wrapper_type { + type Err = String; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let s = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(s).map_err(|e| { + format!("Failed to decode hex string: {}", e) + })?; + Ok($wrapper_type(bytes.into())) + } + } + }; +} + generate_byte_type_wrapper!(Address, fuel_types::Address, 32); generate_byte_type_wrapper!(Bytes32, fuel_types::Bytes32, 32); generate_byte_type_wrapper!(ContractId, fuel_types::ContractId, 32); @@ -163,6 +209,7 @@ generate_byte_type_wrapper!(MessageId, fuel_types::MessageId, 32); generate_byte_type_wrapper!(BlockId, fuel_types::Bytes32, 32); generate_byte_type_wrapper!(Signature, fuel_types::Bytes64, 64); generate_byte_type_wrapper!(TxId, fuel_types::TxId, 32); +generate_byte_type_wrapper!(HexData, LongBytes); /// Implements bidirectional conversions between `Bytes32` and a given type.
/// @@ -221,53 +268,6 @@ impl From<FuelCoreBlockId> for BlockId { } } -#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] -pub struct HexString(pub Vec<u8>); -impl_hex_serde!(HexString); - -impl From<&[u8]> for HexString { - fn from(value: &[u8]) -> Self { - HexString(value.to_vec()) - } -} -impl From<Bytes32> for HexString { - fn from(value: Bytes32) -> Self { - Self::from(value.0.as_ref()) - } -} -impl TryFrom<HexString> for Bytes32 { - type Error = String; - fn try_from(value: HexString) -> Result<Self, Self::Error> { - let bytes: [u8; 32] = value - .0 - .try_into() - .map_err(|_| "Invalid length for Bytes32".to_string())?; - Ok(Bytes32::from(bytes)) - } -} -impl std::fmt::Display for HexString { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "0x{}", hex::encode(&self.0)) - } -} -impl std::str::FromStr for HexString { - type Err = String; - fn from_str(s: &str) -> Result<Self, Self::Err> { - let s = s.strip_prefix("0x").unwrap_or(s); - hex::decode(s).map(HexString).map_err(|e| e.to_string()) - } -} -impl AsRef<[u8]> for HexString { - fn as_ref(&self) -> &[u8] { - &self.0 - } -} -impl HexString { - pub fn zeroed() -> Self { - HexString(vec![0u8; 32]) - } -} - #[derive( Debug, Default, Deserialize, Serialize, )] -#[serde(rename_all = "camelCase")] pub struct TxPointer { block_height: FuelCoreBlockHeight, tx_index: u16, } @@ -299,12 +298,11 @@ impl From<FuelCoreTxPointer> for TxPointer { #[derive( Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, )] -#[serde(rename_all = "camelCase")] pub struct UtxoId { pub tx_id: Bytes32, pub output_index: u16, } -impl From<&UtxoId> for HexString { +impl From<&UtxoId> for HexData { fn from(value: &UtxoId) -> Self { value.to_owned().into() } } @@ -322,12 +320,12 @@ impl From<&FuelCoreUtxoId> for UtxoId { } } } -impl From<UtxoId> for HexString { +impl From<UtxoId> for HexData { fn from(value: UtxoId) -> Self { let mut bytes = Vec::with_capacity(34); bytes.extend_from_slice(value.tx_id.0.as_ref()); bytes.extend_from_slice(&value.output_index.to_be_bytes()); - HexString(bytes) + HexData(bytes.into()) } } diff --git a/crates/fuel-streams-core/src/receipts/mod.rs b/crates/fuel-streams-core/src/receipts/mod.rs index c2ba4f4a..6779f6b1 100644 --- a/crates/fuel-streams-core/src/receipts/mod.rs +++ b/crates/fuel-streams-core/src/receipts/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Receipt {} +impl DataEncoder for Receipt { + type Err = StreamError; +} impl Streamable for Receipt { const NAME: &'static str = "receipts"; const WILDCARD_LIST: &'static [&'static str] = &[ diff --git a/crates/fuel-streams-core/src/receipts/types.rs b/crates/fuel-streams-core/src/receipts/types.rs index 565185d8..2f9f5f45 100644 --- a/crates/fuel-streams-core/src/receipts/types.rs +++ b/crates/fuel-streams-core/src/receipts/types.rs @@ -54,7 +54,6 @@ pub struct ReturnDataReceipt { pub digest: Bytes32, pub pc: Word, pub is: Word, - #[serde(skip_serializing_if = "Option::is_none")] pub data: Option<Vec<u8>>, } @@ -65,7 +64,6 @@ pub struct PanicReceipt { pub reason: PanicInstruction, pub pc: Word, pub is: Word, - #[serde(skip_serializing_if = "Option::is_none")] pub contract_id: Option<ContractId>, } @@ -101,7 +99,6 @@ pub struct LogDataReceipt { pub digest: Bytes32, pub pc: Word, pub is: Word, - #[serde(skip_serializing_if = "Option::is_none")] pub data: Option<Vec<u8>>, } @@ -143,7 +140,6 @@ pub struct MessageOutReceipt { pub nonce: Nonce, pub len: Word, pub digest: Bytes32, -
#[serde(skip_serializing_if = "Option::is_none")] pub data: Option<Vec<u8>>, } diff --git a/crates/fuel-streams-core/src/stream/error.rs b/crates/fuel-streams-core/src/stream/error.rs index 168155da..03625d9d 100644 --- a/crates/fuel-streams-core/src/stream/error.rs +++ b/crates/fuel-streams-core/src/stream/error.rs @@ -8,6 +8,7 @@ use async_nats::{ }, }; use displaydoc::Display as DisplayDoc; +use fuel_data_parser::DataParserError; use thiserror::Error; #[derive(Error, DisplayDoc, Debug)] @@ -18,31 +19,24 @@ pub enum StreamError { #[source] source: error::Error, }, - - /// Failed to publish to S3: {0} - S3PublishError(#[from] fuel_streams_storage::s3::S3ClientError), - + /// Failed to publish to storage: {0} + Storage(#[from] fuel_streams_storage::StorageError), /// Failed to retrieve last published message from stream: {0} GetLastPublishedFailed(#[from] error::Error), - /// Failed to create Key-Value Store: {0} StoreCreation(#[from] error::Error), - /// Failed to publish item to Key-Value Store: {0} StorePublish(#[from] PutError), - /// Failed to subscribe to subject in Key-Value Store: {0} StoreSubscribe(#[from] error::Error), - /// Failed to publish item to stream: {0} StreamPublish(#[from] CreateError), - /// Failed to create stream: {0} StreamCreation(#[from] error::Error), - /// Failed to create consumer for stream: {0} ConsumerCreate(#[from] error::Error), - /// Failed to consume messages from stream: {0} ConsumerMessages(#[from] error::Error), + /// Failed to encode or decode data: {0} + Encoder(#[from] DataParserError), } diff --git a/crates/fuel-streams-core/src/stream/fuel_streams.rs b/crates/fuel-streams-core/src/stream/fuel_streams.rs index 5e8781ba..e2ef4b7b 100644 --- a/crates/fuel-streams-core/src/stream/fuel_streams.rs +++ b/crates/fuel-streams-core/src/stream/fuel_streams.rs @@ -57,31 +57,31 @@ impl FuelStreamsUtils { impl FuelStreams { pub async fn new( nats_client: &NatsClient, - s3_client: &Arc<S3Client>, + storage: &Arc<S3Storage>, ) -> Self { Self { - transactions: Stream::<Transaction>::new(nats_client, s3_client) + transactions: Stream::<Transaction>::new(nats_client, storage) .await, - blocks: Stream::<Block>::new(nats_client, s3_client).await, - inputs: Stream::<Input>::new(nats_client, s3_client).await, - outputs: Stream::<Output>::new(nats_client, s3_client).await, - receipts: Stream::<Receipt>::new(nats_client, s3_client).await, - utxos: Stream::<Utxo>::new(nats_client, s3_client).await, - logs: Stream::<Log>::new(nats_client, s3_client).await, + blocks: Stream::<Block>::new(nats_client, storage).await, + inputs: Stream::<Input>::new(nats_client, storage).await, + outputs: Stream::<Output>::new(nats_client, storage).await, + receipts: Stream::<Receipt>::new(nats_client, storage).await, + utxos: Stream::<Utxo>::new(nats_client, storage).await, + logs: Stream::<Log>::new(nats_client, storage).await, } } pub async fn setup_all( core_client: &NatsClient, publisher_client: &NatsClient, - s3_client: &Arc<S3Client>, + storage: &Arc<S3Storage>, ) -> (Self, Self) { - let core_stream = Self::new(core_client, s3_client).await; - let publisher_stream = Self::new(publisher_client, s3_client).await; + let core_stream = Self::new(core_client, storage).await; + let publisher_stream = Self::new(publisher_client, storage).await; (core_stream, publisher_stream) } - pub async fn subscribe( + pub async fn subscribe_raw( &self, sub_subject: &str, subscription_config: Option<SubscriptionConfig>, @@ -122,7 +122,6 @@ pub trait FuelStreamsExt: Sync + Send { fn logs(&self) -> &Stream<Log>; async fn get_last_published_block(&self) -> anyhow::Result<Option<Block>>; - async fn get_consumers_and_state( &self, ) -> Result, StreamState)>, RequestErrorKind>; diff --git
a/crates/fuel-streams-core/src/stream/mod.rs b/crates/fuel-streams-core/src/stream/mod.rs index a204e8d1..028c58b6 100644 --- a/crates/fuel-streams-core/src/stream/mod.rs +++ b/crates/fuel-streams-core/src/stream/mod.rs @@ -1,9 +1,7 @@ mod error; mod fuel_streams; -mod stream_encoding; mod stream_impl; pub use error::*; pub use fuel_streams::*; -pub use stream_encoding::*; pub use stream_impl::*; diff --git a/crates/fuel-streams-core/src/stream/stream_encoding.rs b/crates/fuel-streams-core/src/stream/stream_encoding.rs deleted file mode 100644 index 31c7e0b2..00000000 --- a/crates/fuel-streams-core/src/stream/stream_encoding.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::fmt::Debug; - -use async_trait::async_trait; -use fuel_data_parser::{DataParseable, DataParser}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StreamData { - pub subject: String, - pub timestamp: String, - /// The payload published for the subject - pub payload: T, -} - -impl StreamData -where - T: serde::de::DeserializeOwned + Clone, -{ - pub fn new(subject: &str, payload: T) -> Self { - let now: chrono::DateTime = chrono::Utc::now(); - // Formatting the datetime as an ISO 8601 string - let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); - Self { - subject: subject.to_string(), - timestamp, - payload, - } - } - - #[cfg(feature = "bench-helpers")] - pub fn ts_as_millis(&self) -> u128 { - use chrono::{DateTime, Utc}; - - DateTime::parse_from_rfc3339(&self.timestamp) - .ok() - .map(|ts| ts.timestamp_millis() as u128) - .unwrap_or_else(|| Utc::now().timestamp_millis() as u128) - } -} - -#[async_trait] -pub trait StreamEncoder: DataParseable { - // TODO: Should we remove the `StreamData` type and encode/decode the raw data only - fn encode(&self, subject: &str) -> Vec { - let data = StreamData::new(subject, self.clone()); - - Self::data_parser() - .encode_json(&data) - .expect("Streamable must encode correctly") - } - - fn encode_self(&self) -> Vec { - Self::data_parser() - .encode_json(self) - .expect("Streamable must encode correctly") - } - - fn decode(encoded: Vec) -> Result { - Ok(Self::decode_raw(encoded)?.payload) - } - - fn decode_or_panic(encoded: Vec) -> Self { - Self::decode_raw(encoded) - .expect("Streamable must decode correctly") - .payload - } - - fn decode_raw( - encoded: Vec, - ) -> Result, fuel_data_parser::Error> { - Self::data_parser().decode_json(&encoded) - } - - fn decode_raw_or_panic(encoded: Vec) -> StreamData { - Self::data_parser() - .decode_json(&encoded) - .expect("Streamable must decode correctly") - } - - fn data_parser() -> DataParser { - DataParser::default() - } -} diff --git a/crates/fuel-streams-core/src/stream/stream_impl.rs b/crates/fuel-streams-core/src/stream/stream_impl.rs index 68d57d42..8e376904 100644 --- a/crates/fuel-streams-core/src/stream/stream_impl.rs +++ b/crates/fuel-streams-core/src/stream/stream_impl.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, sync::Arc}; +use std::sync::Arc; use async_nats::{ jetstream::{ @@ -31,10 +31,13 @@ impl PublishPacket { pub fn get_s3_path(&self) -> String { let subject = self.subject.parse(); - subject.replace('.', "/").to_string() + format!("{}.json.zstd", subject.replace('.', "/")) } } +pub trait StreamEncoder: DataEncoder {} +impl> StreamEncoder for T {} + /// Trait for types that can be streamed. 
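A note on `get_s3_path` above: keys are now derived from the parsed NATS subject, with an explicit `.json.zstd` extension reflecting that stored payloads are zstd-compressed JSON. A minimal sketch of the mapping (the subject string here is illustrative, not a real producer subject):

```rust
// Mirrors the key derivation in PublishPacket::get_s3_path.
fn s3_path_for(subject: &str) -> String {
    format!("{}.json.zstd", subject.replace('.', "/"))
}

fn main() {
    // Hypothetical parsed subject; real ones come from IntoSubject::parse().
    assert_eq!(s3_path_for("blocks.0x000.23"), "blocks/0x000/23.json.zstd");
}
```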
/// /// # Examples @@ -42,13 +45,16 @@ impl PublishPacket { /// ```no_run /// use async_trait::async_trait; /// use fuel_streams_core::prelude::*; +/// use fuel_data_parser::*; /// /// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] /// struct MyStreamable { /// data: String, /// } /// -/// impl StreamEncoder for MyStreamable {} +/// impl DataEncoder for MyStreamable { +/// type Err = StreamError; +/// } /// /// #[async_trait] /// impl Streamable for MyStreamable { @@ -74,6 +80,7 @@ pub trait Streamable: StreamEncoder + std::marker::Sized { /// use std::sync::Arc; /// use fuel_streams_core::prelude::*; /// use fuel_streams_macros::subject::IntoSubject; +/// use fuel_data_parser::*; /// use futures::StreamExt; /// /// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -81,7 +88,9 @@ pub trait Streamable: StreamEncoder + std::marker::Sized { /// data: String, /// } /// -/// impl StreamEncoder for MyStreamable {} +/// impl DataEncoder for MyStreamable { +/// type Err = StreamError; +/// } /// /// #[async_trait::async_trait] /// impl Streamable for MyStreamable { @@ -89,8 +98,8 @@ pub trait Streamable: StreamEncoder + std::marker::Sized { /// const WILDCARD_LIST: &'static [&'static str] = &["*"]; /// } /// -/// async fn example(nats_client: &NatsClient, s3_client: &Arc) { -/// let stream = Stream::::new(nats_client, s3_client).await; +/// async fn example(nats_client: &NatsClient, storage: &Arc) { +/// let stream = Stream::::new(nats_client, storage).await; /// /// // Publish /// let subject = BlocksSubject::new().with_height(Some(23.into())).arc(); @@ -110,7 +119,7 @@ pub trait Streamable: StreamEncoder + std::marker::Sized { #[derive(Debug, Clone)] pub struct Stream { store: Arc, - s3_client: Arc, + storage: Arc, _marker: std::marker::PhantomData, } @@ -120,11 +129,11 @@ impl Stream { pub async fn get_or_init( nats_client: &NatsClient, - s3_client: &Arc, + storage: &Arc, ) -> Self { let cell = Self::INSTANCE; cell.get_or_init(|| async { - Self::new(nats_client, s3_client).await.to_owned() + Self::new(nats_client, storage).await.to_owned() }) .await .to_owned() @@ -132,7 +141,7 @@ impl Stream { pub async fn new( nats_client: &NatsClient, - s3_client: &Arc, + storage: &Arc, ) -> Self { let namespace = &nats_client.namespace; let bucket_name = namespace.stream_name(S::NAME); @@ -140,7 +149,6 @@ impl Stream { bucket: bucket_name.to_owned(), storage: stream::StorageType::File, history: 1, - compression: true, ..Default::default() }; @@ -151,7 +159,7 @@ impl Stream { Self { store: Arc::new(store), - s3_client: Arc::clone(s3_client), + storage: Arc::clone(storage), _marker: std::marker::PhantomData, } } @@ -163,11 +171,8 @@ impl Stream { let payload = &packet.payload; let s3_path = packet.get_s3_path(); let subject_name = &packet.subject.parse(); - - self.s3_client - .put_object(&s3_path, payload.encode(subject_name)) - .await?; - + let encoded = payload.encode().await?; + self.storage.store(&s3_path, encoded).await?; self.publish_s3_path_to_nats(subject_name, &s3_path).await } @@ -212,7 +217,6 @@ impl Stream { self.store.stream_name.as_str() } - // Less performant due to our hybrid use of NATS and S3 pub async fn subscribe_raw( &self, subscription_config: Option, @@ -220,25 +224,19 @@ impl Stream { let config = self.get_consumer_config(subscription_config); let config = self.prefix_filter_subjects(config); let consumer = self.store.stream.create_consumer(config).await?; - - Ok(consumer - .messages() - .await? 
- .then(|message| { - let s3_client = Arc::clone(&self.s3_client); - + let messages = consumer.messages().await?; + let storage = Arc::clone(&self.storage); + + Ok(messages + .then(move |message| { + let nats_payload = + message.expect("Message must be valid").payload.to_vec(); + let storage = storage.clone(); async move { - let nats_payload = message - .expect("Message must be valid") - .payload - .to_vec(); - - // TODO: Bubble up the error to users let s3_path = String::from_utf8(nats_payload) .expect("Must be S3 path"); - - s3_client - .get_object(&s3_path) + storage + .retrieve(&s3_path) .await .expect("S3 object must exist") } @@ -250,31 +248,10 @@ impl Stream { &self, subscription_config: Option, ) -> Result, StreamError> { - let config = self.get_consumer_config(subscription_config); - let config = self.prefix_filter_subjects(config); - let consumer = self.store.stream.create_consumer(config).await?; - - Ok(consumer - .messages() - .await? - .map(|item| { - String::from_utf8( - item.expect("Message must be valid").payload.to_vec(), - ) - .expect("Must be S3 path") - }) - .then(|s3_path| { - let s3_client = Arc::clone(&self.s3_client); - - async move { - // TODO: Bubble up the error? - S::decode_or_panic( - s3_client - .get_object(&s3_path) - .await - .expect("Could not get S3 object"), - ) - } + let raw_stream = self.subscribe_raw(subscription_config).await?; + Ok(raw_stream + .then(|s3_data| async move { + S::decode(&s3_data).await.expect("Failed to decode") }) .boxed()) } @@ -386,12 +363,11 @@ impl Stream { ) -> Result { let s3_path = String::from_utf8(nats_payload).expect("Must be S3 path"); let s3_object = self - .s3_client - .get_object(&s3_path) + .storage + .retrieve(&s3_path) .await .expect("S3 object must exist"); - - Ok(S::decode_or_panic(s3_object)) + Ok(S::decode(&s3_object).await.expect("Failed to decode")) } #[cfg(any(test, feature = "test-helpers"))] @@ -452,3 +428,86 @@ pub struct SubscriptionConfig { pub filter_subjects: Vec, pub deliver_policy: NatsDeliverPolicy, } + +#[cfg(any(test, feature = "test-helpers"))] +mod tests { + use serde::{Deserialize, Serialize}; + + use super::*; + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] + struct TestStreamable { + data: String, + } + + impl DataEncoder for TestStreamable { + type Err = StreamError; + } + + #[async_trait] + impl Streamable for TestStreamable { + const NAME: &'static str = "test_streamable"; + const WILDCARD_LIST: &'static [&'static str] = &["*"]; + } + + #[tokio::test] + async fn test_stream_item_s3_encoding_flow() { + let (stream, _, test_data, subject) = setup_test().await; + let packet = test_data.to_packet(subject); + + // Publish (this will encode and store in S3) + stream.publish(&packet).await.unwrap(); + + // Get the S3 path that was used + let s3_path = packet.get_s3_path(); + + // Retrieve directly from S3 and verify encoding + let raw_s3_data = stream.storage.retrieve(&s3_path).await.unwrap(); + let decoded = TestStreamable::decode(&raw_s3_data).await.unwrap(); + assert_eq!(decoded, test_data, "Retrieved data should match original"); + } + + #[tokio::test] + async fn test_stream_item_json_encoding_flow() { + use fuel_data_parser::DataParser; + let (_, _, test_data, _) = setup_test().await; + let encoded = test_data.encode().await.unwrap(); + let decoded = TestStreamable::decode(&encoded).await.unwrap(); + assert_eq!(decoded, test_data, "Decoded data should match original"); + + let json = DataParser::default().encode_json(&test_data).unwrap(); + let json_str = 
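With this refactor, `subscribe` is just a decoding layer over `subscribe_raw`: the raw stream yields the S3 object bytes and the typed stream decodes each one into `S`. A consumption sketch under that assumption (`Block` stands in for any `Streamable` type):

```rust
use fuel_streams_core::prelude::*;
use futures::StreamExt;

// Sketch: iterate decoded items from a typed stream subscription.
async fn consume(stream: &Stream<Block>) -> Result<(), StreamError> {
    let mut sub = stream.subscribe(None).await?;
    while let Some(block) = sub.next().await {
        // Each item has already been fetched from storage and decoded.
        println!("received: {block:?}");
    }
    Ok(())
}
```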
String::from_utf8(json).unwrap(); + let expected_json = r#"{"data":"test content"}"#; + assert_eq!( + json_str, expected_json, + "JSON structure should exactly match expected format" + ); + } + + #[cfg(test)] + async fn setup_test() -> ( + Stream, + Arc, + TestStreamable, + Arc, + ) { + let storage = S3Storage::new_for_testing().await.unwrap(); + let nats_client_opts = + NatsClientOpts::admin_opts().with_rdn_namespace(); + let nats_client = NatsClient::connect(&nats_client_opts).await.unwrap(); + let stream = Stream::::new( + &nats_client, + &Arc::new(storage.clone()), + ) + .await; + let test_data = TestStreamable { + data: "test content".to_string(), + }; + let subject = Arc::new( + BlocksSubject::new() + .with_producer(Some(Address::zeroed())) + .with_height(Some(1.into())), + ); + (stream, Arc::new(storage), test_data, subject) + } +} diff --git a/crates/fuel-streams-core/src/transactions/mod.rs b/crates/fuel-streams-core/src/transactions/mod.rs index 8211ae26..2094d796 100644 --- a/crates/fuel-streams-core/src/transactions/mod.rs +++ b/crates/fuel-streams-core/src/transactions/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Transaction {} +impl DataEncoder for Transaction { + type Err = StreamError; +} impl Streamable for Transaction { const NAME: &'static str = "transactions"; const WILDCARD_LIST: &'static [&'static str] = &[ diff --git a/crates/fuel-streams-core/src/transactions/types.rs b/crates/fuel-streams-core/src/transactions/types.rs index adeb64fc..bb536110 100644 --- a/crates/fuel-streams-core/src/transactions/types.rs +++ b/crates/fuel-streams-core/src/transactions/types.rs @@ -5,8 +5,8 @@ use crate::types::*; #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] pub struct StorageSlot { - pub key: HexString, - pub value: HexString, + pub key: HexData, + pub value: HexData, } impl From for StorageSlot { @@ -18,8 +18,8 @@ impl From for StorageSlot { impl From<&FuelCoreStorageSlot> for StorageSlot { fn from(slot: &FuelCoreStorageSlot) -> Self { Self { - key: slot.key().as_slice().into(), - value: slot.value().as_slice().into(), + key: HexData(slot.key().as_slice().into()), + value: HexData(slot.value().as_slice().into()), } } } @@ -50,11 +50,11 @@ pub struct Transaction { pub mint_gas_price: Option, pub policies: Option, pub proof_set: Vec, - pub raw_payload: HexString, + pub raw_payload: HexData, pub receipts_root: Option, pub salt: Option, - pub script: Option, - pub script_data: Option, + pub script: Option, + pub script_data: Option, pub script_gas_limit: Option, pub status: TransactionStatus, pub storage_slots: Vec, @@ -62,7 +62,7 @@ pub struct Transaction { pub subsections_number: Option, pub tx_pointer: Option, pub upgrade_purpose: Option, - pub witnesses: Vec, + pub witnesses: Vec, pub receipts: Vec, } @@ -238,7 +238,7 @@ impl Transaction { let raw_payload = { use fuel_core_types::fuel_types::canonical::Serialize; - HexString(transaction.to_bytes()) + HexData(transaction.to_bytes().into()) }; let receipts_root = { @@ -265,7 +265,7 @@ impl Transaction { use fuel_core_types::fuel_tx::field::Script; match transaction { FuelCoreTransaction::Script(script) => { - Some(HexString(script.script().clone())) + Some(HexData(script.script().clone().into())) } _ => None, } @@ -275,7 +275,7 @@ impl Transaction { use fuel_core_types::fuel_tx::field::ScriptData; match transaction { FuelCoreTransaction::Script(script) => { 
- Some(HexString(script.script_data().clone())) + Some(HexData(script.script_data().clone().into())) } _ => None, } @@ -337,35 +337,35 @@ impl Transaction { } }; - // hexstring encode should be HexString(data) + // hexstring encode should be HexData(data) let witnesses = { use fuel_core_types::fuel_tx::field::Witnesses; match transaction { FuelCoreTransaction::Script(tx) => tx .witnesses() .iter() - .map(|w| HexString(w.clone().into_inner())) + .map(|w| HexData(w.clone().into_inner().into())) .collect(), FuelCoreTransaction::Create(tx) => tx .witnesses() .iter() - .map(|w| HexString(w.clone().into_inner())) + .map(|w| HexData(w.clone().into_inner().into())) .collect(), FuelCoreTransaction::Mint(_) => vec![], FuelCoreTransaction::Upgrade(tx) => tx .witnesses() .iter() - .map(|w| HexString(w.clone().into_inner())) + .map(|w| HexData(w.clone().into_inner().into())) .collect(), FuelCoreTransaction::Upload(tx) => tx .witnesses() .iter() - .map(|w| HexString(w.clone().into_inner())) + .map(|w| HexData(w.clone().into_inner().into())) .collect(), FuelCoreTransaction::Blob(tx) => tx .witnesses() .iter() - .map(|w| HexString(w.clone().into_inner())) + .map(|w| HexData(w.clone().into_inner().into())) .collect(), } }; @@ -412,7 +412,7 @@ impl Transaction { } #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] +#[serde(tag = "type")] pub enum TransactionKind { #[default] Create, @@ -462,6 +462,7 @@ impl MockTransaction { } #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "status")] pub enum TransactionStatus { Failed, Submitted, diff --git a/crates/fuel-streams-core/src/utxos/mod.rs b/crates/fuel-streams-core/src/utxos/mod.rs index 46744a53..f52e7414 100644 --- a/crates/fuel-streams-core/src/utxos/mod.rs +++ b/crates/fuel-streams-core/src/utxos/mod.rs @@ -4,9 +4,11 @@ pub mod types; pub use subjects::*; use super::types::*; -use crate::{StreamEncoder, Streamable}; +use crate::{DataEncoder, StreamError, Streamable}; -impl StreamEncoder for Utxo {} +impl DataEncoder for Utxo { + type Err = StreamError; +} impl Streamable for Utxo { const NAME: &'static str = "utxos"; const WILDCARD_LIST: &'static [&'static str] = &[UtxosSubject::WILDCARD]; diff --git a/crates/fuel-streams-core/src/utxos/subjects.rs b/crates/fuel-streams-core/src/utxos/subjects.rs index 2d89d4c7..eec48b98 100644 --- a/crates/fuel-streams-core/src/utxos/subjects.rs +++ b/crates/fuel-streams-core/src/utxos/subjects.rs @@ -16,7 +16,7 @@ use crate::types::*; /// # use fuel_streams_core::types::*; /// # use fuel_streams_macros::subject::*; /// let subject = UtxosSubject { -/// utxo_id: Some(HexString::zeroed()), +/// utxo_id: Some(HexData::zeroed()), /// utxo_type: Some(UtxoType::Message), /// }; /// assert_eq!( @@ -40,7 +40,7 @@ use crate::types::*; /// # use fuel_streams_core::types::*; /// # use fuel_streams_macros::subject::*; /// let wildcard = UtxosSubject::wildcard( -/// Some(HexString::zeroed()), +/// Some(HexData::zeroed()), /// None, /// ); /// assert_eq!(wildcard, "utxos.*.0x0000000000000000000000000000000000000000000000000000000000000000"); @@ -53,7 +53,7 @@ use crate::types::*; /// # use fuel_streams_core::types::*; /// # use fuel_streams_macros::subject::*; /// let subject = UtxosSubject::new() -/// .with_utxo_id(Some(HexString::zeroed())) +/// .with_utxo_id(Some(HexData::zeroed())) /// .with_utxo_type(Some(UtxoType::Message)); /// assert_eq!(subject.parse(), "utxos.message.0x0000000000000000000000000000000000000000000000000000000000000000"); 
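The serde change on `TransactionKind` above (and on `TransactionStatus` just after it) swaps `rename_all = "PascalCase"` for an internally tagged representation, so variants serialize as objects carrying a discriminant field rather than bare strings. A minimal illustration with a trimmed-down enum:

```rust
use serde::{Deserialize, Serialize};

// Trimmed-down stand-in for TransactionKind.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type")]
enum KindDemo {
    Create,
    Script,
}

fn main() {
    // Internally tagged: the variant name becomes a field, not a bare string.
    let json = serde_json::to_string(&KindDemo::Script).unwrap();
    assert_eq!(json, r#"{"type":"Script"}"#);
    let back: KindDemo = serde_json::from_str(&json).unwrap();
    assert_eq!(back, KindDemo::Script);
}
```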
/// ``` @@ -62,7 +62,7 @@ use crate::types::*; #[subject_wildcard = "utxos.>"] #[subject_format = "utxos.{utxo_type}.{utxo_id}"] pub struct UtxosSubject { - pub utxo_id: Option, + pub utxo_id: Option, pub utxo_type: Option, } @@ -80,7 +80,7 @@ mod tests { #[test] fn test_utxos_message_subject_creation() { let utxo_subject = UtxosSubject::new() - .with_utxo_id(Some(HexString::zeroed())) + .with_utxo_id(Some(HexData::zeroed())) .with_utxo_type(Some(UtxoType::Message)); assert_eq!( utxo_subject.to_string(), @@ -91,7 +91,7 @@ mod tests { #[test] fn test_utxos_coin_subject_creation() { let utxo_subject = UtxosSubject::new() - .with_utxo_id(Some(HexString::zeroed())) + .with_utxo_id(Some(HexData::zeroed())) .with_utxo_type(Some(UtxoType::Coin)); assert_eq!( utxo_subject.to_string(), @@ -102,7 +102,7 @@ mod tests { #[test] fn test_utxos_contract_subject_creation() { let utxo_subject = UtxosSubject::new() - .with_utxo_id(Some(HexString::zeroed())) + .with_utxo_id(Some(HexData::zeroed())) .with_utxo_type(Some(UtxoType::Contract)); assert_eq!( utxo_subject.to_string(), diff --git a/crates/fuel-streams-core/src/utxos/types.rs b/crates/fuel-streams-core/src/utxos/types.rs index 1fd8daec..4c7dcac7 100644 --- a/crates/fuel-streams-core/src/utxos/types.rs +++ b/crates/fuel-streams-core/src/utxos/types.rs @@ -7,9 +7,9 @@ pub struct Utxo { pub sender: Option
<Address>, pub recipient: Option<Address>
, pub nonce: Option, - pub data: Option, + pub data: Option, pub amount: Option, - pub tx_id: Bytes32, + pub tx_id: TxId, } #[derive(Debug, Clone, Default)] diff --git a/crates/fuel-streams-executors/Cargo.toml b/crates/fuel-streams-executors/Cargo.toml index 7b70773d..98c0f71b 100644 --- a/crates/fuel-streams-executors/Cargo.toml +++ b/crates/fuel-streams-executors/Cargo.toml @@ -13,8 +13,10 @@ publish = false [dependencies] anyhow = { workspace = true } -async-nats = { workspace = true } +displaydoc = { workspace = true } +dotenvy = { workspace = true } fuel-core = { workspace = true } +fuel-data-parser = { workspace = true, features = ["test-helpers"] } fuel-streams-core = { workspace = true, features = ["test-helpers"] } futures = { workspace = true } num_cpus = { workspace = true } diff --git a/crates/fuel-streams-executors/src/inputs.rs b/crates/fuel-streams-executors/src/inputs.rs index 0c201b3b..33394171 100644 --- a/crates/fuel-streams-executors/src/inputs.rs +++ b/crates/fuel-streams-executors/src/inputs.rs @@ -115,7 +115,7 @@ pub fn identifiers( match input { Input::Coin(InputCoin { predicate, .. }) | Input::Message(InputMessage { predicate, .. }) => { - let predicate_tag = super::sha256(&predicate.0); + let predicate_tag = super::sha256(&predicate.0 .0); identifiers.push(Identifier::PredicateID( tx_id.to_owned(), index, diff --git a/crates/fuel-streams-executors/src/lib.rs b/crates/fuel-streams-executors/src/lib.rs index a227758d..d5fa6a36 100644 --- a/crates/fuel-streams-executors/src/lib.rs +++ b/crates/fuel-streams-executors/src/lib.rs @@ -7,12 +7,12 @@ pub mod transactions; pub mod utxos; use std::{ - env, marker::PhantomData, sync::{Arc, LazyLock}, }; -use async_nats::jetstream::context::Publish; +use displaydoc::Display as DisplayDoc; +use fuel_data_parser::DataParserError; use fuel_streams_core::prelude::*; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -20,7 +20,7 @@ use tokio::task::JoinHandle; pub static PUBLISHER_MAX_THREADS: LazyLock = LazyLock::new(|| { let available_cpus = num_cpus::get(); - env::var("PUBLISHER_MAX_THREADS") + dotenvy::var("PUBLISHER_MAX_THREADS") .ok() .and_then(|val| val.parse().ok()) .unwrap_or(available_cpus) @@ -38,20 +38,22 @@ pub fn sha256(bytes: &[u8]) -> Bytes32 { bytes.into() } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, DisplayDoc)] pub enum ExecutorError { - #[error("Failed to publish: {0}")] + /// Failed to publish: {0} PublishFailed(String), - #[error("Failed to acquire semaphore: {0}")] + /// Failed to acquire semaphore: {0} SemaphoreError(#[from] tokio::sync::AcquireError), - #[error("Failed to serialize block payload: {0}")] + /// Failed to serialize block payload: {0} Serialization(#[from] serde_json::Error), - #[error("Failed to fetch transaction status: {0}")] + /// Failed to fetch transaction status: {0} TransactionStatus(String), - #[error("Failed to access offchain database")] + /// Failed to access offchain database: {0} OffchainDatabase(#[from] anyhow::Error), - #[error("Failed to join tasks: {0}")] + /// Failed to join tasks: {0} JoinError(#[from] tokio::task::JoinError), + /// Failed to encode or decode data: {0} + Encoder(#[from] DataParserError), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -90,6 +92,10 @@ pub struct BlockPayload { metadata: Metadata, } +impl DataEncoder for BlockPayload { + type Err = ExecutorError; +} + impl BlockPayload { pub fn new( fuel_core: Arc, @@ -113,14 +119,6 @@ impl BlockPayload { }) } - pub fn encode(&self) -> Result { - 
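The `env::var` to `dotenvy::var` switch above means `.env` files are honored wherever these limits are read. It is the same pattern used for `PUBLISHER_MAX_THREADS`, `CONSUMER_MAX_THREADS`, and `STORAGE_MAX_RETRIES`; a sketch with a hypothetical variable name:

```rust
use std::sync::LazyLock;

// Env-with-default pattern used throughout this PR.
// WORKER_THREADS is a hypothetical variable for illustration.
static WORKER_THREADS: LazyLock<usize> = LazyLock::new(|| {
    dotenvy::var("WORKER_THREADS")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or_else(num_cpus::get)
});

fn main() {
    println!("using {} worker threads", *WORKER_THREADS);
}
```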
serde_json::to_string(self).map_err(ExecutorError::from) - } - - pub fn decode(json: &str) -> Result { - serde_json::from_str(json).map_err(ExecutorError::from) - } - pub fn tx_ids(&self) -> Vec { self.transactions .iter() @@ -179,16 +177,6 @@ impl BlockPayload { } } -impl TryFrom for Publish { - type Error = ExecutorError; - fn try_from(payload: BlockPayload) -> Result { - let message_id = payload.message_id(); - Ok(Publish::build() - .message_id(message_id) - .payload(payload.encode()?.into())) - } -} - pub struct Executor { pub stream: Arc>, payload: Arc, diff --git a/crates/fuel-streams-executors/src/transactions.rs b/crates/fuel-streams-executors/src/transactions.rs index 396364ca..fc1f2763 100644 --- a/crates/fuel-streams-executors/src/transactions.rs +++ b/crates/fuel-streams-executors/src/transactions.rs @@ -74,7 +74,7 @@ fn identifiers( match kind { TransactionKind::Script => { let script_data = &tx.script_data.to_owned().unwrap_or_default().0; - let script_tag = sha256(script_data); + let script_tag = sha256(&script_data.0); vec![Identifier::ScriptID(tx_id.to_owned(), index, script_tag)] } _ => Vec::new(), diff --git a/crates/fuel-streams-executors/src/utxos.rs b/crates/fuel-streams-executors/src/utxos.rs index 81fdaaf2..cdd01afe 100644 --- a/crates/fuel-streams-executors/src/utxos.rs +++ b/crates/fuel-streams-executors/src/utxos.rs @@ -28,7 +28,7 @@ fn utxo_packet(input: &Input, tx_id: &Bytes32) -> Option> { Input::Contract(InputContract { utxo_id, .. }) => { let utxo = Utxo { utxo_id: utxo_id.to_owned(), - tx_id: tx_id.to_owned(), + tx_id: tx_id.to_owned().into(), ..Default::default() }; let subject = UtxosSubject { @@ -44,7 +44,7 @@ fn utxo_packet(input: &Input, tx_id: &Bytes32) -> Option> { let utxo = Utxo { utxo_id: utxo_id.to_owned(), amount: Some(*amount), - tx_id: tx_id.to_owned(), + tx_id: tx_id.to_owned().into(), ..Default::default() }; let subject = UtxosSubject { @@ -66,7 +66,7 @@ fn utxo_packet(input: &Input, tx_id: &Bytes32) -> Option> { ) => { let utxo_id = input.computed_utxo_id(); let utxo = Utxo { - tx_id: tx_id.to_owned(), + tx_id: tx_id.to_owned().into(), utxo_id: utxo_id.to_owned(), sender: Some(sender.to_owned()), recipient: Some(recipient.to_owned()), diff --git a/crates/fuel-streams-storage/Cargo.toml b/crates/fuel-streams-storage/Cargo.toml index 2aefacfb..7eb8af8d 100644 --- a/crates/fuel-streams-storage/Cargo.toml +++ b/crates/fuel-streams-storage/Cargo.toml @@ -11,19 +11,21 @@ version = { workspace = true } rust-version = { workspace = true } [dependencies] +async-trait = "0.1.83" aws-config = { version = "1.5.10", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.65.0" -aws-smithy-runtime-api = "1.7.3" -aws-smithy-types = "=1.2.9" +displaydoc = { workspace = true } dotenvy = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] } +tracing-test = "0.2.0" [features] default = [] diff --git a/crates/fuel-streams-storage/src/lib.rs b/crates/fuel-streams-storage/src/lib.rs index f5cf85aa..130a9ba0 100644 --- a/crates/fuel-streams-storage/src/lib.rs +++ b/crates/fuel-streams-storage/src/lib.rs @@ -1,3 +1,9 @@ // TODO: Introduce Adapters for Transient and FileStorage (NATS and S3 clients would implement those) +pub mod retry; pub mod s3; +pub mod storage; +pub mod storage_config; + pub use 
s3::*; +pub use storage::*; +pub use storage_config::*; diff --git a/crates/fuel-streams-storage/src/retry.rs b/crates/fuel-streams-storage/src/retry.rs new file mode 100644 index 00000000..f50f25fb --- /dev/null +++ b/crates/fuel-streams-storage/src/retry.rs @@ -0,0 +1,120 @@ +use std::{future::Future, sync::LazyLock, time::Duration}; + +use tracing; + +pub static STORAGE_MAX_RETRIES: LazyLock<usize> = LazyLock::new(|| { + dotenvy::var("STORAGE_MAX_RETRIES") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(5) +}); + +#[derive(Debug, Clone)] +pub struct RetryConfig { + pub max_retries: u32, + pub initial_backoff: Duration, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: *STORAGE_MAX_RETRIES as u32, + initial_backoff: Duration::from_millis(100), + } + } +} + +pub async fn with_retry<T, E, F, Fut>( + config: &RetryConfig, + operation_name: &str, + f: F, +) -> Result<T, E> +where + F: Fn() -> Fut, + Fut: Future<Output = Result<T, E>>, + E: std::fmt::Display, +{ + let mut attempt = 0; + let mut last_error = None; + while attempt < config.max_retries { + match f().await { + Ok(result) => return Ok(result), + Err(e) => { + last_error = Some(e); + attempt += 1; + + if attempt < config.max_retries { + let backoff = + config.initial_backoff * 2u32.pow(attempt - 1); + tracing::warn!( + "{} failed, attempt {}/{}: {}. Retrying in {:?}", + operation_name, + attempt, + config.max_retries, + last_error.as_ref().unwrap(), + backoff + ); + tokio::time::sleep(backoff).await; + } + } + } + } + + Err(last_error.unwrap()) +} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }; + + use super::*; + + #[tokio::test] + async fn test_retry_mechanism() { + let config = RetryConfig { + max_retries: 3, + initial_backoff: Duration::from_millis(10), /* Shorter duration for tests */ + }; + + let attempt_counter = Arc::new(AtomicU32::new(0)); + let counter_clone = attempt_counter.clone(); + + let result: Result<(), String> = with_retry(&config, "test", || { + let value = counter_clone.clone(); + async move { + let attempt = value.fetch_add(1, Ordering::SeqCst); + if attempt < 2 { + // Fail first two attempts + Err("Simulated failure".to_string()) + } else { + // Succeed on third attempt + Ok(()) + } + } + }) + .await; + + assert!(result.is_ok()); + assert_eq!(attempt_counter.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn test_retry_exhaustion() { + let config = RetryConfig { + max_retries: 3, + initial_backoff: Duration::from_millis(10), + }; + + let result: Result<(), String> = + with_retry(&config, "test", || async { + Err("Always fails".to_string()) + }) + .await; + + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Always fails"); + } } diff --git a/crates/fuel-streams-storage/src/s3/s3_client.rs b/crates/fuel-streams-storage/src/s3/s3_client.rs index dad2ec35..0483ee5d 100644 --- a/crates/fuel-streams-storage/src/s3/s3_client.rs +++ b/crates/fuel-streams-storage/src/s3/s3_client.rs @@ -1,289 +1,409 @@ -use aws_config::{BehaviorVersion, Region}; -use aws_sdk_s3::{ - config::http::HttpResponse, - operation::{ - create_bucket::CreateBucketError, - delete_bucket::DeleteBucketError, - delete_object::DeleteObjectError, - get_object::GetObjectError, - put_bucket_policy::PutBucketPolicyError, - put_object::PutObjectError, - put_public_access_block::PutPublicAccessBlockError, - }, - Client, +use async_trait::async_trait; +use aws_config::BehaviorVersion; +use aws_sdk_s3::Client; + +use super::s3_client_opts::S3StorageOpts; +use crate::{ + retry::{with_retry,
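For intuition about the retry schedule introduced above: the sleep grows as `initial_backoff * 2^(attempt - 1)` and no sleep follows the final attempt, so the default config (5 attempts, 100 ms base) waits 100, 200, 400, and 800 ms between tries. A small sketch of the arithmetic:

```rust
use std::time::Duration;

// Mirrors the backoff computation inside with_retry.
fn backoff(initial: Duration, attempt: u32) -> Duration {
    initial * 2u32.pow(attempt - 1)
}

fn main() {
    let initial = Duration::from_millis(100);
    let waits: Vec<u64> = (1..5)
        .map(|attempt| backoff(initial, attempt).as_millis() as u64)
        .collect();
    assert_eq!(waits, vec![100, 200, 400, 800]);
}
```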
RetryConfig}, + storage::{Storage, StorageError}, + StorageConfig, }; -use aws_smithy_runtime_api::client::result::SdkError; -use aws_smithy_types::byte_stream::error::Error as BytesStreamError; -use thiserror::Error; - -use super::s3_client_opts::S3ClientOpts; - -#[derive(Error, Debug)] -pub enum S3ClientError { - #[error("AWS SDK Create Error: {0}")] - CreateBucketError(#[from] SdkError), - #[error("AWS SDK Delete bucket Error: {0}")] - DeleteBucketError(#[from] SdkError), - #[error("AWS SDK Put Error: {0}")] - PutObjectError(#[from] SdkError), - #[error("AWS SDK Get Error: {0}")] - GetObjectError(#[from] SdkError), - #[error("Error aggregating bytes from S3: {0}")] - BuildObjectAfterGettingError(#[from] BytesStreamError), - #[error("AWS SDK Delete object Error: {0}")] - DeleteObjectError(#[from] SdkError), - #[error("Environment variable missing: {0}")] - MissingEnvVar(String), - #[error("Failed to stream objects because: {0}")] - StreamingError(String), - #[error("Failed to put bucket policy: {0}")] - PutBucketPolicyError(#[from] SdkError), - #[error("Failed to put public access block: {0}")] - PutPublicAccessBlockError( - #[from] SdkError, - ), - #[error("IO Error: {0}")] - IoError(#[from] std::io::Error), -} #[derive(Debug, Clone)] -pub struct S3Client { +pub struct S3Storage { client: Client, - bucket: String, + config: S3StorageOpts, + retry_config: RetryConfig, } -impl S3Client { - pub async fn new(opts: &S3ClientOpts) -> Result { - let config = aws_config::defaults(BehaviorVersion::latest()) - .endpoint_url(opts.endpoint_url().to_string()) - .region(Region::new(opts.region().to_string())) - // TODO: Remove this once we have a proper S3 bucket created - // for now this is a workaround to avoid signing requests +#[async_trait] +impl Storage for S3Storage { + type Config = S3StorageOpts; + + async fn new(config: Self::Config) -> Result { + let aws_config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(config.endpoint_url()) + .region(config.region()) .no_credentials() .load() .await; - // Create S3 config without signing - let s3_config = aws_sdk_s3::config::Builder::from(&config) + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) .force_path_style(true) .disable_s3_express_session_auth(true) .build(); let client = aws_sdk_s3::Client::from_conf(s3_config); - let s3_client = Self { + Ok(Self { client, - bucket: opts.bucket(), - }; + config, + retry_config: RetryConfig::default(), + }) + } - Ok(s3_client) + async fn store( + &self, + key: &str, + data: Vec, + ) -> Result<(), StorageError> { + with_retry(&self.retry_config, "store", || { + let data = data.clone(); + async move { + #[allow(clippy::identity_op)] + const LARGE_FILE_THRESHOLD: usize = 1 * 1024 * 1024; // 1MB + if data.len() >= LARGE_FILE_THRESHOLD { + tracing::debug!( + "Uploading file to S3 using multipart_upload" + ); + self.upload_multipart(key, data).await + } else { + tracing::debug!("Uploading file to S3 using put_object"); + self.put_object(key, data).await + } + } + }) + .await } - pub fn arc(self) -> std::sync::Arc { - std::sync::Arc::new(self) + async fn retrieve(&self, key: &str) -> Result, StorageError> { + with_retry(&self.retry_config, "retrieve", || async { + let result = self + .client + .get_object() + .bucket(self.config.bucket()) + .key(key) + .send() + .await + .map_err(|e| StorageError::RetrieveError(e.to_string()))?; + + Ok(result + .body + .collect() + .await + .map_err(|e| StorageError::RetrieveError(e.to_string()))? 
+ .into_bytes() + .to_vec()) + }) + .await } - pub fn client(&self) -> &Client { - &self.client + async fn delete(&self, key: &str) -> Result<(), StorageError> { + with_retry(&self.retry_config, "delete", || async { + self.client + .delete_object() + .bucket(self.config.bucket()) + .key(key) + .send() + .await + .map_err(|e| StorageError::DeleteError(e.to_string()))?; + Ok(()) + }) + .await } +} - pub fn bucket(&self) -> &str { - &self.bucket +impl S3Storage { + pub async fn create_bucket(&self) -> Result<(), StorageError> { + self.client + .create_bucket() + .bucket(self.config.bucket()) + .send() + .await + .map_err(|e| StorageError::StoreError(e.to_string()))?; + Ok(()) } - pub async fn put_object( + async fn put_object( &self, key: &str, object: Vec, - ) -> Result<(), S3ClientError> { - match self - .client + ) -> Result<(), StorageError> { + self.client .put_object() - .bucket(&self.bucket) + .bucket(self.config.bucket()) .key(key) .body(object.into()) .send() .await - { - Ok(_) => Ok(()), - Err(error) => match error { - SdkError::ServiceError(error) => { - tracing::error!( - "Failed to put object in S3 bucket={} key={}: {}", - self.bucket, - key, - error.err() - ); - Err(S3ClientError::PutObjectError(SdkError::ServiceError( - error, - ))) - } - SdkError::ConstructionFailure(error) => { - tracing::error!( - "Failed to construct S3 request for bucket={} key={}", - self.bucket, - key, - ); - Err(S3ClientError::PutObjectError( - SdkError::ConstructionFailure(error), - )) - } - SdkError::TimeoutError(error) => { - tracing::error!( - "Timeout putting object in S3 bucket={} key={}", - self.bucket, - key, - ); - Err(S3ClientError::PutObjectError(SdkError::TimeoutError( - error, - ))) - } - SdkError::DispatchFailure(error) => { - tracing::error!( - "Failed to dispatch S3 request for bucket={} key={}: {}", - self.bucket, - key, - error.as_connector_error().unwrap() - ); - Err(S3ClientError::PutObjectError( - SdkError::DispatchFailure(error), - )) - } - SdkError::ResponseError(error) => { - tracing::error!( - "Invalid response from S3 for bucket={} key={}", - self.bucket, - key, - ); - Err(S3ClientError::PutObjectError(SdkError::ResponseError( - error, - ))) - } - _ => { - tracing::error!( - "Failed to put object in S3 bucket={} key={}: {:?}", - self.bucket, - key, - error - ); - Err(S3ClientError::PutObjectError(error)) - } - }, - } + .map_err(|e| StorageError::StoreError(e.to_string()))?; + + Ok(()) } - pub async fn get_object( + async fn upload_multipart( &self, key: &str, - ) -> Result, S3ClientError> { - let result = self + data: Vec, + ) -> Result<(), StorageError> { + const CHUNK_SIZE: usize = 5 * 1024 * 1024; // 5MB chunks + + // Create multipart upload + let create_multipart = self .client - .get_object() - .bucket(&self.bucket) + .create_multipart_upload() + .bucket(self.config.bucket()) .key(key) .send() - .await?; + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to create multipart upload: {}", + e + )) + })?; - Ok(result.body.collect().await?.into_bytes().to_vec()) - } + let upload_id = create_multipart.upload_id().ok_or_else(|| { + StorageError::StoreError("Failed to get upload ID".to_string()) + })?; + + let mut completed_parts = Vec::new(); + let chunks = data.chunks(CHUNK_SIZE); + let total_chunks = chunks.len(); + + // Upload parts + for (i, chunk) in chunks.enumerate() { + let part_number = (i + 1) as i32; + + match self + .client + .upload_part() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .body(chunk.to_vec().into()) + 
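Since `store`, `retrieve`, and `delete` are trait methods now, call sites can be written against any backend rather than S3 specifically. A sketch of a backend-agnostic helper (not part of the PR):

```rust
use fuel_streams_storage::{Storage, StorageError};

// Hypothetical helper: roundtrip a payload through any Storage backend.
async fn roundtrip<S: Storage>(
    storage: &S,
    key: &str,
    data: Vec<u8>,
) -> Result<Vec<u8>, StorageError> {
    storage.store(key, data).await?;
    storage.retrieve(key).await
}
```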
.part_number(part_number) + .send() + .await + { + Ok(response) => { + if let Some(e_tag) = response.e_tag() { + completed_parts.push( + aws_sdk_s3::types::CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); + } + } + Err(err) => { + // Abort the multipart upload if a part fails + self.client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .send() + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to abort multipart upload: {}", + e + )) + })?; + + return Err(StorageError::StoreError(format!( + "Failed to upload part: {}", + err + ))); + } + } + + tracing::debug!( + "Uploaded part {}/{} for key={}", + part_number, + total_chunks, + key + ); + } - /// Delete a single object from S3. - pub async fn delete_object(&self, key: &str) -> Result<(), S3ClientError> { + // Complete multipart upload self.client - .delete_object() - .bucket(&self.bucket) + .complete_multipart_upload() + .bucket(self.config.bucket()) .key(key) + .upload_id(upload_id) + .multipart_upload( + aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(), + ) .send() - .await?; + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to complete multipart upload: {}", + e + )) + })?; Ok(()) } #[cfg(any(test, feature = "test-helpers"))] - pub async fn create_bucket(&self) -> Result<(), S3ClientError> { - // Create bucket - self.client - .create_bucket() - .bucket(&self.bucket) - .send() - .await?; + pub async fn new_for_testing() -> Result { + dotenvy::dotenv().ok(); - Ok(()) - } + use crate::{StorageEnv, StorageRole}; + let config = S3StorageOpts::new(StorageEnv::Local, StorageRole::Admin) + .with_random_namespace(); - #[cfg(any(test, feature = "test-helpers"))] - pub async fn new_for_testing() -> Self { - dotenvy::dotenv().expect(".env file not found"); + let aws_config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(config.endpoint_url()) + .region(config.region()) + .credentials_provider(aws_sdk_s3::config::Credentials::new( + "test", "test", None, None, "static", + )) + .load() + .await; - let s3_client = Self::new(&S3ClientOpts::new( - crate::S3Env::Local, - crate::S3Role::Admin, - )) - .await - .expect( - "S3Client creation failed. 
Check AWS Env vars and Localstack setup", ); + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .force_path_style(true) + .disable_s3_express_session_auth(true) + .build(); - s3_client - .create_bucket() - .await - .expect("Failed to create bucket"); + let client = aws_sdk_s3::Client::from_conf(s3_config); - s3_client + // Ensure bucket exists before running tests + let storage = Self { + client, + config, + retry_config: RetryConfig::default(), + }; + storage.ensure_bucket().await?; + Ok(storage) } #[cfg(any(test, feature = "test-helpers"))] - pub async fn cleanup_after_testing(&self) { - let client = &self.client; - let bucket = &self.bucket; - - let objects = client - .list_objects_v2() - .bucket(bucket) + pub async fn ensure_bucket(&self) -> Result<(), StorageError> { + // Check if bucket exists + let exists = self + .client + .head_bucket() + .bucket(self.config.bucket()) .send() .await - .unwrap(); + .is_ok(); - for object in objects.contents() { - if let Some(key) = object.key() { - client - .delete_object() - .bucket(bucket) - .key(key) - .send() - .await - .unwrap(); - } + // Create bucket if it doesn't exist + if !exists { + self.create_bucket().await?; } + Ok(()) + } - client.delete_bucket().bucket(bucket).send().await.unwrap(); + pub fn with_retry_config(mut self, config: RetryConfig) -> Self { + self.retry_config = config; + self } } #[cfg(test)] mod tests { + use tracing_test::traced_test; + use super::*; + use crate::storage::Storage; #[tokio::test] - async fn test_put_and_get_object() { - let s3_client = S3Client::new_for_testing().await; + async fn test_basic_operations() { + let storage = S3Storage::new_for_testing().await.unwrap(); - // Put object + // Test store and retrieve let key = "test-key"; - let content = b"Hello, LocalStack!".to_vec(); - s3_client - .put_object(key, content.clone()) + let content = b"Hello, Storage!".to_vec(); + + storage.store(key, content.clone()).await.unwrap(); + let retrieved = storage.retrieve(key).await.unwrap(); + assert_eq!(retrieved, content); + + // Test delete + storage.delete(key).await.unwrap(); + let result = storage.retrieve(key).await; + assert!(result.is_err()); + } + + #[tokio::test] + #[traced_test] + async fn test_file_size_threshold() { + let storage = S3Storage::new_for_testing().await.unwrap(); + + // Test small file (under 1MB) + let small_content = vec![0u8; 500 * 1024]; + storage + .store("small-file", small_content.clone()) .await - .expect("Failed to put object"); + .unwrap(); + assert!(logs_contain("put_object")); + + // Verify small file was stored correctly + let retrieved_small = storage.retrieve("small-file").await.unwrap(); + assert_eq!(retrieved_small, small_content); - // Get object - let result = s3_client - .get_object(key) + // Test large file (over 1MB) + let large_content = vec![0u8; 2 * 1024 * 1024]; + storage + .store("large-file", large_content.clone()) .await - .expect("Failed to get object"); + .unwrap(); + assert!(logs_contain("multipart_upload")); + + // Verify large file was stored correctly + let retrieved_large = storage.retrieve("large-file").await.unwrap(); + assert_eq!(retrieved_large, large_content); + } - assert_eq!(result, content); + #[tokio::test] + async fn test_multipart_upload_with_multiple_chunks() { + let storage = S3Storage::new_for_testing().await.unwrap(); + + // Create a file that spans multiple chunks (15MB + 1 byte) + // Since the chunk size is 5MB, chunks() yields four parts: + // Parts 1-3: 5MB each + // Part 4: the trailing 1 byte + let
content_size = (5 * 1024 * 1024 * 3) + 1; + let content: Vec = (0..content_size) + .map(|i| (i % 255) as u8) // Create pattern to verify data integrity + .collect(); + + let key = "multiple-chunks"; + + // Store the file + storage.store(key, content.clone()).await.unwrap(); + + // Retrieve and verify the file immediately after upload + let retrieved_after_upload = storage.retrieve(key).await.unwrap(); + assert_eq!( + retrieved_after_upload.len(), + content.len(), + "Retrieved file size should match original" + ); + assert_eq!( + retrieved_after_upload, content, + "Retrieved file content should match original" + ); + + // Wait a moment and retrieve again to verify persistence + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let retrieved_after_wait = storage.retrieve(key).await.unwrap(); + assert_eq!( + retrieved_after_wait.len(), + content.len(), + "Retrieved file size should still match after waiting" + ); + assert_eq!( + retrieved_after_wait, content, + "Retrieved file content should still match after waiting" + ); + + // Clean up + storage.delete(key).await.unwrap(); - // Cleanup - s3_client.cleanup_after_testing().await; + // Verify deletion + let result = storage.retrieve(key).await; + assert!( + result.is_err(), + "File should no longer exist after deletion" + ); } } diff --git a/crates/fuel-streams-storage/src/s3/s3_client_opts.rs b/crates/fuel-streams-storage/src/s3/s3_client_opts.rs index 468efa30..782c4f00 100644 --- a/crates/fuel-streams-storage/src/s3/s3_client_opts.rs +++ b/crates/fuel-streams-storage/src/s3/s3_client_opts.rs @@ -1,77 +1,45 @@ use std::str::FromStr; -#[derive(Debug, Clone, Default)] -pub enum S3Role { - Admin, - #[default] - Public, -} - -#[derive(Debug, Clone, Default)] -pub enum S3Env { - #[default] - Local, - Testnet, - Mainnet, -} - -impl FromStr for S3Env { - type Err = String; +use aws_config::Region; - fn from_str(s: &str) -> Result { - match s { - "local" => Ok(S3Env::Local), - "testnet" => Ok(S3Env::Testnet), - "mainnet" => Ok(S3Env::Mainnet), - _ => Err(format!("unknown S3 type: {}", s)), - } - } -} +use crate::{StorageConfig, StorageEnv, StorageRole}; #[derive(Debug, Clone, Default)] -pub struct S3ClientOpts { - pub s3_env: S3Env, - pub role: S3Role, +pub struct S3StorageOpts { + pub env: StorageEnv, + pub role: StorageRole, pub namespace: Option, } -impl S3ClientOpts { - pub fn new(s3_env: S3Env, role: S3Role) -> Self { +impl StorageConfig for S3StorageOpts { + fn new(env: StorageEnv, role: StorageRole) -> Self { Self { - s3_env, + env, role, namespace: None, } } - pub fn from_env(role: Option) -> Self { - let s3_env = std::env::var("NETWORK") - .map(|s| S3Env::from_str(&s).unwrap_or_default()) + fn from_env(role: Option) -> Self { + let env = std::env::var("NETWORK") + .map(|s| StorageEnv::from_str(&s).unwrap_or_default()) .unwrap_or_default(); Self { - s3_env, + env, role: role.unwrap_or_default(), namespace: None, } } - pub fn admin_opts() -> Self { - Self::from_env(Some(S3Role::Admin)) - } - - pub fn public_opts() -> Self { - Self::from_env(Some(S3Role::Public)) - } - - pub fn endpoint_url(&self) -> String { + fn endpoint_url(&self) -> String { match self.role { - S3Role::Admin => dotenvy::var("AWS_ENDPOINT_URL") + StorageRole::Admin => dotenvy::var("AWS_ENDPOINT_URL") .expect("AWS_ENDPOINT_URL must be set for admin role"), - S3Role::Public => { - match self.s3_env { - S3Env::Local => "http://localhost:4566".to_string(), - S3Env::Testnet | S3Env::Mainnet => { + StorageRole::Public => { + match self.env { + StorageEnv::Local => 
"http://localhost:4566".to_string(), + StorageEnv::Testnet | StorageEnv::Mainnet => { let bucket = self.bucket(); let region = self.region(); format!("https://{bucket}.s3-website-{region}.amazonaws.com") @@ -81,11 +49,60 @@ impl S3ClientOpts { } } - pub fn region(&self) -> String { - match &self.role { - S3Role::Admin => dotenvy::var("AWS_REGION") + fn environment(&self) -> &StorageEnv { + &self.env + } + + fn role(&self) -> &StorageRole { + &self.role + } +} + +impl S3StorageOpts { + pub fn with_namespace(mut self, namespace: impl Into) -> Self { + self.namespace = Some(namespace.into()); + self + } + + pub fn region(&self) -> Region { + let region = match &self.role { + StorageRole::Admin => dotenvy::var("AWS_REGION") .expect("AWS_REGION must be set for admin role"), - S3Role::Public => "us-east-1".to_string(), + StorageRole::Public => "us-east-1".to_string(), + }; + Region::new(region) + } + + pub fn bucket(&self) -> String { + if matches!(self.role, StorageRole::Admin) { + return dotenvy::var("AWS_S3_BUCKET_NAME") + .expect("AWS_S3_BUCKET_NAME must be set for admin role"); + } + + let base_bucket = match self.env { + StorageEnv::Local => "fuel-streams-local", + StorageEnv::Testnet => "fuel-streams-testnet", + StorageEnv::Mainnet => "fuel-streams", + }; + + match &self.namespace { + Some(ns) => format!("{base_bucket}-{ns}"), + None => base_bucket.to_string(), + } + } + + pub fn credentials(&self) -> Option { + match self.role { + StorageRole::Admin => Some(aws_sdk_s3::config::Credentials::new( + dotenvy::var("AWS_ACCESS_KEY_ID") + .expect("AWS_ACCESS_KEY_ID must be set for admin role"), + dotenvy::var("AWS_SECRET_ACCESS_KEY") + .expect("AWS_SECRET_ACCESS_KEY must be set for admin role"), + None, + None, + "static", + )), + StorageRole::Public => None, } } @@ -99,22 +116,42 @@ impl S3ClientOpts { self.namespace = Some(random_namespace); self } +} - pub fn bucket(&self) -> String { - if matches!(self.role, S3Role::Admin) { - return dotenvy::var("AWS_S3_BUCKET_NAME") - .expect("AWS_S3_BUCKET_NAME must be set for admin role"); - } +#[cfg(test)] +mod tests { + use super::*; - let base_bucket = match self.s3_env { - S3Env::Local => "fuel-streams-local", - S3Env::Testnet => "fuel-streams-testnet", - S3Env::Mainnet => "fuel-streams", - }; + #[test] + fn test_bucket_names() { + let opts = S3StorageOpts::new(StorageEnv::Local, StorageRole::Public); + assert_eq!(opts.bucket(), "fuel-streams-local"); + + let opts = opts.with_namespace("test"); + assert_eq!(opts.bucket(), "fuel-streams-local-test"); + + let opts = S3StorageOpts::new(StorageEnv::Testnet, StorageRole::Public); + assert_eq!(opts.bucket(), "fuel-streams-testnet"); + + let opts = S3StorageOpts::new(StorageEnv::Mainnet, StorageRole::Public); + assert_eq!(opts.bucket(), "fuel-streams"); + } - self.namespace - .as_ref() - .map(|ns| format!("{base_bucket}-{ns}")) - .unwrap_or(base_bucket.to_string()) + #[test] + fn test_public_endpoint_urls() { + let opts = S3StorageOpts::new(StorageEnv::Local, StorageRole::Public); + assert_eq!(opts.endpoint_url(), "http://localhost:4566"); + + let opts = S3StorageOpts::new(StorageEnv::Testnet, StorageRole::Public); + assert_eq!( + opts.endpoint_url(), + "https://fuel-streams-testnet.s3-website-us-east-1.amazonaws.com" + ); + + let opts = S3StorageOpts::new(StorageEnv::Mainnet, StorageRole::Public); + assert_eq!( + opts.endpoint_url(), + "https://fuel-streams.s3-website-us-east-1.amazonaws.com" + ); } } diff --git a/crates/fuel-streams-storage/src/storage.rs b/crates/fuel-streams-storage/src/storage.rs new 
file mode 100644 index 00000000..b1e1afcd --- /dev/null +++ b/crates/fuel-streams-storage/src/storage.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; +use displaydoc::Display as DisplayDoc; +use thiserror::Error; + +use crate::StorageConfig; + +#[derive(Error, Debug, DisplayDoc)] +pub enum StorageError { + /// Failed to store object: {0} + StoreError(String), + /// Failed to retrieve object: {0} + RetrieveError(String), + /// Failed to delete object: {0} + DeleteError(String), + /// Failed to initialize storage: {0} + InitError(String), +} + +#[async_trait] +pub trait Storage: std::fmt::Debug + Send + Sync { + type Config: StorageConfig; + + async fn new(config: Self::Config) -> Result<Self, StorageError> + where + Self: Sized; + + async fn new_admin() -> Result<Self, StorageError> + where + Self: Sized, + { + Self::new(Self::Config::admin_opts()).await + } + + async fn new_public() -> Result<Self, StorageError> + where + Self: Sized, + { + Self::new(Self::Config::public_opts()).await + } + + async fn store(&self, key: &str, data: Vec<u8>) + -> Result<(), StorageError>; + + async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError>; + + async fn delete(&self, key: &str) -> Result<(), StorageError>; +} diff --git a/crates/fuel-streams-storage/src/storage_config.rs b/crates/fuel-streams-storage/src/storage_config.rs new file mode 100644 index 00000000..3ed09426 --- /dev/null +++ b/crates/fuel-streams-storage/src/storage_config.rs @@ -0,0 +1,46 @@ +use std::str::FromStr; + +#[derive(Debug, Clone, Default)] +pub enum StorageRole { + Admin, + #[default] + Public, +} + +#[derive(Debug, Clone, Default)] +pub enum StorageEnv { + #[default] + Local, + Testnet, + Mainnet, +} + +impl FromStr for StorageEnv { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "local" => Ok(StorageEnv::Local), + "testnet" => Ok(StorageEnv::Testnet), + "mainnet" => Ok(StorageEnv::Mainnet), + _ => Err(format!("unknown environment type: {}", s)), + } + } +} + +pub trait StorageConfig: Send + Sync + std::fmt::Debug + Sized { + fn new(env: StorageEnv, role: StorageRole) -> Self; + fn from_env(role: Option<StorageRole>) -> Self; + + fn admin_opts() -> Self { + Self::from_env(Some(StorageRole::Admin)) + } + + fn public_opts() -> Self { + Self::from_env(Some(StorageRole::Public)) + } + + fn endpoint_url(&self) -> String; + fn environment(&self) -> &StorageEnv; + fn role(&self) -> &StorageRole; +} diff --git a/crates/sv-consumer/Cargo.toml b/crates/sv-consumer/Cargo.toml index 50d1e17e..ca105b03 100644 --- a/crates/sv-consumer/Cargo.toml +++ b/crates/sv-consumer/Cargo.toml @@ -19,11 +19,13 @@ path = "src/main.rs" anyhow = { workspace = true } async-nats = { workspace = true } clap = { workspace = true } +displaydoc = { workspace = true } dotenvy = { workspace = true } fuel-core = { workspace = true, default-features = false, features = ["p2p", "relayer", "rocksdb"] } fuel-streams-core = { workspace = true, features = ["test-helpers"] } fuel-streams-executors = { workspace = true, features = ["test-helpers"] } futures = { workspace = true } +hex = { workspace = true } num_cpus = { workspace = true } serde_json = { workspace = true } sv-publisher = { workspace = true } diff --git a/crates/sv-consumer/src/main.rs b/crates/sv-consumer/src/main.rs index 3e4fb6b2..5c5f0653 100644 --- a/crates/sv-consumer/src/main.rs +++ b/crates/sv-consumer/src/main.rs @@ -13,6 +13,7 @@ use async_nats::jetstream::{ stream::{ConsumerErrorKind, RetentionPolicy}, }; use clap::Parser; +use displaydoc::Display as DisplayDoc; use fuel_streams_core::prelude::*; use fuel_streams_executors::*; use
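Because `Storage` is now the only contract the rest of the codebase sees, an in-memory implementation is enough for unit tests that should not touch LocalStack. A hypothetical sketch (`MemStorage` is not part of the PR; it reuses `S3StorageOpts` as its config type purely for brevity):

```rust
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use fuel_streams_storage::{S3StorageOpts, Storage, StorageError};
use tokio::sync::RwLock;

// Hypothetical in-memory backend implementing the Storage trait.
#[derive(Debug, Clone, Default)]
struct MemStorage {
    objects: Arc<RwLock<HashMap<String, Vec<u8>>>>,
}

#[async_trait]
impl Storage for MemStorage {
    type Config = S3StorageOpts;

    async fn new(_config: Self::Config) -> Result<Self, StorageError> {
        Ok(Self::default())
    }

    async fn store(&self, key: &str, data: Vec<u8>) -> Result<(), StorageError> {
        self.objects.write().await.insert(key.to_string(), data);
        Ok(())
    }

    async fn retrieve(&self, key: &str) -> Result<Vec<u8>, StorageError> {
        self.objects.read().await.get(key).cloned().ok_or_else(|| {
            StorageError::RetrieveError(format!("missing key: {key}"))
        })
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        self.objects.write().await.remove(key);
        Ok(())
    }
}
```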
futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; @@ -22,40 +23,30 @@ use tokio_util::sync::CancellationToken; use tracing::level_filters::LevelFilter; use tracing_subscriber::fmt::time; -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, DisplayDoc)] pub enum ConsumerError { - #[error("Failed to receive batch of messages from NATS: {0}")] + /// Failed to receive batch of messages from NATS: {0} BatchStream(#[from] async_nats::error::Error), - - #[error("Failed to create stream: {0}")] + /// Failed to create stream: {0} CreateStream(#[from] async_nats::error::Error), - - #[error("Failed to create consumer: {0}")] + /// Failed to create consumer: {0} CreateConsumer(#[from] async_nats::error::Error), - - #[error("Failed to connect to NATS client: {0}")] + /// Failed to connect to NATS client: {0} NatsClient(#[from] NatsError), - - #[error("Failed to communicate with NATS server: {0}")] + /// Failed to communicate with NATS server: {0} Nats(#[from] async_nats::Error), - - #[error("Failed to deserialize block payload from message: {0}")] + /// Failed to deserialize block payload from message: {0} Deserialization(#[from] serde_json::Error), - - #[error("Failed to decode UTF-8: {0}")] + /// Failed to decode UTF-8: {0} Utf8(#[from] std::str::Utf8Error), - - #[error("Failed to execute executor tasks: {0}")] + /// Failed to execute executor tasks: {0} Executor(#[from] ExecutorError), - - #[error("Failed to join tasks: {0}")] + /// Failed to join tasks: {0} JoinTasks(#[from] tokio::task::JoinError), - - #[error("Failed to acquire semaphore: {0}")] + /// Failed to acquire semaphore: {0} Semaphore(#[from] tokio::sync::AcquireError), - - #[error("Failed to setup S3 client: {0}")] - S3(#[from] S3ClientError), + /// Failed to setup storage: {0} + Storage(#[from] fuel_streams_core::storage::StorageError), } #[tokio::main] @@ -101,10 +92,10 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn setup_s3() -> Result, ConsumerError> { - let s3_client_opts = S3ClientOpts::admin_opts(); - let s3_client = S3Client::new(&s3_client_opts).await?; - Ok(Arc::new(s3_client)) +async fn setup_storage() -> Result, ConsumerError> { + let storage_opts = S3StorageOpts::admin_opts(); + let storage = S3Storage::new(storage_opts).await?; + Ok(Arc::new(storage)) } async fn setup_nats( @@ -141,7 +132,7 @@ async fn setup_nats( pub static CONSUMER_MAX_THREADS: LazyLock = LazyLock::new(|| { let available_cpus = num_cpus::get(); - env::var("CONSUMER_MAX_THREADS") + dotenvy::var("CONSUMER_MAX_THREADS") .ok() .and_then(|val| val.parse().ok()) .unwrap_or(available_cpus) @@ -152,10 +143,9 @@ async fn process_messages( token: &CancellationToken, ) -> Result<(), ConsumerError> { let (core_client, publisher_client, consumer) = setup_nats(cli).await?; - let s3_client = setup_s3().await?; + let storage = setup_storage().await?; let (_, publisher_stream) = - FuelStreams::setup_all(&core_client, &publisher_client, &s3_client) - .await; + FuelStreams::setup_all(&core_client, &publisher_client, &storage).await; let fuel_streams: Arc = publisher_stream.arc(); let semaphore = Arc::new(tokio::sync::Semaphore::new(64)); @@ -163,23 +153,41 @@ async fn process_messages( let mut messages = consumer.fetch().max_messages(100).messages().await?.fuse(); let mut futs = FuturesUnordered::new(); + while let Some(msg) = messages.next().await { let msg = msg?; let fuel_streams = fuel_streams.clone(); let semaphore = semaphore.clone(); + + tracing::debug!( + "Received message payload length: {}", + msg.payload.len() 
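The repeated pattern in these error enums is that `displaydoc` derives `Display` from the `///` doc comments (including `{0}` interpolation), which is why `thiserror` can drop its per-variant `#[error(...)]` strings. A minimal sketch of the combination:

```rust
use displaydoc::Display as DisplayDoc;
use thiserror::Error;

#[derive(Debug, Error, DisplayDoc)]
enum DemoError {
    /// Failed to connect: {0}
    Connect(String),
}

fn main() {
    let err = DemoError::Connect("refused".into());
    // The doc comment is the Display output.
    assert_eq!(err.to_string(), "Failed to connect: refused");
}
```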
+ ); + let future = async move { - let msg_str = std::str::from_utf8(&msg.payload)?; - let payload = Arc::new(BlockPayload::decode(msg_str)?); - let start_time = std::time::Instant::now(); - let futures = Executor::::process_all( - payload.clone(), - &fuel_streams, - &semaphore, - ); - let results = try_join_all(futures).await?; - let end_time = std::time::Instant::now(); - msg.ack().await?; - Ok::<_, ConsumerError>((results, start_time, end_time, payload)) + match BlockPayload::decode(&msg.payload).await { + Ok(payload) => { + let payload = Arc::new(payload); + let start_time = std::time::Instant::now(); + let futures = Executor::::process_all( + payload.clone(), + &fuel_streams, + &semaphore, + ); + let results = try_join_all(futures).await?; + let end_time = std::time::Instant::now(); + msg.ack().await.expect("Failed to ack message"); + Ok((results, start_time, end_time, payload)) + } + Err(e) => { + tracing::error!("Failed to decode payload: {:?}", e); + tracing::debug!( + "Raw payload (hex): {:?}", + hex::encode(&msg.payload) + ); + Err(e) + } + } }; futs.push(future); } diff --git a/crates/sv-publisher/Cargo.toml b/crates/sv-publisher/Cargo.toml index fdf306e6..b6fee200 100644 --- a/crates/sv-publisher/Cargo.toml +++ b/crates/sv-publisher/Cargo.toml @@ -19,6 +19,7 @@ path = "src/main.rs" anyhow = { workspace = true } async-nats = { workspace = true } clap = { workspace = true } +displaydoc = { workspace = true } fuel-core = { workspace = true, default-features = false, features = ["p2p", "relayer", "rocksdb"] } fuel-core-bin = { workspace = true, default-features = false, features = [ "p2p", diff --git a/crates/sv-publisher/src/main.rs b/crates/sv-publisher/src/main.rs index 38b1d491..63bbe695 100644 --- a/crates/sv-publisher/src/main.rs +++ b/crates/sv-publisher/src/main.rs @@ -1,11 +1,12 @@ use std::{sync::Arc, time::Duration}; use async_nats::jetstream::{ - context::PublishErrorKind, + context::{Publish, PublishErrorKind}, stream::RetentionPolicy, Context, }; use clap::Parser; +use displaydoc::Display as DisplayDoc; use fuel_core_types::blockchain::SealedBlock; use fuel_streams_core::prelude::*; use fuel_streams_executors::*; @@ -14,12 +15,11 @@ use sv_publisher::{cli::Cli, shutdown::ShutdownController}; use thiserror::Error; use tokio_util::sync::CancellationToken; -#[derive(Error, Debug)] +#[derive(Error, Debug, DisplayDoc)] pub enum LiveBlockProcessingError { - #[error("Failed to publish block: {0}")] + /// Failed to publish block: {0} PublishError(#[from] PublishError), - - #[error("Processing was cancelled")] + /// Processing was cancelled Cancelled, } @@ -30,11 +30,11 @@ async fn main() -> anyhow::Result<()> { let fuel_core: Arc = FuelCore::new(config).await?; fuel_core.start().await?; - let s3_client = setup_s3().await?; + let storage = setup_storage().await?; let nats_client = setup_nats(&cli.nats_url).await?; let last_block_height = Arc::new(fuel_core.get_latest_block_height()?); let last_published = - Arc::new(find_last_published_height(&nats_client, &s3_client).await?); + Arc::new(find_last_published_height(&nats_client, &storage).await?); let shutdown = Arc::new(ShutdownController::new()); shutdown.clone().spawn_signal_handler(); @@ -72,10 +72,10 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn setup_s3() -> anyhow::Result> { - let s3_client_opts = S3ClientOpts::admin_opts(); - let s3_client = S3Client::new(&s3_client_opts).await?; - Ok(Arc::new(s3_client)) +async fn setup_storage() -> anyhow::Result> { + let storage_opts = S3StorageOpts::admin_opts(); + 
+    let storage = S3Storage::new(storage_opts).await?;
+    Ok(Arc::new(storage))
 }
 
 async fn setup_nats(nats_url: &str) -> anyhow::Result<NatsClient> {
@@ -100,10 +100,9 @@
 
 async fn find_last_published_height(
     nats_client: &NatsClient,
-    s3_client: &Arc<S3Client>,
+    storage: &Arc<S3Storage>,
 ) -> anyhow::Result<BlockHeight> {
-    let block_stream =
-        Stream::<Block>::get_or_init(nats_client, s3_client).await;
+    let block_stream = Stream::<Block>::get_or_init(nats_client, storage).await;
     let last_publish_height = block_stream
         .get_last_published(BlocksSubject::WILDCARD)
         .await?;
@@ -181,15 +180,13 @@ async fn process_live_blocks(
     Ok(())
 }
 
-#[derive(Error, Debug)]
+#[derive(Error, Debug, DisplayDoc)]
 pub enum PublishError {
-    #[error("Failed to publish block to NATS server: {0}")]
+    /// Failed to publish block to NATS server: {0}
     NatsPublish(#[from] async_nats::error::Error<PublishErrorKind>),
-
-    #[error("Failed to create block payload due to: {0}")]
+    /// Failed to create block payload due to: {0}
     BlockPayload(#[from] ExecutorError),
-
-    #[error("Failed to access offchain database: {0}")]
+    /// Failed to access offchain database: {0}
     OffchainDatabase(String),
 }
 
@@ -201,8 +198,12 @@ async fn publish_block(
     let metadata = Metadata::new(fuel_core, sealed_block);
     let fuel_core = Arc::clone(fuel_core);
     let payload = BlockPayload::new(fuel_core, sealed_block, &metadata)?;
+    let publish = Publish::build()
+        .message_id(payload.message_id())
+        .payload(payload.encode().await?.into());
+
     jetstream
-        .send_publish(payload.subject(), payload.to_owned().try_into()?)
+        .send_publish(payload.subject(), publish)
         .await
         .map_err(PublishError::NatsPublish)?
         .await
diff --git a/crates/sv-webserver/Cargo.toml b/crates/sv-webserver/Cargo.toml
index 621f504c..81b60db4 100644
--- a/crates/sv-webserver/Cargo.toml
+++ b/crates/sv-webserver/Cargo.toml
@@ -29,6 +29,7 @@ derive_more = { version = "1.0", features = ["full"] }
 displaydoc = { workspace = true }
 dotenvy = { workspace = true }
 elasticsearch = "8.15.0-alpha.1"
+fuel-data-parser = { workspace = true }
 fuel-streams-core = { workspace = true, features = ["test-helpers"] }
 fuel-streams-nats = { workspace = true, features = ["test-helpers"] }
 fuel-streams-storage = { workspace = true, features = ["test-helpers"] }
@@ -55,7 +56,9 @@ urlencoding = "2.1"
 uuid = { version = "1.11.0", features = ["serde", "v4"] }
 validator = { version = "0.19.0", features = ["derive"] }
 
-[dev-dependencies]
+# Keep cargo-machete from flagging fuel-data-parser as unused
+[package.metadata.cargo-machete]
+ignored = ["fuel-data-parser"]
 
 [features]
 default = []
diff --git a/crates/sv-webserver/src/server/context.rs b/crates/sv-webserver/src/server/context.rs
index c0af7476..0da19c2b 100644
--- a/crates/sv-webserver/src/server/context.rs
+++ b/crates/sv-webserver/src/server/context.rs
@@ -1,7 +1,7 @@
 use std::{sync::Arc, time::Duration};
 
 use fuel_streams_core::prelude::*;
-use fuel_streams_storage::S3Client;
+use fuel_streams_storage::S3Storage;
 
 use crate::{config::Config, telemetry::Telemetry};
 
@@ -13,7 +13,7 @@ pub struct Context {
     pub nats_client: NatsClient,
     pub fuel_streams: Arc<FuelStreams>,
     pub telemetry: Arc<Telemetry>,
-    pub s3_client: Option<Arc<S3Client>>,
+    pub storage: Option<Arc<S3Storage>>,
     pub jwt_secret: String,
 }
 
@@ -23,20 +23,19 @@ impl Context {
             .with_url(config.nats.url.clone())
             .with_domain("CORE");
         let nats_client = NatsClient::connect(&nats_client_opts).await?;
-        let s3_client_opts = S3ClientOpts::admin_opts();
-        let s3_client = Arc::new(S3Client::new(&s3_client_opts).await?);
+        let storage_opts = S3StorageOpts::admin_opts();
+        let storage = Arc::new(S3Storage::new(storage_opts).await?);
         let fuel_streams =
-            Arc::new(FuelStreams::new(&nats_client, &s3_client).await);
+            Arc::new(FuelStreams::new(&nats_client, &storage).await);
         let telemetry = Telemetry::new(None).await?;
         telemetry.start().await?;
 
         Ok(Context {
             fuel_streams,
             nats_client,
-            // client,
             telemetry,
-            s3_client: if config.s3.enabled {
-                Some(s3_client)
+            storage: if config.s3.enabled {
+                Some(storage)
             } else {
                 None
             },
diff --git a/crates/sv-webserver/src/server/ws/errors.rs b/crates/sv-webserver/src/server/ws/errors.rs
index db76ca56..05f00c4c 100644
--- a/crates/sv-webserver/src/server/ws/errors.rs
+++ b/crates/sv-webserver/src/server/ws/errors.rs
@@ -5,14 +5,12 @@ use thiserror::Error;
 /// Ws Subscription-related errors
 #[derive(Debug, DisplayDoc, Error)]
 pub enum WsSubscriptionError {
-    /// Unparsable subscription payload: `{0}`
-    UnparsablePayload(serde_json::Error),
     /// Unknown subject name: `{0}`
     UnknownSubjectName(String),
     /// Unsupported wildcard pattern: `{0}`
     UnsupportedWildcardPattern(String),
-    /// Unserializable message payload: `{0}`
-    UnserializableMessagePayload(serde_json::Error),
+    /// Unserializable payload: `{0}`
+    UnserializablePayload(#[from] serde_json::Error),
     /// Stream Error: `{0}`
     Stream(#[from] StreamError),
     /// Closed by client with reason: `{0}`
diff --git a/crates/sv-webserver/src/server/ws/socket.rs b/crates/sv-webserver/src/server/ws/socket.rs
index d6f4a86e..07e04a1f 100644
--- a/crates/sv-webserver/src/server/ws/socket.rs
+++ b/crates/sv-webserver/src/server/ws/socket.rs
@@ -198,23 +198,24 @@ async fn handle_binary_message(
                     deliver_policy: deliver_policy.into(),
                     filter_subjects: vec![subject_wildcard.clone()],
                 };
-                dbg!(&config);
-                let mut sub =
-                    match streams.subscribe(&sub_subject, Some(config)).await {
-                        Ok(sub) => sub,
-                        Err(e) => {
-                            close_socket_with_error(
-                                WsSubscriptionError::Stream(e),
-                                user_id,
-                                session,
-                                Some(subject_wildcard.clone()),
-                                telemetry,
-                            )
-                            .await;
-                            return;
-                        }
-                    };
+                let mut sub = match streams
+                    .subscribe_raw(&sub_subject, Some(config))
+                    .await
+                {
+                    Ok(sub) => sub,
+                    Err(e) => {
+                        close_socket_with_error(
+                            WsSubscriptionError::Stream(e),
+                            user_id,
+                            session,
+                            Some(subject_wildcard.clone()),
+                            telemetry,
+                        )
+                        .await;
+                        return;
+                    }
+                };
 
                 // consume and forward to the ws
                 while let Some(s3_serialized_payload) = sub.next().await {
@@ -245,9 +246,7 @@
             ClientMessage::Unsubscribe(payload) => {
                 tracing::info!("Received unsubscribe message: {:?}", payload);
                 let subject_wildcard = payload.wildcard;
-                let deliver_policy = payload.deliver_policy;
-
                 if let Err(e) = verify_and_extract_subject_name(&subject_wildcard) {
                     close_socket_with_error(
                         e,
@@ -279,7 +278,7 @@ fn parse_client_message(
     msg: Bytes,
 ) -> Result<ClientMessage, WsSubscriptionError> {
     let msg = serde_json::from_slice::<ClientMessage>(&msg)
-        .map_err(WsSubscriptionError::UnparsablePayload)?;
+        .map_err(WsSubscriptionError::UnserializablePayload)?;
     Ok(msg)
 }
 
@@ -330,50 +329,40 @@ async fn decode(
     s3_payload: Vec<u8>,
 ) -> Result<Vec<u8>, WsSubscriptionError> {
     let subject = verify_and_extract_subject_name(subject_wildcard)?;
-    let entity = match subject.as_str() {
-        Transaction::NAME => {
-            let entity = Transaction::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
-        }
+    let payload = match subject.as_str() {
         Block::NAME => {
-            let entity = Block::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
+            let entity = Block::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
+        }
+        Transaction::NAME => {
+            let entity = Transaction::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
         }
         Input::NAME => {
-            let entity = Input::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
+            let entity = Input::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
         }
         Output::NAME => {
-            let entity = Output::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
+            let entity = Output::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
         }
         Receipt::NAME => {
-            let entity = Receipt::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
+            let entity = Receipt::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
        }
         Utxo::NAME => {
-            let entity = Utxo::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
+            let entity = Utxo::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
         }
         Log::NAME => {
-            let entity = Log::decode_or_panic(s3_payload);
-            serde_json::to_value(entity)
-                .map_err(WsSubscriptionError::UnparsablePayload)?
-        }
-        _ => {
-            return Err(WsSubscriptionError::UnknownSubjectName(
-                subject.to_string(),
-            ))
+            let entity = Log::decode(&s3_payload).await?;
+            Ok(entity.encode_json_value()?)
         }
+        _ => Err(WsSubscriptionError::UnknownSubjectName(
+            subject_wildcard.to_string(),
+        )),
     };
 
-    // Wrap the entity in ServerMessage::Response and serialize once
-    serde_json::to_vec(&ServerMessage::Response(entity))
-        .map_err(WsSubscriptionError::UnserializableMessagePayload)
+    serde_json::to_vec(&ServerMessage::Response(payload?))
+        .map_err(WsSubscriptionError::UnserializablePayload)
 }
diff --git a/crates/sv-webserver/src/telemetry/system.rs b/crates/sv-webserver/src/telemetry/system.rs
index bae499a0..f795e20e 100644
--- a/crates/sv-webserver/src/telemetry/system.rs
+++ b/crates/sv-webserver/src/telemetry/system.rs
@@ -7,6 +7,7 @@ use std::{
 };
 
 use derive_more::Deref;
+use displaydoc::Display as DisplayDoc;
 use rust_decimal::{
     prelude::{FromPrimitive, ToPrimitive},
     Decimal,
@@ -37,9 +38,9 @@ impl From<SystemMetrics> for SystemMetricsWrapper {
     }
 }
 
-#[derive(Debug, Error)]
+#[derive(Debug, Error, DisplayDoc)]
 pub enum Error {
-    #[error("The process {0} could not be found")]
+    /// The process {0} could not be found
     ProcessNotFound(Pid),
 }
 
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index 394fc05c..23376c6b 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -23,11 +23,11 @@ pub struct Streams {
 impl Streams {
     pub async fn new(
         nats_client: &NatsClient,
-        s3_client: &Arc<S3Client>,
+        storage: &Arc<S3Storage>,
     ) -> Self {
-        let blocks = Stream::<Block>::get_or_init(nats_client, s3_client).await;
+        let blocks = Stream::<Block>::get_or_init(nats_client, storage).await;
         let transactions =
-            Stream::<Transaction>::get_or_init(nats_client, s3_client).await;
+            Stream::<Transaction>::get_or_init(nats_client, storage).await;
         Self {
             transactions,
             blocks,
@@ -39,11 +39,11 @@ pub async fn server_setup() -> BoxedResult<(NatsClient, Streams, Connection)> {
     let nats_client_opts = NatsClientOpts::admin_opts().with_rdn_namespace();
     let nats_client = NatsClient::connect(&nats_client_opts).await?;
 
-    let s3_client_opts = S3ClientOpts::admin_opts().with_random_namespace();
-    let s3_client = Arc::new(S3Client::new(&s3_client_opts).await?);
-    s3_client.create_bucket().await?;
+    let storage_opts = S3StorageOpts::admin_opts().with_random_namespace();
+    let storage = Arc::new(S3Storage::new(storage_opts).await?);
+    storage.create_bucket().await?;
 
-    let streams = Streams::new(&nats_client, &s3_client).await;
+    let streams = Streams::new(&nats_client, &storage).await;
     let mut client = Client::new(FuelNetwork::Local).await?;
     let connection = client.connect().await?;
diff --git a/tests/tests/client.rs b/tests/tests/client.rs
index 04e9c880..9bcfd6c3 100644
--- a/tests/tests/client.rs
+++ b/tests/tests/client.rs
@@ -44,7 +44,7 @@
 //     let nats_opts = NatsClientOpts::admin_opts();
 //     let client = NatsClient::connect(&nats_opts).await?;
 //     assert!(client.is_connected());
-//     let s3_opts = Arc::new(S3ClientOpts::admin_opts());
+//     let s3_opts = Arc::new(S3StorageOpts::admin_opts());
 //     let client = Client::with_opts(&nats_opts, &s3_opts).await?;
 //     assert!(client.nats_conn.is_connected());
 //     Ok(())
@@ -53,7 +53,7 @@
 // #[tokio::test]
 // async fn multiple_client_connections() -> BoxedResult<()> {
 //     let nats_opts = NatsClientOpts::admin_opts();
-//     let s3_opts = Arc::new(S3ClientOpts::admin_opts());
+//     let s3_opts = Arc::new(S3StorageOpts::admin_opts());
 //     let tasks: Vec<_> = (0..100)
 //         .map(|_| {
 //             let nats_opts = nats_opts.clone();
@@ -213,7 +213,7 @@
 //         .with_url(network.to_nats_url())
 //         .with_rdn_namespace()
 //         .with_timeout(1);
-//     let s3_opts = Arc::new(S3ClientOpts::admin_opts());
+//     let s3_opts = Arc::new(S3StorageOpts::admin_opts());
 //     let admin_tasks: Vec>> = (0..100)
 //         .map(|_| {
 //             let opts: NatsClientOpts = admin_opts.clone();
@@ -232,7 +232,7 @@
 //         .with_rdn_namespace()
 //         .with_timeout(1);
 //     let s3_public_opts =
-//         Arc::new(S3ClientOpts::new(S3Env::Local, S3Role::Public));
+//         Arc::new(S3StorageOpts::new(S3Env::Local, S3Role::Public));
 //     let public_tasks: Vec>> = (0..100)
 //         .map(|_| {
 //             let opts: NatsClientOpts = public_opts.clone();
diff --git a/tests/tests/publisher.rs b/tests/tests/publisher.rs
index 19eb20a2..61611acd 100644
--- a/tests/tests/publisher.rs
+++ b/tests/tests/publisher.rs
@@ -117,8 +117,8 @@
 // #[tokio::test(flavor = "multi_thread")]
 // async fn doesnt_publish_any_message_when_no_block_has_been_mined() {
 //     let (blocks_broadcaster, _) = broadcast::channel::(1);
-//     let s3_client = Arc::new(S3Client::new_for_testing().await);
-//     let publisher = new_publisher(blocks_broadcaster.clone(), &s3_client).await;
+//     let storage = Arc::new(S3Storage::new_for_testing().await);
+//     let publisher = new_publisher(blocks_broadcaster.clone(), &storage).await;
 
 //     let shutdown_controller = start_publisher(&publisher).await;
 //     stop_publisher(shutdown_controller).await;
@@ -129,8 +129,8 @@
 // #[tokio::test(flavor = "multi_thread")]
 // async fn publishes_a_block_message_when_a_single_block_has_been_mined() {
 //     let (blocks_broadcaster, _) = broadcast::channel::(1);
-//     let s3_client = Arc::new(S3Client::new_for_testing().await);
-//     let publisher = new_publisher(blocks_broadcaster.clone(), &s3_client).await;
+//     let storage = Arc::new(S3Storage::new_for_testing().await);
+//     let publisher = new_publisher(blocks_broadcaster.clone(), &storage).await;
 
 //     publish_block(&publisher, &blocks_broadcaster).await;
 
@@ -140,14 +140,14 @@
 //         .get_last_published(BlocksSubject::WILDCARD)
 //         .await
 //         .is_ok_and(|result| result.is_some()));
-//     s3_client.cleanup_after_testing().await;
+//     storage.cleanup_after_testing().await;
 // }
 
 // #[tokio::test(flavor = "multi_thread")]
 // async fn publishes_transaction_for_each_published_block() {
 //     let (blocks_broadcaster, _) = broadcast::channel::(1);
-//     let s3_client = Arc::new(S3Client::new_for_testing().await);
-//     let publisher = new_publisher(blocks_broadcaster.clone(), &s3_client).await;
+//     let storage = Arc::new(S3Storage::new_for_testing().await);
+//     let publisher = new_publisher(blocks_broadcaster.clone(), &storage).await;
 
 //     publish_block(&publisher, &blocks_broadcaster).await;
 
@@ -157,7 +157,7 @@
 //         .get_last_published(TransactionsSubject::WILDCARD)
 //         .await
 //         .is_ok_and(|result| result.is_some()));
-//     s3_client.cleanup_after_testing().await;
+//     storage.cleanup_after_testing().await;
 // }
 
 // #[tokio::test(flavor = "multi_thread")]
@@ -253,9 +253,9 @@
 //         .with_receipts(receipts.to_vec())
 //         .arc();
 
-//     let s3_client = Arc::new(S3Client::new_for_testing().await);
+//     let storage = Arc::new(S3Storage::new_for_testing().await);
 //     let publisher =
-//         Publisher::new_for_testing(&nats_client().await, &s3_client, fuel_core)
+//         Publisher::new_for_testing(&nats_client().await, &storage, fuel_core)
 //             .await
 //             .unwrap();
 
@@ -323,14 +323,14 @@
 //         "Published receipt IDs don't match expected IDs"
 //     );
 
-//     s3_client.cleanup_after_testing().await;
+//     storage.cleanup_after_testing().await;
 // }
 
 // #[tokio::test(flavor = "multi_thread")]
 // async fn publishes_inputs() {
 //     let (blocks_broadcaster, _) = broadcast::channel::(1);
-//     let s3_client = Arc::new(S3Client::new_for_testing().await);
-//     let publisher = new_publisher(blocks_broadcaster.clone(), &s3_client).await;
+//     let storage = Arc::new(S3Storage::new_for_testing().await);
+//     let publisher = new_publisher(blocks_broadcaster.clone(), &storage).await;
 
 //     publish_block(&publisher, &blocks_broadcaster).await;
 
@@ -340,15 +340,15 @@
 //         .get_last_published(InputsByIdSubject::WILDCARD)
 //         .await
 //         .is_ok_and(|result| result.is_some()));
-//     s3_client.cleanup_after_testing().await;
+//     storage.cleanup_after_testing().await;
 // }
 
 // async fn new_publisher(
 //     broadcaster: Sender,
-//     s3_client: &Arc<S3Client>,
+//     storage: &Arc<S3Storage>,
 // ) -> Publisher {
 //     let fuel_core = TestFuelCore::default(broadcaster).arc();
-//     Publisher::new_for_testing(&nats_client().await, s3_client, fuel_core)
+//     Publisher::new_for_testing(&nats_client().await, storage, fuel_core)
 //         .await
 //         .unwrap()
 // }