diff --git a/lib/walex/events/event_modules.ex b/lib/walex/events/event_modules.ex index e958e0a..0b3b0d6 100755 --- a/lib/walex/events/event_modules.ex +++ b/lib/walex/events/event_modules.ex @@ -32,14 +32,15 @@ defmodule WalEx.Events.EventModules do @impl true def handle_call({:process, txn, server}, _from, state) do - server - |> WalEx.Config.get_configs(:modules) - |> process_events(txn) + resp = + server + |> WalEx.Config.get_configs(:modules) + |> process_events(txn) - {:reply, :ok, state} + {:reply, resp, state} end - defp process_events(nil, %{changes: [], commit_timestamp: _}), do: nil + defp process_events(nil, %{changes: [], commit_timestamp: _}), do: :ok defp process_events(modules, txn) when is_list(modules) do process_modules(modules, txn) @@ -48,16 +49,30 @@ defmodule WalEx.Events.EventModules do defp process_modules(modules, txn) do functions = ~w(process_all process_insert process_update process_delete)a - Enum.each(modules, &process_module(&1, functions, txn)) + Enum.reduce_while(modules, :ok, fn module_name, _acc -> + case process_module(module_name, functions, txn) do + :ok -> {:cont, :ok} + {:ok, _} = term -> {:cont, term} + term -> {:halt, term} + end + end) end defp process_module(module_name, functions, txn) do - Enum.each(functions, &apply_process_macro(&1, module_name, txn)) + Enum.reduce_while(functions, :ok, fn function, _acc -> + case apply_process_macro(function, module_name, txn) do + :ok -> {:cont, :ok} + {:ok, _} = term -> {:cont, term} + term -> {:halt, term} + end + end) end defp apply_process_macro(function, module, txn) do if Keyword.has_key?(module.__info__(:functions), function) do apply(module, function, [txn]) + else + :ok end end end diff --git a/lib/walex/events/events.ex b/lib/walex/events/events.ex index de0a406..8a7264b 100644 --- a/lib/walex/events/events.ex +++ b/lib/walex/events/events.ex @@ -35,9 +35,9 @@ defmodule WalEx.Events do @impl true def handle_call({:process, txn, app_name}, _from, state) do - process_destinations(txn, app_name) + resp = process_destinations(txn, app_name) - {:reply, :ok, state} + {:reply, resp, state} end defp process_destinations(txn, app_name) do diff --git a/lib/walex/replication/publisher.ex b/lib/walex/replication/publisher.ex index b128777..8e79ed8 100644 --- a/lib/walex/replication/publisher.ex +++ b/lib/walex/replication/publisher.ex @@ -6,6 +6,9 @@ defmodule WalEx.Replication.Publisher do alias WalEx.{Changes, Config, Events, Types} alias WalEx.Decoder.Messages + alias WalEx.Replication.Server + + require Logger defmodule(State, do: @@ -68,11 +71,26 @@ defmodule WalEx.Replication.Publisher do end defp process_message( - %{message: %Messages.Commit{lsn: commit_lsn}, app_name: app_name}, + %{ + message: %Messages.Commit{lsn: commit_lsn, end_lsn: commit_end_lsn}, + app_name: app_name + }, %State{transaction: {current_txn_lsn, txn}, relations: _relations} = state ) when commit_lsn == current_txn_lsn do - Events.process(txn, app_name) + case Events.process(txn, app_name) do + :ok -> + Server.ack(commit_end_lsn, app_name) + + {:ok, _} -> + Server.ack(commit_end_lsn, app_name) + + term -> + Logger.error( + "Failed to process transaction for lsn: #{inspect(commit_end_lsn)} with term: #{inspect(term)}" + ) + end + state end diff --git a/lib/walex/replication/server.ex b/lib/walex/replication/server.ex index fdf85dd..52fc57a 100644 --- a/lib/walex/replication/server.ex +++ b/lib/walex/replication/server.ex @@ -46,7 +46,8 @@ defmodule WalEx.Replication.Server do slot_name: slot_name, publication: publication, durable_slot: durable_slot, - message_middleware: message_middleware + message_middleware: message_middleware, + wal_position: nil } {:ok, state} @@ -113,7 +114,13 @@ defmodule WalEx.Replication.Server do end @impl true - def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do + def handle_result( + [%Postgrex.Result{columns: columns, rows: [row]} | _results], + state = %{step: :create_slot} + ) do + consistent_point = columns |> Enum.zip(row) |> Enum.into(%{}) |> Map.get("consistent_point") + wal_position = String.split(consistent_point, "/") |> List.last() |> String.to_integer(16) + state = Map.put(state, :wal_position, wal_position) start_replication_with_retry(state, 0, @initial_backoff) end @@ -152,8 +159,19 @@ defmodule WalEx.Replication.Server do def handle_data(<>, state) do messages = case reply do - 1 -> [<>] - 0 -> [] + 1 -> + Logger.debug( + "standby status update, remote wal: #{wal_end} current wal: #{state.wal_position}" + ) + + position = state.wal_position + 1 + + [ + <> + ] + + 0 -> + [] end {:noreply, messages, state} @@ -165,6 +183,24 @@ defmodule WalEx.Replication.Server do {:query, query, %{state | step: :slot_exists}} end + def handle_info({:ack_transaction, {_, wal_position}}, state) do + Logger.debug("moving wal position to #{wal_position}") + state = Map.put(state, :wal_position, wal_position) + {:noreply, state} + end + + def ack(info, app_name) do + case Registry.lookup(:walex_registry, {__MODULE__, app_name}) do + [{pid, _}] -> + send(pid, {:ack_transaction, info}) + + [] -> + Logger.warning( + "Attempted to ack transaction but server process not found for #{app_name}" + ) + end + end + defp set_pgx_replication_conn_opts(app_name) do database_configs_keys = [ :hostname, diff --git a/test/walex/database_test.exs b/test/walex/database_test.exs index 946e2e0..9b60a53 100644 --- a/test/walex/database_test.exs +++ b/test/walex/database_test.exs @@ -82,6 +82,8 @@ defmodule WalEx.DatabaseTest do refute Process.info(database_pid) + Process.sleep(5000) + new_database_pid = get_database_pid(supervisor_pid) assert is_pid(new_database_pid) @@ -143,6 +145,8 @@ defmodule WalEx.DatabaseTest do assert :ok == pg_restart() + Process.sleep(5000) + new_database_pid = get_database_pid(supervisor_pid) assert is_pid(new_database_pid) @@ -151,6 +155,8 @@ defmodule WalEx.DatabaseTest do end test "durable replication slot", %{database_pid: database_pid} do + # TODO currently failing + assert [] = pg_replication_slots(database_pid) slot_name = "durable_slot" @@ -208,6 +214,35 @@ defmodule WalEx.DatabaseTest do [slot] = pg_replication_slots(database_pid) assert slot == inactive_slot end + + @tag timeout: 80_000 + test "should send wal again if module returns an error with durable slot" do + start_supervised!(QueuedResponses) + + config = + @base_configs + |> Keyword.put(:modules, [UsesQueueModule]) + |> Keyword.put(:durable_slot, true) + |> Keyword.put(:slot_name, "durable_retry_slot") + + QueuedResponses.push(fn -> + Logger.debug("Error response") + :error + end) + + QueuedResponses.push(fn -> + Logger.debug("Success response") + :ok + end) + + assert {:ok, supervisor_pid} = TestSupervisor.start_link(config) + database_pid = get_database_pid(supervisor_pid) + + log = assert_update_user(database_pid, sleep: 60_000) + assert log =~ "Error response" + assert log =~ "Failed to process transaction" + assert log =~ "Success response" + end end @linux_path "/usr/lib/postgresql" @@ -446,16 +481,22 @@ defmodule WalEx.DatabaseTest do ) end - defp assert_update_user(database_pid) do + defp assert_update_user(database_pid, opts \\ []) do + sleep_duration = Keyword.get(opts, :sleep, 1000) + capture_log = ExUnit.CaptureLog.capture_log(fn -> update_user(database_pid) - :timer.sleep(1000) + :timer.sleep(sleep_duration) end) + IO.puts(capture_log) + assert capture_log =~ "on_update event occurred" assert capture_log =~ "%WalEx.Event" + + capture_log end end @@ -486,3 +527,38 @@ defmodule TestModule do fn events -> Logger.info("on_update event occurred: #{inspect(events, pretty: true)}") end ) end + +defmodule QueuedResponses do + use Agent + + def start_link(_opts) do + Agent.start_link(fn -> [] end, name: __MODULE__) + end + + def push(response) do + Agent.update(__MODULE__, fn responses -> [response | responses] end) + end + + def pop do + case Agent.get_and_update(__MODULE__, &List.pop_at(&1, -1)) do + nil -> + raise "No responses in queue." + + response -> + response + end + end +end + +defmodule UsesQueueModule do + require Logger + use WalEx.Event, name: :todos + + on_update( + :user, + [], + fn _events -> + QueuedResponses.pop().() + end + ) +end