Skip to content

Commit

Permalink
fix(repo): stream creation
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Dec 13, 2024
1 parent ac294a3 commit 865118f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 62 deletions.
31 changes: 10 additions & 21 deletions crates/fuel-streams-core/src/stream/fuel_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,24 @@ pub struct FuelStreams {
}

impl FuelStreams {
pub async fn new(
nats_client: &NatsClient,
config: Option<StreamOpts>,
) -> Self {
pub async fn new(nats_client: &NatsClient) -> Self {
Self {
transactions: Stream::<Transaction>::new(
nats_client,
config.to_owned(),
)
.await,
blocks: Stream::<Block>::new(nats_client, config.to_owned()).await,
inputs: Stream::<Input>::new(nats_client, config.to_owned()).await,
outputs: Stream::<Output>::new(nats_client, config.to_owned())
.await,
receipts: Stream::<Receipt>::new(nats_client, config.to_owned())
.await,
utxos: Stream::<Utxo>::new(nats_client, config.to_owned()).await,
logs: Stream::<Log>::new(nats_client, config.to_owned()).await,
transactions: Stream::<Transaction>::new(nats_client).await,
blocks: Stream::<Block>::new(nats_client).await,
inputs: Stream::<Input>::new(nats_client).await,
outputs: Stream::<Output>::new(nats_client).await,
receipts: Stream::<Receipt>::new(nats_client).await,
utxos: Stream::<Utxo>::new(nats_client).await,
logs: Stream::<Log>::new(nats_client).await,
}
}

pub async fn setup_all(
core_client: &NatsClient,
publisher_client: &NatsClient,
) -> (Self, Self) {
let core_stream = Self::new(core_client, None).await;
let publisher_stream =
Self::new(publisher_client, Some(StreamOpts { mirror: true }))
.await;
let core_stream = Self::new(core_client).await;
let publisher_stream = Self::new(publisher_client).await;
(core_stream, publisher_stream)
}

Expand Down
42 changes: 10 additions & 32 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_nats::{
jetstream::{
consumer::AckPolicy,
kv::{self, CreateErrorKind},
stream::{self, LastRawMessageErrorKind, Source, State},
stream::{self, LastRawMessageErrorKind, State},
},
RequestErrorKind,
};
Expand Down Expand Up @@ -114,49 +114,27 @@ pub struct Stream<S: Streamable> {
_marker: std::marker::PhantomData<S>,
}

#[derive(Debug, Clone, Default)]
pub struct StreamOpts {
pub mirror: bool,
}

impl<S: Streamable> Stream<S> {
#[allow(clippy::declare_interior_mutable_const)]
const INSTANCE: OnceCell<Self> = OnceCell::const_new();

pub async fn get_or_init(client: &NatsClient) -> Self {
let cell = Self::INSTANCE;
cell.get_or_init(|| async { Self::new(client, None).await.to_owned() })
cell.get_or_init(|| async { Self::new(client).await.to_owned() })
.await
.to_owned()
}

pub async fn new(client: &NatsClient, config: Option<StreamOpts>) -> Self {
let is_mirror = config.unwrap_or_default().mirror;
pub async fn new(client: &NatsClient) -> Self {
let namespace = &client.namespace;
let bucket_name = namespace.stream_name(S::NAME);
let mirror_name = format!("{bucket_name}_mirrored");

let config = match is_mirror {
false => kv::Config {
bucket: bucket_name.to_owned(),
storage: stream::StorageType::File,
history: 1,
compression: true,
..Default::default()
},
true => kv::Config {
bucket: mirror_name.to_owned(),
storage: stream::StorageType::File,
history: 1,
compression: true,
mirror_direct: true,
mirror: Some(Source {
name: bucket_name.to_owned(),
domain: Some("core".into()),
..Default::default()
}),
..Default::default()
},

let config = kv::Config {
bucket: bucket_name.to_owned(),
storage: stream::StorageType::File,
history: 1,
compression: true,
..Default::default()
};

let store = client
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Publisher {
let nats_client_opts =
NatsClientOpts::admin_opts(None).with_custom_url(nats_url);
let nats_client = NatsClient::connect(&nats_client_opts).await?;
let fuel_streams = Arc::new(FuelStreams::new(&nats_client, None).await);
let fuel_streams = Arc::new(FuelStreams::new(&nats_client).await);

telemetry.record_streams_count(
fuel_core.chain_id(),
Expand All @@ -59,7 +59,7 @@ impl Publisher {
) -> anyhow::Result<Self> {
Ok(Publisher {
fuel_core,
fuel_streams: Arc::new(FuelStreams::new(nats_client, None).await),
fuel_streams: Arc::new(FuelStreams::new(nats_client).await),
nats_client: nats_client.clone(),
telemetry: Telemetry::new().await?,
})
Expand Down
15 changes: 8 additions & 7 deletions crates/sv-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,22 @@ async fn setup_nats(
> {
let core_client = Client::Core.create(cli).await?;
let publisher_client = Client::Publisher.create(cli).await?;
let stream_name = core_client.namespace.stream_name("block_importer");
let stream = core_client
let stream_name = publisher_client.namespace.stream_name("block_importer");
let stream = publisher_client
.jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: stream_name,
subjects: vec!["block_submitted.>".to_string()],
retention: RetentionPolicy::WorkQueue,
duplicate_window: Duration::from_secs(1),
allow_rollup: true,
..Default::default()
})
.await?;

let consumer = stream
.get_or_create_consumer("block_importer", ConsumerConfig {
durable_name: Some("block_importer".to_string()),
ack_policy: AckPolicy::Explicit,
..Default::default()
})
Expand All @@ -113,13 +115,12 @@ async fn process_messages(
cli: &Cli,
token: &CancellationToken,
) -> Result<(), ConsumerError> {
let (_, publisher_client, consumer) = setup_nats(cli).await?;
let fuel_streams = FuelStreams::new(&publisher_client, None).await;
let fuel_streams: Arc<dyn FuelStreamsExt> = fuel_streams.arc();

let (core_client, publisher_client, consumer) = setup_nats(cli).await?;
let (_, publisher_stream) =
FuelStreams::setup_all(&core_client, &publisher_client).await;
let fuel_streams: Arc<dyn FuelStreamsExt> = publisher_stream.arc();
while !token.is_cancelled() {
let messages = consumer.fetch().max_messages(100).messages().await?;
let fuel_streams = fuel_streams.clone();
tokio::pin!(messages);
while let Some(msg) = messages.next().await {
let msg = msg?;
Expand Down

0 comments on commit 865118f

Please sign in to comment.