Skip to content

Commit

Permalink
fixed 2 bugs in find_and_delete_entries, added tests (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
hemidactylus authored Oct 2, 2024
1 parent e2ff7dc commit 3b8aee8
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Environments
.env
.testing.env
.local.testing.env
env

### (abridged) BOILERPLATE STARTS HERE
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Add method to get session and keyspace from parameters.
Calling `cassio.init()` with insufficient arguments now raises an exception.
Fixed: bug when metadata key contains json and, in general, curly braces (by @epinzur)
Remove support for (EOL'd) python 3.8
Bugfix: find_and_delete_entries (metadata mixin) now uses the provided batch_size
Bugfix: find_and_delete_entries (metadata mixin) made compatible with clustered mixin
Added testing for find_and_delete_entries:
- with clustered mixin and all call patterns (w/out partition, w/out row_id)
- enhanced testing on simple metadata table (with counting checks, all call patterns)

v 0.1.9
=======
Expand Down
42 changes: 34 additions & 8 deletions src/cassio/table/mixins/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,48 @@ def _get_find_entries_cql(
)
return select_cql, select_vals

def find_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
def _find_unnormalized_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
select_cql, select_vals = self._get_find_entries_cql(n, **kwargs)
result_set = self.execute_cql(
select_cql, args=select_vals, op_type=CQLOpType.READ
)
return (self._normalize_row(result) for result in result_set)
return (
raw_row if isinstance(raw_row, dict) else raw_row._asdict() # type: ignore[attr-defined]
for raw_row in result_set
)

def find_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
return (
self._normalize_row(result)
for result in self._find_unnormalized_entries(
n=n,
**kwargs,
)
)

def find_entries_async(self, n: int, **kwargs: Any) -> ResponseFuture:
raise NotImplementedError("Asynchronous reads are not supported.")

async def afind_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
async def _afind_unnormalized_entries(
self, n: int, **kwargs: Any
) -> Iterable[RowType]:
select_cql, select_vals = self._get_find_entries_cql(n, **kwargs)
result_set = await self.aexecute_cql(
select_cql, args=select_vals, op_type=CQLOpType.READ
)
return (self._normalize_row(result) for result in result_set)
return (
raw_row if isinstance(raw_row, dict) else raw_row._asdict() # type: ignore[attr-defined]
for raw_row in result_set
)

async def afind_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
return (
self._normalize_row(result)
for result in await self._afind_unnormalized_entries(
n=n,
**kwargs,
)
)

@staticmethod
def _get_to_delete_and_visited(
Expand Down Expand Up @@ -322,15 +348,13 @@ def find_and_delete_entries(
# TODO: Use the 'columns' for a narrowed projection
# TODO: decouple finding and deleting (streaming) for faster performance
primary_key_cols = [col for col, _ in self._schema_primary_key()]
#
batch_size = 20
to_delete, visited_tuples = self._get_to_delete_and_visited(
n, batch_size, set()
)
while to_delete > 0:
del_pkargs = [
[found_row[pkc] for pkc in primary_key_cols]
for found_row in self.find_entries(n=to_delete, **kwargs)
for found_row in self._find_unnormalized_entries(n=to_delete, **kwargs)
]
if del_pkargs == []:
break
Expand Down Expand Up @@ -366,7 +390,9 @@ async def afind_and_delete_entries(
while to_delete > 0:
del_pkargs = [
[found_row[pkc] for pkc in primary_key_cols]
for found_row in await self.afind_entries(n=to_delete, **kwargs)
for found_row in await self._afind_unnormalized_entries(
n=to_delete, **kwargs
)
]
delete_coros = [
self.adelete(
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session
from cassandra.protocol import ProtocolVersion
from dotenv import load_dotenv
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
Expand Down Expand Up @@ -82,6 +83,7 @@ def db_session(cassandra_port: int) -> Iterator[Session]:
ASTRA_DB_CLIENT_ID,
ASTRA_DB_APPLICATION_TOKEN,
),
protocol_version=ProtocolVersion.V4,
)
yield cluster.connect()
elif mode in ["LOCAL_CASSANDRA", "TESTCONTAINERS_CASSANDRA"]:
Expand Down
265 changes: 265 additions & 0 deletions tests/integration/test_tableclasses_clusteredmetadatacassandratable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
"""
Table classes integration test - ClusteredMetadataCassandraTable
"""

import asyncio

import pytest
from cassandra.cluster import Session

from cassio.table.tables import ClusteredMetadataCassandraTable


@pytest.mark.usefixtures("db_session", "db_keyspace")
class TestClusteredMetadataCassandraTable:
def test_find_and_delete_entries_sync(
self, db_session: Session, db_keyspace: str
) -> None:
"""
Plan for the rows in this table:
primary key partition key metadata
-----------------------------------------------------------
("C", "up") ("dele", 0) {"field": "alpha"}
("C", "up") ("dele", 1) {"field": "alpha"}
...
("C", "up") ("good", 0) {"field": "omega"}
("C", "up") ("good", 1) {"field": "omega"}
...
("C", "dn") ("dele", 0) {"field": "alpha"}
("C", "dn") ("dele", 1) {"field": "alpha"}
...
("C", "dn") ("good", 0) {"field": "omega"}
("C", "dn") ("good", 1) {"field": "omega"}
...
for a total of 2 x 2 x ONEFOURTH_N_ROWS:
a 2 due to up/dn in part key
a 2 due to dele/good in clustering
The total rows to delete, i.e. those with alpha, are 2 x ONEFOURTH_N_ROWS.
"""
table_name_fad = "cm_ct"
ONEFOURTH_N_ROWS = 128
FAD_MAX_COUNT = 30 # must be < ONEFOURTH_N_ROWS for full testing
FAD_BATCH_SIZE = 25 # must be < FAD_MAX_COUNT-1 for full testing
db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
t_fad = ClusteredMetadataCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_fad,
primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
num_partition_keys=2,
)
futures = [
t_fad.put_async(
partition_id=("C", part_k),
row_id=(["good", "dele"][dele_status], row_i),
body_blob=(
f"PART_{part_k} / ROWID_{['good', 'dele'][dele_status]} "
f"/ md_{['omega', 'alpha'][dele_status]}"
),
metadata={"field": ["omega", "alpha"][dele_status]},
)
for row_i in range(ONEFOURTH_N_ROWS)
for dele_status in [1, 0] # 1 means "alpha", i.e. delete-me
for part_k in ["up", "dn"]
]
for f in futures:
_ = f.result()
#
q_md = {"field": "alpha"}

total_matching_rows = 2 * ONEFOURTH_N_ROWS

num_found_items_0a = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_0a == total_matching_rows

# find_and_delete calls without match:
num_deleted0a = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("X", "up"),
)
assert num_deleted0a == 0
num_deleted0b = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("no", -1),
)
assert num_deleted0b == 0
num_deleted0c = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("good", 0),
)
assert num_deleted0c == 0

num_found_items_0b = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_0b == total_matching_rows

# one-item deletion
num_deleted1 = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("dele", 0),
)
assert num_deleted1 == 1
num_found_items_1 = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_1 == total_matching_rows - 1

