diff --git a/lib/beacon_api/event_listener.ex b/lib/beacon_api/event_listener.ex index 5cf0f869f..1017e8942 100644 --- a/lib/beacon_api/event_listener.ex +++ b/lib/beacon_api/event_listener.ex @@ -7,8 +7,11 @@ defmodule BeaconApi.EventPubSub do The idea is to have a single place to publish events, and then a method for a connection to subscribe to them. """ + require Logger alias EventBus.Model.Event + alias LambdaEthereumConsensus.Store alias SSE.Chunk + alias Types.StateInfo @type topic() :: atom() @type event_data() :: any() @@ -19,12 +22,25 @@ defmodule BeaconApi.EventPubSub do TODO: We might want a noop if there are no subscribers for a topic. """ @spec publish(topic(), event_data()) :: :ok | {:error, atom()} - def publish(:finalized_checkpoint = topic, %{root: root, epoch: epoch}) do - data = %{root: BeaconApi.Utils.hex_encode(root), epoch: epoch} - chunk = %Chunk{data: [Jason.encode!(data)]} - event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} - - EventBus.notify(event) + def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do + with %StateInfo{root: state_root} = Store.BlockStates.get_state_info(block_root) do + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + state: BeaconApi.Utils.hex_encode(state_root), + epoch: epoch, + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + else + nil -> + Logger.error("State not available for block", root: block_root) + + {:error, :state_not_available} + end end def publish(_topic, _event_data), do: {:error, :unsupported_topic} @@ -32,6 +48,10 @@ defmodule BeaconApi.EventPubSub do @doc """ Subscribe to a topic for stream events in an sse connection. """ - @spec sse_subscribe(Plug.Conn.t(), topic(), event_data()) :: Plug.Conn.t() - def sse_subscribe(conn, topic), do: SSE.stream(conn, {[topic], %Chunk{data: []}}) + @spec sse_subscribe(Plug.Conn.t(), topic()) :: Plug.Conn.t() + def sse_subscribe(conn, topic) do + conn + |> Plug.Conn.put_resp_header("cache-control", "no-cache") + |> SSE.stream({[topic], %Chunk{data: []}}) + end end