Verify that replica nodes received checkpoint LSN on shutdown (patroni#2939)

If archiving is enabled, the `Postgresql.latest_checkpoint_location()` method returns the LSN of the prev (SWITCH) record, because the shutdown checkpoint itself is written to the beginning of a new WAL file. This is done to make it possible to safely promote a replica that recovers WAL files from the archive and wasn't streaming when the primary was stopped (the primary doesn't archive this WAL file).
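To make the geometry concrete, here is a minimal standalone sketch (hypothetical LSNs and pg_waldump text, and a simplified stand-in for Patroni's parse_lsn helper) of how the shutdown checkpoint and its prev (SWITCH) record relate when archive_mode=on:

# Hypothetical pg_controldata output of a cleanly stopped primary with archive_mode=on.
controldata = {
    'Database cluster state': 'shut down',
    "Latest checkpoint's TimeLineID": '1',
    'Latest checkpoint location': '0/5000028',  # first record of the new, unarchived WAL file
}

# Hypothetical pg_waldump line for the record preceding the checkpoint: an xlog SWITCH
# record that still lives in the previous, already archived WAL file.
waldump_prev_record = ('rmgr: XLOG        len (rec/tot):     24/    24, tx:          0, '
                       'lsn: 0/04000120, prev 0/040000F8, desc: SWITCH')


def parse_lsn(lsn: str) -> int:
    """Convert a textual LSN like '0/5000028' into an integer (simplified stand-in)."""
    hi, lo = lsn.split('/')
    return int(hi, 16) * 0x100000000 + int(lo, 16)


checkpoint_lsn = parse_lsn(controldata['Latest checkpoint location'])
switch_lsn = parse_lsn(waldump_prev_record.split('lsn: ')[1].split(',')[0])

# The checkpoint starts a WAL file the stopped primary never archives, so an
# archive-only replica can reach the SWITCH record but not the checkpoint itself.
assert switch_lsn < checkpoint_lsn
print(f'checkpoint LSN: {checkpoint_lsn:#x}, prev (SWITCH) LSN: {switch_lsn:#x}')

A replica restoring only from the archive can reach the SWITCH record but not the checkpoint, which is why the SWITCH LSN is the value that is safe to publish.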

However, in certain cases using the LSN of the SWITCH record caused an unnecessary pg_rewind if the replica didn't manage to replay the shutdown checkpoint record before it was promoted.

To mitigate the problem we need to check that replicas received/replayed exactly the shutdown checkpoint LSN. At the same time, we still write the LSN of the SWITCH record to the `/status` key when releasing the leader lock.
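As a rough sketch of the resulting behaviour (illustrative names and LSNs only, not Patroni's actual API): the leader key is released early only if a replica already reached the shutdown checkpoint itself, while the value published to the /status key stays the SWITCH LSN.

# Hypothetical LSNs: the SWITCH record ends the last archived WAL file,
# the shutdown checkpoint starts the new, unarchived one.
switch_lsn = 0x4000120
checkpoint_lsn = 0x5000028


def may_release_leader_key(replica_received_lsn: int) -> bool:
    """Allow an immediate leader race only if a replica got the shutdown checkpoint."""
    return replica_received_lsn >= checkpoint_lsn


# A replica that stopped at the SWITCH record is not promoted right away;
# promoting it before it replays the shutdown checkpoint is what used to cause
# the unnecessary pg_rewind.
print(may_release_leader_key(switch_lsn))      # False
print(may_release_leader_key(checkpoint_lsn))  # True

# Independently of that check, the LSN written to /status on release is still the
# SWITCH LSN, so replicas recovering from the archive can safely reach it.
print(f'/status LSN: {switch_lsn:#x}')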
CyberDem0n authored Nov 7, 2023
1 parent 269b04b commit 552e864
Showing 4 changed files with 41 additions and 19 deletions.
13 changes: 7 additions & 6 deletions patroni/ha.py
@@ -1227,15 +1227,16 @@ def demote(self, mode: str) -> Optional[bool]:
 
         status = {'released': False}
 
-        def on_shutdown(checkpoint_location: int) -> None:
+        def on_shutdown(checkpoint_location: int, prev_location: int) -> None:
             # Postmaster is still running, but pg_control already reports clean "shut down".
             # It could happen if Postgres is still archiving the backlog of WAL files.
             # If we know that there are replicas that received the shutdown checkpoint
             # location, we can remove the leader key and allow them to start leader race.
             time.sleep(1)  # give replicas some more time to catch up
             if self.is_failover_possible(cluster_lsn=checkpoint_location):
                 self.state_handler.set_role('demoted')
                 with self._async_executor:
-                    self.release_leader_key_voluntarily(checkpoint_location)
+                    self.release_leader_key_voluntarily(prev_location)
                     status['released'] = True
 
         def before_shutdown() -> None:
@@ -1990,18 +1991,18 @@ def shutdown(self) -> None:
 
         status = {'deleted': False}
 
-        def _on_shutdown(checkpoint_location: int) -> None:
+        def _on_shutdown(checkpoint_location: int, prev_location: int) -> None:
             if self.is_leader():
                 # Postmaster is still running, but pg_control already reports clean "shut down".
                 # It could happen if Postgres is still archiving the backlog of WAL files.
                 # If we know that there are replicas that received the shutdown checkpoint
                 # location, we can remove the leader key and allow them to start leader race.
 
                 time.sleep(1)  # give replicas some more time to catch up
                 if self.is_failover_possible(cluster_lsn=checkpoint_location):
-                    self.dcs.delete_leader(self.cluster.leader, checkpoint_location)
+                    self.dcs.delete_leader(self.cluster.leader, prev_location)
                     status['deleted'] = True
             else:
-                self.dcs.write_leader_optime(checkpoint_location)
+                self.dcs.write_leader_optime(prev_location)
 
         def _before_shutdown() -> None:
             self.notify_citus_coordinator('before_demote')
40 changes: 29 additions & 11 deletions patroni/postgresql/__init__.py
@@ -591,14 +591,17 @@ def parse_wal_record(self, timeline: str,
                 return match.group(1), match.group(2), match.group(3), match.group(4)
         return None, None, None, None
 
-    def latest_checkpoint_location(self) -> Optional[int]:
-        """Returns checkpoint location for the cleanly shut down primary.
-        But, if we know that the checkpoint was written to the new WAL
-        due to the archive_mode=on, we will return the LSN of prev wal record (SWITCH)."""
-
-        data = self.controldata()
+    def _checkpoint_locations_from_controldata(self, data: Dict[str, str]) -> Optional[Tuple[int, int]]:
+        """Get shutdown checkpoint location.
+
+        :param data: :class:`dict` object with values returned by `pg_controldata` tool.
+
+        :returns: a tuple of checkpoint LSN for the cleanly shut down primary, and LSN of prev wal record (SWITCH)
+                  if we know that the checkpoint was written to the new WAL file due to the archive_mode=on.
+        """
         timeline = data.get("Latest checkpoint's TimeLineID")
         lsn = checkpoint_lsn = data.get('Latest checkpoint location')
+        prev_lsn = None
         if data.get('Database cluster state') == 'shut down' and lsn and timeline and checkpoint_lsn:
             try:
                 checkpoint_lsn = parse_lsn(checkpoint_lsn)
@@ -609,13 +612,26 @@ def latest_checkpoint_location(self) -> Optional[int]:
                 _, lsn, _, desc = self.parse_wal_record(timeline, prev)
                 prev = parse_lsn(prev)
                 # If the cluster is shutdown with archive_mode=on, WAL is switched before writing the checkpoint.
-                # In this case we want to take the LSN of previous record (switch) as the last known WAL location.
+                # In this case we want to take the LSN of previous record (SWITCH) as the last known WAL location.
                 if lsn and parse_lsn(lsn) == prev and str(desc).strip() in ('xlog switch', 'SWITCH'):
-                    return prev
+                    prev_lsn = prev
             except Exception as e:
                 logger.error('Exception when parsing WAL pg_%sdump output: %r', self.wal_name, e)
         if isinstance(checkpoint_lsn, int):
-            return checkpoint_lsn
+            return checkpoint_lsn, (prev_lsn or checkpoint_lsn)
+
+    def latest_checkpoint_location(self) -> Optional[int]:
+        """Get shutdown checkpoint location.
+
+        .. note::
+            In case if checkpoint was written to the new WAL file due to the archive_mode=on
+            we return LSN of the previous wal record (SWITCH).
+
+        :returns: checkpoint LSN for the cleanly shut down primary.
+        """
+        checkpoint_locations = self._checkpoint_locations_from_controldata(self.controldata())
+        if checkpoint_locations:
+            return checkpoint_locations[1]
 
     def is_running(self) -> Optional[PostmasterProcess]:
         """Returns PostmasterProcess if one is running on the data directory or None. If most recently seen process
@@ -801,7 +817,7 @@ def checkpoint(self, connect_kwargs: Optional[Dict[str, Any]] = None,
             return 'not accessible or not healty'
 
     def stop(self, mode: str = 'fast', block_callbacks: bool = False, checkpoint: Optional[bool] = None,
-             on_safepoint: Optional[Callable[..., Any]] = None, on_shutdown: Optional[Callable[[int], Any]] = None,
+             on_safepoint: Optional[Callable[..., Any]] = None, on_shutdown: Optional[Callable[[int, int], Any]] = None,
              before_shutdown: Optional[Callable[..., Any]] = None, stop_timeout: Optional[int] = None) -> bool:
         """Stop PostgreSQL
@@ -831,7 +847,7 @@ def stop(self, mode: str = 'fast', block_callbacks: bool = False, checkpoint: Op
         return success
 
     def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
-                 on_safepoint: Optional[Callable[..., Any]], on_shutdown: Optional[Callable[..., Any]],
+                 on_safepoint: Optional[Callable[..., Any]], on_shutdown: Optional[Callable[[int, int], Any]],
                  before_shutdown: Optional[Callable[..., Any]], stop_timeout: Optional[int]) -> Tuple[bool, bool]:
         postmaster = self.is_running()
         if not postmaster:
@@ -871,7 +887,9 @@ def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
             while postmaster.is_running():
                 data = self.controldata()
                 if data.get('Database cluster state', '') == 'shut down':
-                    on_shutdown(self.latest_checkpoint_location())
+                    checkpoint_locations = self._checkpoint_locations_from_controldata(data)
+                    if checkpoint_locations:
+                        on_shutdown(*checkpoint_locations)
                     break
                 elif data.get('Database cluster state', '').startswith('shut down'):  # shut down in recovery
                     break
2 changes: 1 addition & 1 deletion tests/test_ha.py
@@ -1532,7 +1532,7 @@ def test_shutdown(self):
         self.ha.is_leader = true
 
         def stop(*args, **kwargs):
-            kwargs['on_shutdown'](123)
+            kwargs['on_shutdown'](123, 120)
 
         self.p.stop = stop
         self.ha.shutdown()
5 changes: 4 additions & 1 deletion tests/test_postgresql.py
@@ -237,7 +237,10 @@ def test_stop(self, mock_cancellable_call, mock_is_running):
     @patch.object(Postgresql, 'latest_checkpoint_location', Mock(return_value='7'))
     def test__do_stop(self):
         mock_callback = Mock()
-        with patch.object(Postgresql, 'controldata', Mock(return_value={'Database cluster state': 'shut down'})):
+        with patch.object(Postgresql, 'controldata',
+                          Mock(return_value={'Database cluster state': 'shut down',
+                                             "Latest checkpoint's TimeLineID": '1',
+                                             'Latest checkpoint location': '1/1'})):
             self.assertTrue(self.p.stop(on_shutdown=mock_callback, stop_timeout=3))
             mock_callback.assert_called()
         with patch.object(Postgresql, 'controldata',
