Skip to content

Commit

Permalink
skip update to search client with no changes
Browse files Browse the repository at this point in the history
  • Loading branch information
TerionGVS5 committed Feb 21, 2025
1 parent 4b7e5d5 commit 09dea8e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
15 changes: 10 additions & 5 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,10 @@ def _on_publish(self, payloads: t.List[Payload]) -> None:
else:
_payloads: t.List[Payload] = []
for i, payload in enumerate(payloads):
if payload.tg_op == UPDATE and payload.old == payload.new:
logger.debug(f"Skipping payload due to no changes.")
continue

_payloads.append(payload)
j: int = i + 1
if j < len(payloads):
Expand All @@ -1228,11 +1232,12 @@ def _on_publish(self, payloads: t.List[Payload]) -> None:
self.index, self._payloads(_payloads)
)
_payloads = []
elif j == len(payloads):
self.search_client.bulk(
self.index, self._payloads(_payloads)
)
_payloads: list = []

if _payloads:
self.search_client.bulk(
self.index, self._payloads(_payloads)
)
_payloads: list = []

txids: t.Set = set(map(lambda x: x.xmin, payloads))
# for truncate, tg_op txids is None so skip setting the checkpoint
Expand Down
35 changes: 35 additions & 0 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,41 @@ def test__on_publish_mixed_ops(self, mock_logger, mock_es, sync):
mock_es.debug.call_count == 3
mock_es.assert_any_call("testdb", ANY)

@patch("pgsync.sync.SearchClient.bulk")
@patch("pgsync.sync.logger")
def test__on_publish_mixed_update_no_changes(self, mock_logger, mock_es, sync):
payloads = [
Payload(
schema="public",
tg_op="INSERT",
table="book",
old={"isbn": "001"},
new={"isbn": "0001"},
xmin=1234,
),
Payload(
schema="public",
tg_op="UPDATE",
table="book",
old={"isbn": "0002"},
new={"isbn": "0002"},
xmin=1234,
),
Payload(
schema="public",
tg_op="DELETE",
table="book",
old={"isbn": "003"},
new={"isbn": "0003"},
xmin=1234,
),
]
sync._on_publish(payloads)
mock_logger.debug.assert_any_call("Skipping payload due to no changes.")
assert sync.checkpoint is not None
mock_es.debug.call_count == 2
mock_es.assert_any_call("testdb", ANY)

@patch("pgsync.sync.Sync._on_publish")
def test_on_publish(self, mock_on_publish, sync):
payloads = [
Expand Down

0 comments on commit 09dea8e

Please sign in to comment.