Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve replication reliability with transaction acknowledgment #74

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions lib/walex/events/event_modules.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ defmodule WalEx.Events.EventModules do

@impl true
def handle_call({:process, txn, server}, _from, state) do
server
|> WalEx.Config.get_configs(:modules)
|> process_events(txn)
resp =
server
|> WalEx.Config.get_configs(:modules)
|> process_events(txn)

{:reply, :ok, state}
{:reply, resp, state}
end

defp process_events(nil, %{changes: [], commit_timestamp: _}), do: nil
defp process_events(nil, %{changes: [], commit_timestamp: _}), do: :ok

defp process_events(modules, txn) when is_list(modules) do
process_modules(modules, txn)
Expand All @@ -48,16 +49,30 @@ defmodule WalEx.Events.EventModules do
defp process_modules(modules, txn) do
functions = ~w(process_all process_insert process_update process_delete)a

Enum.each(modules, &process_module(&1, functions, txn))
Enum.reduce_while(modules, :ok, fn module_name, _acc ->
case process_module(module_name, functions, txn) do
:ok -> {:cont, :ok}
{:ok, _} = term -> {:cont, term}
term -> {:halt, term}
end
end)
end

defp process_module(module_name, functions, txn) do
Enum.each(functions, &apply_process_macro(&1, module_name, txn))
Enum.reduce_while(functions, :ok, fn function, _acc ->
case apply_process_macro(function, module_name, txn) do
:ok -> {:cont, :ok}
{:ok, _} = term -> {:cont, term}
term -> {:halt, term}
end
end)
end

defp apply_process_macro(function, module, txn) do
if Keyword.has_key?(module.__info__(:functions), function) do
apply(module, function, [txn])
else
:ok
end
end
end
4 changes: 2 additions & 2 deletions lib/walex/events/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ defmodule WalEx.Events do

@impl true
def handle_call({:process, txn, app_name}, _from, state) do
process_destinations(txn, app_name)
resp = process_destinations(txn, app_name)

{:reply, :ok, state}
{:reply, resp, state}
end

defp process_destinations(txn, app_name) do
Expand Down
22 changes: 20 additions & 2 deletions lib/walex/replication/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ defmodule WalEx.Replication.Publisher do

alias WalEx.{Changes, Config, Events, Types}
alias WalEx.Decoder.Messages
alias WalEx.Replication.Server

require Logger

defmodule(State,
do:
Expand Down Expand Up @@ -68,11 +71,26 @@ defmodule WalEx.Replication.Publisher do
end

defp process_message(
%{message: %Messages.Commit{lsn: commit_lsn}, app_name: app_name},
%{
message: %Messages.Commit{lsn: commit_lsn, end_lsn: commit_end_lsn},
app_name: app_name
},
%State{transaction: {current_txn_lsn, txn}, relations: _relations} = state
)
when commit_lsn == current_txn_lsn do
Events.process(txn, app_name)
case Events.process(txn, app_name) do
:ok ->
Server.ack(commit_end_lsn, app_name)

{:ok, _} ->
Server.ack(commit_end_lsn, app_name)

term ->
Logger.error(
"Failed to process transaction for lsn: #{inspect(commit_end_lsn)} with term: #{inspect(term)}"
)
end

state
end

Expand Down
44 changes: 40 additions & 4 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ defmodule WalEx.Replication.Server do
slot_name: slot_name,
publication: publication,
durable_slot: durable_slot,
message_middleware: message_middleware
message_middleware: message_middleware,
wal_position: nil
}

{:ok, state}
Expand Down Expand Up @@ -113,7 +114,13 @@ defmodule WalEx.Replication.Server do
end

@impl true
def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
def handle_result(
[%Postgrex.Result{columns: columns, rows: [row]} | _results],
state = %{step: :create_slot}
) do
consistent_point = columns |> Enum.zip(row) |> Enum.into(%{}) |> Map.get("consistent_point")
wal_position = String.split(consistent_point, "/") |> List.last() |> String.to_integer(16)
state = Map.put(state, :wal_position, wal_position)
start_replication_with_retry(state, 0, @initial_backoff)
end

