From 3bf9201855b2c6f67330cc2fe0c27e8b8cfc44a5 Mon Sep 17 00:00:00 2001 From: Eddie Maldonado Date: Mon, 30 Oct 2023 14:45:08 -0400 Subject: [PATCH] Realtime vehicles / alerts PubSub (#2265) * refactor: move lookup key to socket assign * refactor: use PubSub for Realtime.Server * refactor: remove unnecessary Map.merge in calls to Socket.assign Co-authored-by: Kayla Firestack * refactor(tests): use factor rather than constant --------- Co-authored-by: Kayla Firestack --- lib/realtime/server.ex | 124 +++++----- lib/realtime/supervisor.ex | 1 + lib/skate_web/channels/alerts_channel.ex | 12 +- lib/skate_web/channels/vehicle_channel.ex | 39 +++- lib/skate_web/channels/vehicles_channel.ex | 54 +++-- .../channels/vehicles_search_channel.ex | 22 +- test/realtime/server_test.exs | 214 ++++++++++-------- .../channels/alerts_channel_test.exs | 4 +- .../channels/vehicle_channel_test.exs | 39 +++- .../channels/vehicles_channel_test.exs | 12 +- .../channels/vehicles_search_channel_test.exs | 5 +- 11 files changed, 328 insertions(+), 198 deletions(-) diff --git a/lib/realtime/server.ex b/lib/realtime/server.ex index abf72df81..fa749faa2 100644 --- a/lib/realtime/server.ex +++ b/lib/realtime/server.ex @@ -81,6 +81,9 @@ defmodule Realtime.Server do @spec default_name() :: GenServer.name() def default_name(), do: Realtime.Server + @spec pubsub_name() :: Phoenix.PubSub.t() + def pubsub_name(), do: Realtime.PubSub + @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(start_link_opts) do GenServer.start_link(__MODULE__, [], start_link_opts) @@ -93,69 +96,92 @@ defmodule Realtime.Server do ``` Those `lookup_args` can be passed into `RealTime.Server.lookup(lookup_args)/1` to get the data. """ - @spec subscribe_to_route(Route.id(), GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_route(Route.id(), GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_route(route_id, server \\ default_name()) do - subscribe(server, {:route_id, route_id}) + subscription_key = {:route_id, route_id} + {subscription_key, subscribe(server, subscription_key)} end - @spec subscribe_to_all_shuttles(GenServer.server()) :: [Vehicle.t()] + @spec subscribe_to_all_shuttles(GenServer.server()) :: {subscription_key(), [Vehicle.t()]} def subscribe_to_all_shuttles(server \\ default_name()) do - subscribe(server, :all_shuttles) + subscription_key = :all_shuttles + {subscription_key, subscribe(server, subscription_key)} end - @spec subscribe_to_search(search_params(), GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_search(search_params(), GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_search(search_params, server \\ default_name()) do - subscribe(server, {:search, search_params}) + subscription_key = {:search, search_params} + {subscription_key, subscribe(server, subscription_key)} end @spec subscribe_to_limited_search(search_params(), GenServer.server()) :: - limited_search_result() + {subscription_key(), limited_search_result()} def subscribe_to_limited_search(search_params, server \\ default_name()) do - subscribe(server, {:limited_search, search_params}) + subscription_key = {:limited_search, search_params} + {subscription_key, subscribe(server, subscription_key)} end @spec update_limited_search_subscription(search_params(), GenServer.server()) :: - limited_search_result() + {subscription_key(), limited_search_result()} def update_limited_search_subscription(search_params, server \\ default_name()) do - update_subscription(server, {:limited_search, search_params}) + subscription_key = {:limited_search, search_params} + {subscription_key, update_subscription(server, subscription_key)} end - @spec subscribe_to_vehicle(String.t(), GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_vehicle(String.t(), GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_vehicle(vehicle_id, server \\ default_name()) do - subscribe( - server, - {:vehicle, vehicle_id} - ) + subscription_key = {:vehicle, vehicle_id} + + {subscription_key, + subscribe( + server, + subscription_key + )} end - @spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) :: [ - VehicleOrGhost.t() - ] + @spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) :: + {subscription_key(), + [ + VehicleOrGhost.t() + ]} def subscribe_to_vehicle_with_logged_out(vehicle_id, server \\ default_name()) do - subscribe( - server, - {:vehicle_with_logged_out, vehicle_id} - ) + subscription_key = {:vehicle_with_logged_out, vehicle_id} + + {subscription_key, + subscribe( + server, + subscription_key + )} end - @spec subscribe_to_run_ids([Run.id()], GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_run_ids([Run.id()], GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_run_ids(run_ids, server \\ default_name()) do - subscribe(server, {:run_ids, run_ids}) + subscription_key = {:run_ids, run_ids} + {subscription_key, subscribe(server, subscription_key)} end - @spec subscribe_to_block_ids([Block.id()], GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_block_ids([Block.id()], GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_block_ids(block_ids, server \\ default_name()) do - subscribe(server, {:block_ids, block_ids}) + subscription_key = {:block_ids, block_ids} + {subscription_key, subscribe(server, subscription_key)} end - @spec subscribe_to_all_pull_backs(GenServer.server()) :: [VehicleOrGhost.t()] + @spec subscribe_to_all_pull_backs(GenServer.server()) :: + {subscription_key(), [VehicleOrGhost.t()]} def subscribe_to_all_pull_backs(server \\ default_name()) do - subscribe(server, :all_pull_backs) + subscription_key = :all_pull_backs + {subscription_key, subscribe(server, subscription_key)} end - @spec subscribe_to_alerts(Route.id(), GenServer.server()) :: [String.t()] + @spec subscribe_to_alerts(Route.id(), GenServer.server()) :: {subscription_key(), [String.t()]} def subscribe_to_alerts(route_id, server \\ default_name()) do - subscribe(server, {:alerts, route_id}) + subscription_key = {:alerts, route_id} + {subscription_key, subscribe(server, subscription_key)} end def peek_at_vehicles_by_run_ids(run_ids, server \\ default_name()) do @@ -187,17 +213,20 @@ defmodule Realtime.Server do @spec subscribe(GenServer.server(), {:run_ids, [Run.id()]}) :: [VehicleOrGhost.t()] @spec subscribe(GenServer.server(), {:block_ids, [Block.id()]}) :: [VehicleOrGhost.t()] @spec subscribe(GenServer.server(), {:alerts, Route.id()}) :: [String.t()] + defp subscribe(server, {:alerts, _route_id} = subscription_key) do + {pubsub, ets} = GenServer.call(server, :subscription_info) + Phoenix.PubSub.subscribe(pubsub, "realtime_alerts") + lookup({ets, subscription_key}) + end + defp subscribe(server, subscription_key) do - {registry_key, ets} = GenServer.call(server, :subscription_info) - Registry.register(Realtime.Registry, registry_key, subscription_key) + {pubsub, ets} = GenServer.call(server, :subscription_info) + Phoenix.PubSub.subscribe(pubsub, "realtime_vehicles") lookup({ets, subscription_key}) end defp update_subscription(server, {:limited_search, _search_params} = subscription_key) do - {registry_key, ets} = GenServer.call(server, :subscription_info) - # Replace the old search subscription with the new one - Registry.unregister_match(Realtime.Registry, registry_key, {:limited_search, %{}}) - Registry.register(Realtime.Registry, registry_key, subscription_key) + {_pubsub, ets} = GenServer.call(server, :subscription_info) lookup({ets, subscription_key}) end @@ -378,8 +407,7 @@ defmodule Realtime.Server do @impl true def handle_call(:subscription_info, _from, %__MODULE__{} = state) do - registry_key = self() - {:reply, {registry_key, state.ets}, state} + {:reply, {pubsub_name(), state.ets}, state} end def handle_call(:ets, _from, %__MODULE__{ets: ets} = state) do @@ -518,21 +546,13 @@ defmodule Realtime.Server do @spec broadcast(t(), :vehicles | :alerts) :: :ok defp broadcast(state, data_type) do - registry_key = self() - - Registry.dispatch(Realtime.Supervisor.registry_name(), registry_key, fn entries -> - Enum.each(entries, fn {pid, subscripition_key} -> - if (data_type == :alerts and match?({:alerts, _}, subscripition_key)) or - (data_type == :vehicles and !match?({:alerts, _}, subscripition_key)) do - send_data({pid, subscripition_key}, state) - end - end) - end) - end + topic = + case data_type do + :vehicles -> "realtime_vehicles" + :alerts -> "realtime_alerts" + end - @spec send_data({pid, subscription_key}, t) :: broadcast_message - defp send_data({pid, subscription_key}, state) do - send(pid, {:new_realtime_data, {state.ets, subscription_key}}) + Phoenix.PubSub.broadcast(pubsub_name(), topic, {:new_realtime_data, state.ets}) end @spec block_is_active?(VehicleOrGhost.t()) :: boolean diff --git a/lib/realtime/supervisor.ex b/lib/realtime/supervisor.ex index 5da03874c..646057ea7 100644 --- a/lib/realtime/supervisor.ex +++ b/lib/realtime/supervisor.ex @@ -13,6 +13,7 @@ defmodule Realtime.Supervisor do children = [ {Registry, keys: :duplicate, name: registry_name()}, {Realtime.BlockWaiverStore, name: Realtime.BlockWaiverStore.default_name()}, + {Phoenix.PubSub, name: Realtime.Server.pubsub_name()}, {Realtime.Server, name: Realtime.Server.default_name()}, {Realtime.TrainVehiclesPubSub, name: Realtime.TrainVehiclesPubSub.default_name()}, {Realtime.DataStatusPubSub, name: Realtime.DataStatusPubSub.default_name()}, diff --git a/lib/skate_web/channels/alerts_channel.ex b/lib/skate_web/channels/alerts_channel.ex index af1634a01..af39a7029 100644 --- a/lib/skate_web/channels/alerts_channel.ex +++ b/lib/skate_web/channels/alerts_channel.ex @@ -7,13 +7,17 @@ defmodule SkateWeb.AlertsChannel do @impl SkateWeb.AuthenticatedChannel def join_authenticated("alerts:route:" <> route_id, _message, socket) do - alerts = Duration.log_duration(Server, :subscribe_to_alerts, [route_id]) - {:ok, %{data: alerts}, socket} + {lookup_key, alerts} = Duration.log_duration(Server, :subscribe_to_alerts, [route_id]) + + {:ok, %{data: alerts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end @impl SkateWeb.AuthenticatedChannel - def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do - data = Server.lookup(lookup_args) + def handle_info_authenticated({:new_realtime_data, ets}, socket) do + lookup_key = Map.get(socket.assigns, :lookup_key) + + data = Server.lookup({ets, lookup_key}) + :ok = push(socket, "alerts", %{data: data}) {:noreply, socket} end diff --git a/lib/skate_web/channels/vehicle_channel.ex b/lib/skate_web/channels/vehicle_channel.ex index 0e11fbd2c..471b4265a 100644 --- a/lib/skate_web/channels/vehicle_channel.ex +++ b/lib/skate_web/channels/vehicle_channel.ex @@ -5,8 +5,11 @@ defmodule SkateWeb.VehicleChannel do alias Realtime.Server @impl SkateWeb.AuthenticatedChannel - def handle_info_authenticated({:new_realtime_data, lookup_params}, socket) do - vehicle_or_ghost = Realtime.Server.lookup(lookup_params) + def handle_info_authenticated({:new_realtime_data, ets}, socket) do + lookup_key = socket.assigns[:lookup_key] + + vehicle_or_ghost = Realtime.Server.lookup({ets, lookup_key}) + :ok = push(socket, "vehicle", %{data: List.first(vehicle_or_ghost)}) {:noreply, socket} @@ -15,11 +18,17 @@ defmodule SkateWeb.VehicleChannel do @impl SkateWeb.AuthenticatedChannel def join_authenticated("vehicle:run_ids:" <> run_ids, _message, socket) do run_ids = String.split(run_ids, ",") + vehicle_or_ghost = Realtime.Server.peek_at_vehicles_by_run_ids(run_ids) |> List.first() - if vehicle_or_ghost do - _ = Server.subscribe_to_vehicle(vehicle_or_ghost.id) - end + socket = + if vehicle_or_ghost do + {lookup_key, _vehicle_or_ghost} = Server.subscribe_to_vehicle(vehicle_or_ghost.id) + + Phoenix.Socket.assign(socket, lookup_key: lookup_key) + else + socket + end {:ok, %{data: vehicle_or_ghost}, socket} end @@ -36,13 +45,23 @@ defmodule SkateWeb.VehicleChannel do Realtime.Server.peek_at_vehicle_by_id(vehicle_or_ghost_id) |> List.first() end - if vehicle_or_ghost do - if user_in_test_group? do - _ = Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id) + {lookup_key, _vehicle_or_ghost} = + if vehicle_or_ghost do + if user_in_test_group? do + Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id) + else + Server.subscribe_to_vehicle(vehicle_or_ghost.id) + end + else + {nil, nil} + end + + socket = + if is_nil(lookup_key) do + socket else - _ = Server.subscribe_to_vehicle(vehicle_or_ghost.id) + Phoenix.Socket.assign(socket, lookup_key: lookup_key) end - end {:ok, %{data: vehicle_or_ghost}, socket} end diff --git a/lib/skate_web/channels/vehicles_channel.ex b/lib/skate_web/channels/vehicles_channel.ex index a977ad47f..98d26f747 100644 --- a/lib/skate_web/channels/vehicles_channel.ex +++ b/lib/skate_web/channels/vehicles_channel.ex @@ -8,30 +8,40 @@ defmodule SkateWeb.VehiclesChannel do @impl SkateWeb.AuthenticatedChannel def join_authenticated("vehicles:shuttle:all", _message, socket) do - shuttles = Duration.log_duration(Server, :subscribe_to_all_shuttles, []) - {:ok, %{data: shuttles}, socket} + {lookup_key, shuttles} = Duration.log_duration(Server, :subscribe_to_all_shuttles, []) + + {:ok, %{data: shuttles}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated("vehicles:pull_backs:all", _message, socket) do - pull_backs = Duration.log_duration(Server, :subscribe_to_all_pull_backs, []) - {:ok, %{data: pull_backs}, socket} + {lookup_key, pull_backs} = Duration.log_duration(Server, :subscribe_to_all_pull_backs, []) + + {:ok, %{data: pull_backs}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated("vehicles:route:" <> route_id, _message, socket) do - vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_route, [route_id]) - {:ok, %{data: vehicles_and_ghosts}, socket} + {lookup_key, vehicles_and_ghosts} = + Duration.log_duration(Server, :subscribe_to_route, [route_id]) + + {:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated("vehicles:run_ids:" <> run_ids, _message, socket) do run_ids = String.split(run_ids, ",") - vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids]) - {:ok, %{data: vehicles_and_ghosts}, socket} + + {lookup_key, vehicles_and_ghosts} = + Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids]) + + {:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated("vehicles:block_ids:" <> block_ids, _message, socket) do block_ids = String.split(block_ids, ",") - vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids]) - {:ok, %{data: vehicles_and_ghosts}, socket} + + {lookup_key, vehicles_and_ghosts} = + Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids]) + + {:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated( @@ -76,9 +86,9 @@ defmodule SkateWeb.VehiclesChannel do "User=#{username} searched for property=#{subscribe_args.property}, text=#{subscribe_args.text}" end) - vehicles = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args]) + {lookup_key, vehicles} = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args]) - {:ok, %{data: vehicles}, socket} + {:ok, %{data: vehicles}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end def join_authenticated(topic, _message, _socket) do @@ -86,16 +96,20 @@ defmodule SkateWeb.VehiclesChannel do end @impl SkateWeb.AuthenticatedChannel - def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do - event_name = event_name(lookup_args) - data = Server.lookup(lookup_args) + def handle_info_authenticated({:new_realtime_data, ets}, socket) do + lookup_key = socket.assigns[:lookup_key] + + data = Server.lookup({ets, lookup_key}) + + event_name = event_name(lookup_key) :ok = push(socket, event_name, %{data: data}) + {:noreply, socket} end - @spec event_name(Server.lookup_key()) :: String.t() - defp event_name({_ets, :all_shuttles}), do: "shuttles" - defp event_name({_ets, :all_pull_backs}), do: "pull_backs" - defp event_name({_ets, {:search, _}}), do: "search" - defp event_name({_ets, _}), do: "vehicles" + @spec event_name(Server.subscription_key()) :: String.t() + defp event_name(:all_shuttles), do: "shuttles" + defp event_name(:all_pull_backs), do: "pull_backs" + defp event_name({:search, _}), do: "search" + defp event_name(_), do: "vehicles" end diff --git a/lib/skate_web/channels/vehicles_search_channel.ex b/lib/skate_web/channels/vehicles_search_channel.ex index 6092e708f..957971c1e 100644 --- a/lib/skate_web/channels/vehicles_search_channel.ex +++ b/lib/skate_web/channels/vehicles_search_channel.ex @@ -42,9 +42,10 @@ defmodule SkateWeb.VehiclesSearchChannel do "#{__MODULE__} limited_search User=#{username} searched for property=#{subscribe_args.property}, text=#{subscribe_args.text}" end) - result = Duration.log_duration(Server, :subscribe_to_limited_search, [subscribe_args]) + {lookup_key, result} = + Duration.log_duration(Server, :subscribe_to_limited_search, [subscribe_args]) - {:ok, %{data: result}, socket} + {:ok, %{data: result}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end @impl SkateWeb.AuthenticatedChannel @@ -67,7 +68,7 @@ defmodule SkateWeb.VehiclesSearchChannel do %{property: property, text: text} = search_params_from_subtopic(subtopic) - result = + {lookup_key, result} = Duration.log_duration(Server, :update_limited_search_subscription, [ %{ property: property, @@ -81,7 +82,7 @@ defmodule SkateWeb.VehiclesSearchChannel do "#{__MODULE__} limited_search User=#{username} updated limit for property=#{property}limit=#{limit}" end) - {:reply, {:ok, %{data: result}}, socket} + {:reply, {:ok, %{data: result}}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)} end defp search_params_from_subtopic(subtopic) do @@ -90,13 +91,16 @@ defmodule SkateWeb.VehiclesSearchChannel do end @impl SkateWeb.AuthenticatedChannel - def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do - event_name = event_name(lookup_args) - data = Server.lookup(lookup_args) + def handle_info_authenticated({:new_realtime_data, ets}, socket) do + lookup_key = socket.assigns[:lookup_key] + + event_name = event_name(lookup_key) + data = Server.lookup({ets, lookup_key}) + :ok = push(socket, event_name, %{data: data}) {:noreply, socket} end - @spec event_name(Server.lookup_key()) :: String.t() - defp event_name({_ets, {:limited_search, _}}), do: "limited_search" + @spec event_name(Server.subscription_key()) :: String.t() + defp event_name({:limited_search, _}), do: "limited_search" end diff --git a/test/realtime/server_test.exs b/test/realtime/server_test.exs index def523ee6..3ba831e91 100644 --- a/test/realtime/server_test.exs +++ b/test/realtime/server_test.exs @@ -73,7 +73,7 @@ defmodule Realtime.ServerTest do } setup do - start_supervised({Registry, keys: :duplicate, name: Realtime.Supervisor.registry_name()}) + start_supervised({Phoenix.PubSub, name: Realtime.Server.pubsub_name()}) :ok end @@ -111,26 +111,28 @@ defmodule Realtime.ServerTest do end test "clients get vehicles when subscribing", %{server_pid: server_pid} do - vehicles_and_ghosts = Server.subscribe_to_route("1", server_pid) + {lookup_key, vehicles_and_ghosts} = Server.subscribe_to_route("1", server_pid) + assert vehicles_and_ghosts == [@vehicle, @ghost] + assert lookup_key == {:route_id, "1"} end test "clients subscribed to a route get data pushed to them", %{server_pid: server_pid} do - Server.subscribe_to_route("1", server_pid) + {lookup_key, _} = Server.subscribe_to_route("1", server_pid) Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client didn't receive vehicle positions" ) - assert Server.lookup(lookup_args) == [@vehicle, @ghost] + assert Server.lookup({ets, lookup_key}) == [@vehicle, @ghost] end test "clients subscribed to a route get repeated messages", %{server_pid: server_pid} do - Server.subscribe_to_route("1", server_pid) + {lookup_key, _} = Server.subscribe_to_route("1", server_pid) Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) @@ -143,40 +145,40 @@ defmodule Realtime.ServerTest do Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client didn't receive vehicle positions the second time" ) - assert Server.lookup(lookup_args) == [@vehicle, @ghost] + assert Server.lookup({ets, lookup_key}) == [@vehicle, @ghost] end test "inactive routes have all their vehicle data removed", %{server_pid: server_pid} do - Server.subscribe_to_route("1", server_pid) + {lookup_key, _} = Server.subscribe_to_route("1", server_pid) Server.update_vehicles({%{}, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client received vehicle positions" ) - assert Server.lookup(lookup_args) == [] + assert Server.lookup({ets, lookup_key}) == [] end test "vehicles on inactive blocks are removed", %{server_pid: server_pid} do - Server.subscribe_to_route("1", server_pid) + {lookup_key, _} = Server.subscribe_to_route("1", server_pid) Server.update_vehicles({%{"1" => [@vehicle_on_inactive_block]}, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client received vehicle positions" ) - assert Server.lookup(lookup_args) == [] + assert Server.lookup({ets, lookup_key}) == [] end end @@ -190,33 +192,36 @@ defmodule Realtime.ServerTest do end test "clients get vehicles when subscribing", %{server_pid: server_pid} do - vehicles_and_ghosts = Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) + {lookup_key, vehicles_and_ghosts} = + Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) + assert vehicles_and_ghosts == [@vehicle] + assert lookup_key == {:block_ids, [@vehicle.block_id]} end test "can subscribe to multiple block IDs", %{server_pid: server_pid} do - vehicles_and_ghosts = + {_, vehicles_and_ghosts} = Server.subscribe_to_block_ids([@vehicle.block_id, @ghost.block_id], server_pid) assert vehicles_and_ghosts == [@vehicle, @ghost] end test "clients subscribed to a route get data pushed to them", %{server_pid: server_pid} do - Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) + {lookup_key, _} = Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client didn't receive vehicle positions" ) - assert Server.lookup(lookup_args) == [@vehicle] + assert Server.lookup({ets, lookup_key}) == [@vehicle] end test "clients subscribed to block IDs get repeated messages", %{server_pid: server_pid} do - Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) + {lookup_key, _} = Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) @@ -229,26 +234,26 @@ defmodule Realtime.ServerTest do Server.update_vehicles({@vehicles_by_route_id, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client didn't receive vehicle positions the second time" ) - assert Server.lookup(lookup_args) == [@vehicle] + assert Server.lookup({ets, lookup_key}) == [@vehicle] end test "inactive blocks have all their vehicle data removed", %{server_pid: server_pid} do - Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) + {lookup_key, _} = Server.subscribe_to_block_ids([@vehicle.block_id], server_pid) Server.update_vehicles({%{}, [], []}, server_pid) assert_receive( - {:new_realtime_data, lookup_args}, + {:new_realtime_data, ets}, 200, "Client received vehicle positions" ) - assert Server.lookup(lookup_args) == [] + assert Server.lookup({ets, lookup_key}) == [] end end @@ -262,18 +267,20 @@ defmodule Realtime.ServerTest do end test "clients get all pull-backs upon subscribing", %{server_pid: pid} do - assert Server.subscribe_to_all_pull_backs(pid) == [@pull_back_vehicle] + {lookup_key, pull_back_vehicles} = Server.subscribe_to_all_pull_backs(pid) + assert pull_back_vehicles == [@pull_back_vehicle] + assert lookup_key == :all_pull_backs end test "clients get updated data pushed to them", %{server_pid: pid} do - Server.subscribe_to_all_pull_backs(pid) + {lookup_key, _} = Server.subscribe_to_all_pull_backs(pid) updated_pull_back_vehicle = %{@pull_back_vehicle | timestamp: 2} Server.update_vehicles({%{"2" => [updated_pull_back_vehicle]}, [], []}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [updated_pull_back_vehicle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [updated_pull_back_vehicle] end end @@ -287,16 +294,18 @@ defmodule Realtime.ServerTest do end test "clients get all shuttles upon subscribing", %{server_pid: pid} do - assert Server.subscribe_to_all_shuttles(pid) == [@shuttle] + {lookup_key, shuttles} = Server.subscribe_to_all_shuttles(pid) + assert shuttles == [@shuttle] + assert lookup_key == :all_shuttles end test "clients get updated data pushed to them", %{server_pid: pid} do - Server.subscribe_to_all_shuttles(pid) + {lookup_key, _} = Server.subscribe_to_all_shuttles(pid) Server.update_vehicles({%{}, [@shuttle, @shuttle], []}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [@shuttle, @shuttle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [@shuttle, @shuttle] end end @@ -310,16 +319,18 @@ defmodule Realtime.ServerTest do end test "clients get vehicle by ID upon subscribing", %{server_pid: pid} do - assert Server.subscribe_to_vehicle(@vehicle.id, pid) == [@vehicle] + {lookup_key, vehicle} = Server.subscribe_to_vehicle(@vehicle.id, pid) + assert vehicle == [@vehicle] + assert lookup_key == {:vehicle, @vehicle.id} end test "clients get updated data pushed to them", %{server_pid: pid} do - Server.subscribe_to_vehicle(@vehicle.id, pid) + {lookup_key, _} = Server.subscribe_to_vehicle(@vehicle.id, pid) Server.update_vehicles({@vehicles_by_route_id, [], []}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [@vehicle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [@vehicle] end end @@ -333,18 +344,23 @@ defmodule Realtime.ServerTest do end test "clients get vehicle by ID upon subscribing", %{server_pid: pid} do - assert Server.subscribe_to_vehicle_with_logged_out(@logged_out_vehicle.id, pid) == [ + {lookup_key, logged_out_vehicles} = + Server.subscribe_to_vehicle_with_logged_out(@logged_out_vehicle.id, pid) + + assert logged_out_vehicles == [ @logged_out_vehicle ] + + assert lookup_key == {:vehicle_with_logged_out, @logged_out_vehicle.id} end test "clients get updated data pushed to them", %{server_pid: pid} do - Server.subscribe_to_vehicle_with_logged_out(@logged_out_vehicle.id, pid) + {lookup_key, _} = Server.subscribe_to_vehicle_with_logged_out(@logged_out_vehicle.id, pid) Server.update_vehicles({%{}, [], [@logged_out_vehicle]}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [@logged_out_vehicle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [@logged_out_vehicle] end end @@ -358,47 +374,49 @@ defmodule Realtime.ServerTest do end test "clients get search results upon subscribing", %{server_pid: pid} do - results = Server.subscribe_to_search(%{property: :all, text: "90"}, pid) + {lookup_key, results} = Server.subscribe_to_search(%{property: :all, text: "90"}, pid) assert Enum.member?(results, @vehicle) assert Enum.member?(results, @ghost) assert Enum.member?(results, @shuttle) + assert lookup_key == {:search, %{property: :all, text: "90"}} end test "clients get updated search results pushed to them", %{server_pid: pid} do - Server.subscribe_to_search(%{property: :all, text: "90"}, pid) + {lookup_key, _} = Server.subscribe_to_search(%{property: :all, text: "90"}, pid) Server.update_vehicles({%{}, [@shuttle], []}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [@shuttle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [@shuttle] end test "does not receive duplicate vehicles", %{server_pid: pid} do - Server.subscribe_to_search(%{property: :all, text: "90"}, pid) + {lookup_key, _} = Server.subscribe_to_search(%{property: :all, text: "90"}, pid) Server.update_vehicles({%{}, [@shuttle, @shuttle], []}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [@shuttle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [@shuttle] end test "vehicles on inactive blocks are included", %{server_pid: pid} do - Server.subscribe_to_search(%{property: :vehicle, text: "v2-label"}, pid) + {lookup_key, _} = Server.subscribe_to_search(%{property: :vehicle, text: "v2-label"}, pid) Server.update_vehicles({%{"1" => [@vehicle_on_inactive_block]}, [], []}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} - assert Server.lookup(lookup_args) == [@vehicle_on_inactive_block] + assert Server.lookup({ets, lookup_key}) == [@vehicle_on_inactive_block] end test "logged out vehicles are returned when include_logged_out_vehicles is true", %{server_pid: pid} do - Server.subscribe_to_search( - %{property: :vehicle, text: "123", include_logged_out_vehicles: true}, - pid - ) + {lookup_key, _} = + Server.subscribe_to_search( + %{property: :vehicle, text: "123", include_logged_out_vehicles: true}, + pid + ) logged_in_vehicle = build(:vehicle, id: "y1235", label: "1235", route_id: "1", run_id: "run_id") @@ -412,13 +430,13 @@ defmodule Realtime.ServerTest do pid ) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [logged_in_vehicle, logged_out_vehicle] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [logged_in_vehicle, logged_out_vehicle] end test "logged out vehicles are not returned when include_logged_out_vehicles is not set", %{server_pid: pid} do - Server.subscribe_to_search(%{property: :vehicle, text: "123"}, pid) + {lookup_key, _} = Server.subscribe_to_search(%{property: :vehicle, text: "123"}, pid) logged_in_vehicle = build(:vehicle, id: "y1235", label: "1235", route_id: "1", run_id: "run_id") @@ -427,9 +445,9 @@ defmodule Realtime.ServerTest do Server.update_vehicles({%{"1" => [logged_in_vehicle]}, [], [logged_out_vehicle]}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} - assert Server.lookup(lookup_args) == [logged_in_vehicle] + assert Server.lookup({ets, lookup_key}) == [logged_in_vehicle] end end @@ -443,49 +461,56 @@ defmodule Realtime.ServerTest do end test "clients get limited search results upon subscribing", %{server_pid: pid} do - assert %{matching_vehicles: [@ghost], has_more_matches: true} == - Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 1}, pid) + {lookup_key, results} = + Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 1}, pid) + + assert results == %{matching_vehicles: [@ghost], has_more_matches: true} + assert lookup_key == {:limited_search, %{property: :all, text: "90", limit: 1}} end test "clients get updated limited search results pushed to them", %{server_pid: pid} do - Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 5}, pid) + {lookup_key, _} = + Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 5}, pid) Server.update_vehicles({%{}, [@shuttle], []}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} assert %{matching_vehicles: [@shuttle], has_more_matches: false} == - Server.lookup(lookup_args) + Server.lookup({ets, lookup_key}) end test "does not receive duplicate vehicles", %{server_pid: pid} do - Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 5}, pid) + {lookup_key, _} = + Server.subscribe_to_limited_search(%{property: :all, text: "90", limit: 5}, pid) Server.update_vehicles({%{}, [@shuttle, @shuttle], []}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} assert %{matching_vehicles: [@shuttle], has_more_matches: false} = - Server.lookup(lookup_args) + Server.lookup({ets, lookup_key}) end test "vehicles on inactive blocks are included", %{server_pid: pid} do - Server.subscribe_to_limited_search(%{property: :vehicle, text: "v2-label", limit: 2}, pid) + {lookup_key, _} = + Server.subscribe_to_limited_search(%{property: :vehicle, text: "v2-label", limit: 2}, pid) Server.update_vehicles({%{"1" => [@vehicle_on_inactive_block]}, [], []}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} assert %{matching_vehicles: [@vehicle_on_inactive_block], has_more_matches: false} = - Server.lookup(lookup_args) + Server.lookup({ets, lookup_key}) end test "logged out vehicles are returned when include_logged_out_vehicles is true", %{server_pid: pid} do - Server.subscribe_to_limited_search( - %{property: :vehicle, text: "123", include_logged_out_vehicles: true, limit: 4}, - pid - ) + {lookup_key, _} = + Server.subscribe_to_limited_search( + %{property: :vehicle, text: "123", include_logged_out_vehicles: true, limit: 4}, + pid + ) logged_in_vehicle = build(:vehicle, id: "y1235", label: "1235", route_id: "1", run_id: "run_id") @@ -499,17 +524,18 @@ defmodule Realtime.ServerTest do pid ) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} assert %{ matching_vehicles: [logged_in_vehicle, logged_out_vehicle], has_more_matches: false - } == Server.lookup(lookup_args) + } == Server.lookup({ets, lookup_key}) end test "logged out vehicles are not returned when include_logged_out_vehicles is not set", %{server_pid: pid} do - Server.subscribe_to_limited_search(%{property: :vehicle, text: "123", limit: 5}, pid) + {lookup_key, _} = + Server.subscribe_to_limited_search(%{property: :vehicle, text: "123", limit: 5}, pid) logged_in_vehicle = build(:vehicle, id: "y1235", label: "1235", route_id: "1", run_id: "run_id") @@ -518,10 +544,10 @@ defmodule Realtime.ServerTest do Server.update_vehicles({%{"1" => [logged_in_vehicle]}, [], [logged_out_vehicle]}, pid) - assert_receive {:new_realtime_data, lookup_args} + assert_receive {:new_realtime_data, ets} assert %{matching_vehicles: [logged_in_vehicle], has_more_matches: false} == - Server.lookup(lookup_args) + Server.lookup({ets, lookup_key}) end end @@ -539,15 +565,21 @@ defmodule Realtime.ServerTest do first_search_params = %{property: :all, text: "90", limit: 5} second_search_params = %{property: :all, text: "asdf", limit: 5} - Server.subscribe_to_limited_search(first_search_params, pid) + {first_lookup_key, _} = Server.subscribe_to_limited_search(first_search_params, pid) + assert first_lookup_key == {:limited_search, first_search_params} + Server.update_vehicles({%{}, [@shuttle], []}, pid) - assert_receive {:new_realtime_data, {_ets_tid, {:limited_search, ^first_search_params}}} + assert_receive {:new_realtime_data, _ets_tid} + + {second_lookup_key, _} = + Server.update_limited_search_subscription(second_search_params, pid) + + assert second_lookup_key == {:limited_search, second_search_params} - Server.update_limited_search_subscription(second_search_params, pid) Server.update_vehicles({%{}, [@shuttle], []}, pid) - assert_receive {:new_realtime_data, {_ets_tid, {:limited_search, ^second_search_params}}} - refute_receive {:new_realtime_data, {_ets_tid, {:limited_search, ^first_search_params}}} + + assert_receive {:new_realtime_data, _ets_tid} end end @@ -561,16 +593,18 @@ defmodule Realtime.ServerTest do end test "clients get all shuttles upon subscribing", %{server_pid: pid} do - assert Server.subscribe_to_alerts("1", pid) == @alerts_by_route_id["1"] + {lookup_key, alerts} = Server.subscribe_to_alerts("1", pid) + assert alerts == @alerts_by_route_id["1"] + assert lookup_key == {:alerts, "1"} end test "clients get updated data pushed to them", %{server_pid: pid} do - Server.subscribe_to_alerts("1", pid) + {lookup_key, _} = Server.subscribe_to_alerts("1", pid) Server.update_alerts(%{"15" => ["Totally different alert"]}, pid) - assert_receive {:new_realtime_data, lookup_args} - assert Server.lookup(lookup_args) == [] + assert_receive {:new_realtime_data, ets} + assert Server.lookup({ets, lookup_key}) == [] end test "clients subscribed to vehicles don't get updated data pushed to them", %{ @@ -580,7 +614,7 @@ defmodule Realtime.ServerTest do Server.update_alerts(%{"1" => ["Totally different alert"]}, pid) - refute_receive {:new_realtime_data, _lookup_args} + refute_receive {:new_realtime_data, _ets} end end diff --git a/test/skate_web/channels/alerts_channel_test.exs b/test/skate_web/channels/alerts_channel_test.exs index 63be22e1a..a7ce17bc9 100644 --- a/test/skate_web/channels/alerts_channel_test.exs +++ b/test/skate_web/channels/alerts_channel_test.exs @@ -9,7 +9,7 @@ defmodule SkateWeb.AlertsChannelTest do socket = socket(UserSocket) - start_supervised({Registry, keys: :duplicate, name: Realtime.Supervisor.registry_name()}) + start_supervised({Phoenix.PubSub, name: Realtime.Server.pubsub_name()}) start_supervised({Realtime.Server, name: Realtime.Server.default_name()}) {:ok, socket: socket} @@ -54,7 +54,7 @@ defmodule SkateWeb.AlertsChannelTest do assert {:noreply, _socket} = AlertsChannel.handle_info( - {:new_realtime_data, {ets, {:alerts, "1"}}}, + {:new_realtime_data, ets}, socket ) diff --git a/test/skate_web/channels/vehicle_channel_test.exs b/test/skate_web/channels/vehicle_channel_test.exs index aa91356ca..784482e39 100644 --- a/test/skate_web/channels/vehicle_channel_test.exs +++ b/test/skate_web/channels/vehicle_channel_test.exs @@ -59,7 +59,7 @@ defmodule SkateWeb.VehicleChannelTest do |> socket("", %{}) |> Guardian.Phoenix.Socket.put_current_resource(%{id: user.id}) - start_supervised({Registry, keys: :duplicate, name: Realtime.Supervisor.registry_name()}) + start_supervised({Phoenix.PubSub, name: Realtime.Server.pubsub_name()}) start_supervised({Realtime.Server, name: Realtime.Server.default_name()}) {:ok, socket: socket, user: user} @@ -78,6 +78,41 @@ defmodule SkateWeb.VehicleChannelTest do subscribe_and_join(socket, VehicleChannel, "vehicle:run_ids:123-4567") end + test "handles case where vehicle for given run IDs does not exist", %{ + socket: socket + } do + assert Realtime.Server.update_vehicles({%{}, [], []}) == :ok + + expected_payload = %{data: nil} + + assert {:ok, ^expected_payload, %Socket{} = _socket} = + subscribe_and_join(socket, VehicleChannel, "vehicle:run_ids:123-4567") + end + + test "subscribes to the vehicle for given ID when user is not in the test group", %{ + socket: socket + } do + vehicle = build(:vehicle) + + Realtime.Server.update_vehicles({%{vehicle.route_id => [vehicle]}, [], []}) + + expected_payload = %{data: vehicle} + + assert {:ok, ^expected_payload, %Socket{} = _socket} = + subscribe_and_join(socket, VehicleChannel, "vehicle:id:#{vehicle.id}") + end + + test "handles case where the vehicle for given ID does not exist", %{ + socket: socket + } do + Realtime.Server.update_vehicles({%{}, [], []}) + + expected_payload = %{data: nil} + + assert {:ok, ^expected_payload, %Socket{} = _socket} = + subscribe_and_join(socket, VehicleChannel, "vehicle:id:#{@vehicle.id}") + end + test "subscribes to the logged out vehicle for given ID when user is in the test group", %{ socket: socket, user: user @@ -121,7 +156,7 @@ defmodule SkateWeb.VehicleChannelTest do assert {:noreply, _socket} = VehicleChannel.handle_info( - {:new_realtime_data, {ets, {:route_id, "1"}}}, + {:new_realtime_data, ets}, socket ) diff --git a/test/skate_web/channels/vehicles_channel_test.exs b/test/skate_web/channels/vehicles_channel_test.exs index b9966c717..1bbf5f8b8 100644 --- a/test/skate_web/channels/vehicles_channel_test.exs +++ b/test/skate_web/channels/vehicles_channel_test.exs @@ -20,7 +20,7 @@ defmodule SkateWeb.VehiclesChannelTest do |> socket("", %{}) |> Guardian.Phoenix.Socket.put_current_resource(%{id: user.id}) - start_supervised({Registry, keys: :duplicate, name: Realtime.Supervisor.registry_name()}) + start_supervised({Phoenix.PubSub, name: Realtime.Server.pubsub_name()}) start_supervised({Realtime.Server, name: Realtime.Server.default_name()}) {:ok, socket: socket, user: user} @@ -127,7 +127,7 @@ defmodule SkateWeb.VehiclesChannelTest do assert {:noreply, _socket} = VehiclesChannel.handle_info( - {:new_realtime_data, {ets, {:route_id, "1"}}}, + {:new_realtime_data, ets}, socket ) @@ -145,7 +145,7 @@ defmodule SkateWeb.VehiclesChannelTest do assert {:noreply, _socket} = VehiclesChannel.handle_info( - {:new_realtime_data, {ets, :all_shuttles}}, + {:new_realtime_data, ets}, socket ) @@ -165,7 +165,7 @@ defmodule SkateWeb.VehiclesChannelTest do assert {:noreply, _socket} = VehiclesChannel.handle_info( - {:new_realtime_data, {ets, :all_pull_backs}}, + {:new_realtime_data, ets}, socket ) @@ -183,7 +183,7 @@ defmodule SkateWeb.VehiclesChannelTest do assert {:noreply, _socket} = VehiclesChannel.handle_info( - {:new_realtime_data, {ets, {:search, %{text: @vehicle.label, property: :all}}}}, + {:new_realtime_data, ets}, socket ) @@ -240,7 +240,7 @@ defmodule SkateWeb.VehiclesChannelTest do {:stop, :normal, _socket} = VehiclesChannel.handle_info( - {:new_realtime_data, {ets, {:route_id, "1"}}}, + {:new_realtime_data, ets}, socket ) diff --git a/test/skate_web/channels/vehicles_search_channel_test.exs b/test/skate_web/channels/vehicles_search_channel_test.exs index 8c4477392..4720702a2 100644 --- a/test/skate_web/channels/vehicles_search_channel_test.exs +++ b/test/skate_web/channels/vehicles_search_channel_test.exs @@ -20,7 +20,7 @@ defmodule SkateWeb.VehiclesSearchChannelTest do |> socket("", %{}) |> Guardian.Phoenix.Socket.put_current_resource(%{id: user.id}) - start_supervised({Registry, keys: :duplicate, name: Realtime.Supervisor.registry_name()}) + start_supervised({Phoenix.PubSub, name: Realtime.Server.pubsub_name()}) start_supervised({Realtime.Server, name: Realtime.Server.default_name()}) {:ok, @@ -93,8 +93,7 @@ defmodule SkateWeb.VehiclesSearchChannelTest do assert {:noreply, _socket} = VehiclesSearchChannel.handle_info( - {:new_realtime_data, - {ets, {:limited_search, %{property: :vehicle, text: "000", limit: 2}}}}, + {:new_realtime_data, ets}, socket )