# deletion of part of a partition
num_deleted_p = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
n=FAD_MAX_COUNT,
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_p == FAD_MAX_COUNT
num_found_items_p = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_p == total_matching_rows - FAD_MAX_COUNT - 1

# deletion of the rest of the partition
num_deleted_p2 = t_fad.find_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_p2 == ONEFOURTH_N_ROWS - FAD_MAX_COUNT - 1
num_found_items_p2 = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_p2 == total_matching_rows - ONEFOURTH_N_ROWS

# deletion of everything that remains
num_deleted_a = t_fad.find_and_delete_entries(
metadata=q_md,
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_a == ONEFOURTH_N_ROWS
num_found_items_a = len(
list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_a == 0

@pytest.mark.asyncio
async def test_find_and_delete_entries_async(
self, db_session: Session, db_keyspace: str
) -> None:
"""Same logic as for the sync counterpart."""
table_name_fad = "cm_ct"
ONEFOURTH_N_ROWS = 128
FAD_MAX_COUNT = 30 # must be < ONEFOURTH_N_ROWS for full testing
FAD_BATCH_SIZE = 25 # must be < FAD_MAX_COUNT-1 for full testing
db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
t_fad = ClusteredMetadataCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_fad,
primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
num_partition_keys=2,
)

coros = [
t_fad.aput(
partition_id=("C", part_k),
row_id=(["good", "dele"][dele_status], row_i),
body_blob=(
f"PART_{part_k} / ROWID_{['good', 'dele'][dele_status]} "
f"/ md_{['omega', 'alpha'][dele_status]}"
),
metadata={"field": ["omega", "alpha"][dele_status]},
)
for row_i in range(ONEFOURTH_N_ROWS)
for dele_status in [1, 0] # 1 means "alpha", i.e. delete-me
for part_k in ["up", "dn"]
]
await asyncio.gather(*coros)

#
q_md = {"field": "alpha"}

total_matching_rows = 2 * ONEFOURTH_N_ROWS

num_found_items_0a = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_0a == total_matching_rows

# find_and_delete calls without match:
num_deleted0a = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("X", "up"),
)
assert num_deleted0a == 0
num_deleted0b = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("no", -1),
)
assert num_deleted0b == 0
num_deleted0c = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("good", 0),
)
assert num_deleted0c == 0

num_found_items_0b = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_0b == total_matching_rows

# one-item deletion
num_deleted1 = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
row_id=("dele", 0),
)
assert num_deleted1 == 1
num_found_items_1 = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_1 == total_matching_rows - 1

# deletion of part of a partition
num_deleted_p = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
n=FAD_MAX_COUNT,
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_p == FAD_MAX_COUNT
num_found_items_p = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_p == total_matching_rows - FAD_MAX_COUNT - 1

# deletion of the rest of the partition
num_deleted_p2 = await t_fad.afind_and_delete_entries(
metadata=q_md,
partition_id=("C", "up"),
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_p2 == ONEFOURTH_N_ROWS - FAD_MAX_COUNT - 1
num_found_items_p2 = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_p2 == total_matching_rows - ONEFOURTH_N_ROWS

# deletion of everything that remains
num_deleted_a = await t_fad.afind_and_delete_entries(
metadata=q_md,
batch_size=FAD_BATCH_SIZE,
)
assert num_deleted_a == ONEFOURTH_N_ROWS
num_found_items_a = len(
list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
)
assert num_found_items_a == 0
Loading

0 comments on commit 3b8aee8

Please sign in to comment.