Expand Down Expand Up @@ -152,8 +159,19 @@ defmodule WalEx.Replication.Server do
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
0 -> []
1 ->
Logger.debug(
"standby status update, remote wal: #{wal_end} current wal: #{state.wal_position}"
)

position = state.wal_position + 1

[
<<?r, position::64, position::64, position::64, current_time()::64, 0>>
]

0 ->
[]
end

{:noreply, messages, state}
Expand All @@ -165,6 +183,24 @@ defmodule WalEx.Replication.Server do
{:query, query, %{state | step: :slot_exists}}
end

def handle_info({:ack_transaction, {_, wal_position}}, state) do
Logger.debug("moving wal position to #{wal_position}")
state = Map.put(state, :wal_position, wal_position)
{:noreply, state}
end

def ack(info, app_name) do
case Registry.lookup(:walex_registry, {__MODULE__, app_name}) do
[{pid, _}] ->
send(pid, {:ack_transaction, info})

[] ->
Logger.warning(
"Attempted to ack transaction but server process not found for #{app_name}"
)
end
end

defp set_pgx_replication_conn_opts(app_name) do
database_configs_keys = [
:hostname,
Expand Down
80 changes: 78 additions & 2 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@

refute Process.info(database_pid)

Process.sleep(5000)

new_database_pid = get_database_pid(supervisor_pid)

assert is_pid(new_database_pid)
Expand Down Expand Up @@ -143,6 +145,8 @@

assert :ok == pg_restart()

Process.sleep(5000)

new_database_pid = get_database_pid(supervisor_pid)

assert is_pid(new_database_pid)
Expand All @@ -151,6 +155,8 @@
end

test "durable replication slot", %{database_pid: database_pid} do
# TODO currently failing

assert [] = pg_replication_slots(database_pid)

slot_name = "durable_slot"
Expand Down Expand Up @@ -208,6 +214,35 @@
[slot] = pg_replication_slots(database_pid)
assert slot == inactive_slot
end

@tag timeout: 80_000
test "should send wal again if module returns an error with durable slot" do

Check failure on line 219 in test/walex/database_test.exs

View workflow job for this annotation

GitHub Actions / Build and test

test logical replication should send wal again if module returns an error with durable slot (WalEx.DatabaseTest)
start_supervised!(QueuedResponses)

config =
@base_configs
|> Keyword.put(:modules, [UsesQueueModule])
|> Keyword.put(:durable_slot, true)
|> Keyword.put(:slot_name, "durable_retry_slot")

QueuedResponses.push(fn ->
Logger.debug("Error response")
:error
end)

QueuedResponses.push(fn ->
Logger.debug("Success response")
:ok
end)

assert {:ok, supervisor_pid} = TestSupervisor.start_link(config)
database_pid = get_database_pid(supervisor_pid)

log = assert_update_user(database_pid, sleep: 60_000)
assert log =~ "Error response"
assert log =~ "Failed to process transaction"
assert log =~ "Success response"
end
end

@linux_path "/usr/lib/postgresql"
Expand Down Expand Up @@ -446,16 +481,22 @@
)
end

defp assert_update_user(database_pid) do
defp assert_update_user(database_pid, opts \\ []) do
sleep_duration = Keyword.get(opts, :sleep, 1000)

capture_log =
ExUnit.CaptureLog.capture_log(fn ->
update_user(database_pid)

:timer.sleep(1000)
:timer.sleep(sleep_duration)
end)

IO.puts(capture_log)

assert capture_log =~ "on_update event occurred"
assert capture_log =~ "%WalEx.Event"

capture_log
end
end

Expand Down Expand Up @@ -486,3 +527,38 @@
fn events -> Logger.info("on_update event occurred: #{inspect(events, pretty: true)}") end
)
end

defmodule QueuedResponses do
use Agent

def start_link(_opts) do
Agent.start_link(fn -> [] end, name: __MODULE__)
end

def push(response) do
Agent.update(__MODULE__, fn responses -> [response | responses] end)
end

def pop do
case Agent.get_and_update(__MODULE__, &List.pop_at(&1, -1)) do
nil ->
raise "No responses in queue."

response ->
response
end
end
end

defmodule UsesQueueModule do
require Logger
use WalEx.Event, name: :todos

on_update(
:user,
[],
fn _events ->
QueuedResponses.pop().()
end
)
end
Loading