Skip to content

Commit

Permalink
feat: add annotations to subscriptions table
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Jan 24, 2025
1 parent 362f087 commit f2080f6
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 17 deletions.
16 changes: 15 additions & 1 deletion lib/event_store/sql/init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,24 @@ defmodule EventStore.Sql.Init do
create_subscription_index(),
create_snapshots_table(column_data_type),
create_schema_migrations_table(),
record_event_store_schema_version()
record_event_store_schema_version(),
add_annotations_to_subscriptions(),
create_subscription_annotations_index()
]
end

defp add_annotations_to_subscriptions do
"""
ALTER TABLE subscriptions ADD COLUMN annotations jsonb;
"""
end

defp create_subscription_annotations_index do
"""
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
"""
end

defp create_streams_table do
"""
CREATE TABLE streams
Expand Down
7 changes: 4 additions & 3 deletions lib/event_store/sql/statements/insert_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ INSERT INTO "<%= schema %>".subscriptions
(
stream_uuid,
subscription_name,
last_seen
last_seen,
annotations
)
VALUES ($1, $2, $3)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at;
VALUES ($1, $2, $3, $4)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at, annotations;
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SELECT
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
FROM "<%= schema %>".subscriptions
ORDER BY created_at;
3 changes: 2 additions & 1 deletion lib/event_store/sql/statements/query_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SELECT
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
FROM "<%= schema %>".subscriptions
WHERE stream_uuid = $1 AND subscription_name = $2;
61 changes: 54 additions & 7 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,40 @@ defmodule EventStore.Storage.Subscription do
QuerySubscription
}

@typedoc """
A subscription to an event stream.
* `subscription_id` - Unique identifier for the subscription
* `stream_uuid` - The stream being subscribed to
* `subscription_name` - Name of the subscription
* `last_seen` - Last event seen by this subscription
* `created_at` - When the subscription was created
* `annotations` - Arbitrary non-identifying metadata attached to the
subscription, inspired by Kubernetes annotations. These are key-value pairs
that can be used to store auxiliary information about a subscription that
is not directly part of its core functionality. For example:
* Build/release information (team owner, git sha, etc.)
* Client-specific configuration
* Debugging info
* Tool information
"""
@type t :: %EventStore.Storage.Subscription{
subscription_id: non_neg_integer(),
stream_uuid: String.t(),
subscription_name: String.t(),
last_seen: non_neg_integer() | nil,
created_at: DateTime.t()
created_at: DateTime.t(),
annotations: map()
}

defstruct [:subscription_id, :stream_uuid, :subscription_name, :last_seen, :created_at]
defstruct [
:subscription_id,
:stream_uuid,
:subscription_name,
:last_seen,
:created_at,
:annotations
]

defdelegate subscriptions(conn, opts), to: QueryAllSubscriptions, as: :execute

Expand Down Expand Up @@ -49,7 +74,17 @@ defmodule EventStore.Storage.Subscription do
do: Subscription.Delete.execute(conn, stream_uuid, subscription_name, opts)

defp create_subscription(conn, stream_uuid, subscription_name, start_from, opts) do
case CreateSubscription.execute(conn, stream_uuid, subscription_name, start_from, opts) do
{splitted_opts, opts} = Keyword.split(opts, [:annotations])
annotations = Keyword.get(splitted_opts, :annotations) || %{}

case CreateSubscription.execute(
conn,
stream_uuid,
subscription_name,
start_from,
annotations,
opts
) do
{:ok, %Subscription{}} = reply ->
reply

Expand Down Expand Up @@ -96,7 +131,8 @@ defmodule EventStore.Storage.Subscription do
defmodule CreateSubscription do
@moduledoc false

def execute(conn, stream_uuid, subscription_name, start_from, opts) do
def execute(conn, stream_uuid, subscription_name, start_from, annotations, opts)
when is_map(annotations) do
Logger.debug(
"Attempting to create subscription on stream " <>
inspect(stream_uuid) <>
Expand All @@ -107,7 +143,12 @@ defmodule EventStore.Storage.Subscription do

query = Statements.insert_subscription(schema)

case Postgrex.query(conn, query, [stream_uuid, subscription_name, start_from], opts) do
case Postgrex.query(
conn,
query,
[stream_uuid, subscription_name, start_from, annotations],
opts
) do
{:ok, %Postgrex.Result{rows: rows}} ->
Logger.debug(
"Created subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\""
Expand Down Expand Up @@ -200,16 +241,22 @@ defmodule EventStore.Storage.Subscription do
stream_uuid,
subscription_name,
last_seen,
created_at
created_at,
annotations
] = row

%Subscription{
subscription_id: subscription_id,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
last_seen: last_seen,
created_at: created_at
created_at: created_at,
annotations: annotations_from_row(annotations)
}
end

defp annotations_from_row(nil), do: %{}
defp annotations_from_row([]), do: %{}
defp annotations_from_row(annotations), do: annotations
end
end
7 changes: 7 additions & 0 deletions priv/event_store/migrations/v1.4.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add annotations field to subscriptions table

ALTER TABLE subscriptions
ADD COLUMN annotations jsonb DEFAULT '{}'::jsonb NOT NULL;

-- Add index on annotations for better query performance
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
37 changes: 33 additions & 4 deletions test/storage/subscription_persistence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,44 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
verify_subscription(subscription, 1)
end

test "create subscription with annotations", context do
annotations = %{"key" => "value", "nested" => %{"data" => 123}}
{:ok, subscription} = subscribe_to_stream(context, annotations: annotations)

verify_subscription(subscription, nil, annotations)
end

test "create subscription when already exists preserves annotations", context do
annotations = %{"key" => "value", "metadata" => %{"version" => 1}}
{:ok, subscription1} = subscribe_to_stream(context, annotations: annotations)

# No annotations provided
{:ok, subscription2} = subscribe_to_stream(context)
# Explicit nil
{:ok, subscription3} = subscribe_to_stream(context, annotations: nil)
# Empty map
{:ok, subscription4} = subscribe_to_stream(context, annotations: %{})

assert subscription1.subscription_id == subscription2.subscription_id
assert subscription2.subscription_id == subscription3.subscription_id
assert subscription3.subscription_id == subscription4.subscription_id
assert subscription1.annotations == annotations
assert subscription2.annotations == annotations
assert subscription3.annotations == annotations
assert subscription4.annotations == annotations
end

def ack_last_seen_event(context, last_seen) do
%{conn: conn, schema: schema} = context

Storage.ack_last_seen_event(conn, @all_stream, @subscription_name, last_seen, schema: schema)
end

defp subscribe_to_stream(context) do
defp subscribe_to_stream(context, opts \\ []) do
%{conn: conn, schema: schema} = context
opts = Keyword.merge([schema: schema], opts)

Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, schema: schema)
Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, opts)
end

defp delete_subscription(context) do
Expand All @@ -93,13 +121,14 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
Storage.subscriptions(conn, schema: schema)
end

defp verify_subscription(subscription, last_seen \\ nil)
defp verify_subscription(subscription, last_seen \\ nil, annotations \\ %{})

defp verify_subscription(subscription, last_seen) do
defp verify_subscription(subscription, last_seen, annotations) do
assert subscription.subscription_id > 0
assert subscription.stream_uuid == @all_stream
assert subscription.subscription_name == @subscription_name
assert subscription.last_seen == last_seen
assert subscription.created_at != nil
assert subscription.annotations == annotations
end
end

0 comments on commit f2080f6

Please sign in to comment.