Skip to content

Commit

Permalink
Added no-cache to sse testing
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o committed Jan 23, 2025
1 parent ae3aa56 commit 7c4481e
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions lib/beacon_api/event_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ defmodule BeaconApi.EventPubSub do
The idea is to have a single place to publish events, and then a method for a connection to subscribe to them.
"""

require Logger
alias EventBus.Model.Event
alias LambdaEthereumConsensus.Store
alias SSE.Chunk
alias Types.StateInfo

@type topic() :: atom()
@type event_data() :: any()
Expand All @@ -19,19 +22,36 @@ defmodule BeaconApi.EventPubSub do
TODO: We might want a noop if there are no subscribers for a topic.
"""
@spec publish(topic(), event_data()) :: :ok | {:error, atom()}
def publish(:finalized_checkpoint = topic, %{root: root, epoch: epoch}) do
data = %{root: BeaconApi.Utils.hex_encode(root), epoch: epoch}
chunk = %Chunk{data: [Jason.encode!(data)]}
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}

EventBus.notify(event)
def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do
with %StateInfo{root: state_root} = Store.BlockStates.get_state_info(block_root) do
data = %{
block: BeaconApi.Utils.hex_encode(block_root),
state: BeaconApi.Utils.hex_encode(state_root),
epoch: epoch,
execution_optimistic: false
}

chunk = %Chunk{event: topic, data: [Jason.encode!(data)]}
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}

EventBus.notify(event)
else
nil ->
Logger.error("State not available for block", root: block_root)

{:error, :state_not_available}
end
end

def publish(_topic, _event_data), do: {:error, :unsupported_topic}

@doc """
Subscribe to a topic for stream events in an sse connection.
"""
@spec sse_subscribe(Plug.Conn.t(), topic(), event_data()) :: Plug.Conn.t()
def sse_subscribe(conn, topic), do: SSE.stream(conn, {[topic], %Chunk{data: []}})
@spec sse_subscribe(Plug.Conn.t(), topic()) :: Plug.Conn.t()
def sse_subscribe(conn, topic) do
conn
|> Plug.Conn.put_resp_header("cache-control", "no-cache")
|> SSE.stream({[topic], %Chunk{data: []}})
end
end

0 comments on commit 7c4481e

Please sign in to comment.