Skip to content

Commit

Permalink
Use blockbuster to detect blocking calls
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 25, 2024
1 parent 03ba4da commit bcb3ad1
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 275 deletions.
418 changes: 228 additions & 190 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ types-requests = "^2.31.0.20240311"
testcontainers = "~3.7.1"
python-dotenv = "~1.0.1"
isort = "^5.13.2"
blockbuster = "^1.2.0"

[tool.poetry.scripts]
cassio-create-init-string = "cassio.config.bundle_management:create_init_string_utility"
Expand Down
81 changes: 39 additions & 42 deletions src/cassio/table/base_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
normalize_type_desc,
)
from cassio.table.utils import (
call_wrapped_async,
handle_multicolumn_packing,
handle_multicolumn_unpacking,
execute_cql,
)


Expand Down Expand Up @@ -192,7 +192,7 @@ def _normalize_row(self, raw_row: Any) -> Dict[str, Any]:
)
return repacked_row

def _delete(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
def _get_delete_cql(self, **kwargs: Any):
n_kwargs = self._normalize_kwargs(kwargs, is_write=False)
(
rest_kwargs,
Expand All @@ -204,49 +204,45 @@ def _delete(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
delete_cql = DELETE_CQL_TEMPLATE.format(
where_clause=where_clause,
)
if is_async:
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)
return None
return delete_cql, delete_cql_vals

def delete(self, **kwargs: Any) -> None:
self._ensure_db_setup()
self._delete(is_async=False, **kwargs)
return None
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)

def delete_async(self, **kwargs: Any) -> ResponseFuture:
self._ensure_db_setup()
return self._delete(is_async=True, **kwargs)
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

async def adelete(self, **kwargs: Any) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.delete_async, **kwargs)

def _clear(self, is_async: bool) -> Union[None, ResponseFuture]:
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
if is_async:
return self.execute_cql_async(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)
else:
self.execute_cql(truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE)
return None
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
await self.aexecute_cql(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

def clear(self) -> None:
self._ensure_db_setup()
self._clear(is_async=False)
return None
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
self.execute_cql(truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE)

def clear_async(self) -> ResponseFuture:
self._ensure_db_setup()
return self._clear(is_async=True)
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
return self.execute_cql_async(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)

async def aclear(self) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.clear_async)
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
await self.aexecute_cql(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)

def _has_index_analyzers(self) -> bool:
if not self._body_index_options:
Expand Down Expand Up @@ -360,7 +356,7 @@ async def aget(self, **kwargs: Any) -> Optional[RowType]:
)
return self._normalize_result_set(result_set)

def _put(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
def _get_put_cql(self, **kwargs: Any):
n_kwargs = self._normalize_kwargs(kwargs, is_write=True)
primary_key = self._schema_primary_key()
assert set(col for col, _ in primary_key) - set(n_kwargs.keys()) == set()
Expand All @@ -385,27 +381,26 @@ def _put(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
value_placeholders=value_placeholders,
ttl_spec=ttl_spec,
)
#
if is_async:
return self.execute_cql_async(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE)
return None
return insert_cql, insert_cql_args

def put(self, **kwargs: Any) -> None:
self._ensure_db_setup()
self._put(is_async=False, **kwargs)
return None
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
self.execute_cql(insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE)

def put_async(self, **kwargs: Any) -> ResponseFuture:
self._ensure_db_setup()
return self._put(is_async=True, **kwargs)
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
return self.execute_cql_async(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)

async def aput(self, **kwargs: Any) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.put_async, **kwargs)
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
await self.aexecute_cql(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)

def _get_db_setup_cql(self, schema: Dict[str, List[ColumnSpecType]]) -> str:
column_specs = [
Expand Down Expand Up @@ -612,10 +607,12 @@ async def aexecute_cql(
statement = SimpleStatement(final_cql)
logger.debug(f'aExecuting statement "{final_cql}" as simple (unprepared)')
else:
statement = self._obtain_prepared_statement(final_cql)
statement = await asyncio.to_thread(
self._obtain_prepared_statement, final_cql
)
logger.debug(f'aExecuting statement "{final_cql}" as prepared')
logger.trace(f'Statement "{final_cql}" has args: "{str(args)}"') # type: ignore
return cast(
Iterable[RowType],
await call_wrapped_async(self.session.execute_async, statement, args),
await execute_cql(self.session, statement, args),
)
29 changes: 14 additions & 15 deletions src/cassio/table/mixins/clustered.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from cassio.table.cql import DELETE_CQL_TEMPLATE, SELECT_CQL_TEMPLATE, CQLOpType
from cassio.table.table_types import ColumnSpecType, RowType, normalize_type_desc
from cassio.table.utils import (
call_wrapped_async,
handle_multicolumn_packing,
handle_multicolumn_unpacking,
)
Expand Down Expand Up @@ -48,9 +47,9 @@ def _schema_pk(self) -> List[ColumnSpecType]:
def _schema_cc(self) -> List[ColumnSpecType]:
return self._schema_row_id()

def _delete_partition(
self, is_async: bool, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> Union[None, ResponseFuture]:
def _get_delete_partition_cql(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
):
_partition_id = self.partition_id if partition_id is None else partition_id
#
_pid_dict = handle_multicolumn_unpacking(
Expand All @@ -68,29 +67,29 @@ def _delete_partition(
delete_cql = DELETE_CQL_TEMPLATE.format(
where_clause=where_clause,
)
if is_async:
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)
return None
return delete_cql, delete_cql_vals

def delete_partition(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> None:
self._delete_partition(is_async=False, partition_id=partition_id)
return None
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)

def delete_partition_async(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> ResponseFuture:
return self._delete_partition(is_async=True, partition_id=partition_id)
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

async def adelete_partition(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> None:
await call_wrapped_async(self.delete_partition_async, partition_id=partition_id)
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
await self.aexecute_cql(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

def _normalize_kwargs(
self, args_dict: Dict[str, Any], is_write: bool
Expand Down
2 changes: 1 addition & 1 deletion src/cassio/table/mixins/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(

@staticmethod
def _normalize_metadata_indexing_policy(
metadata_indexing: Union[Tuple[str, Iterable[str]], str]
metadata_indexing: Union[Tuple[str, Iterable[str]], str],
) -> MetadataIndexingPolicy:
mode: MetadataIndexingMode
fields: Set[str]
Expand Down
2 changes: 1 addition & 1 deletion src/cassio/table/mixins/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def _aschema_da(self) -> List[ColumnSpecType]:

@staticmethod
def _get_create_vector_index_cql(
vector_index_options: List[Tuple[str, Any]]
vector_index_options: List[Tuple[str, Any]],
) -> str:
index_name = "idx_vector"
index_column = "vector"
Expand Down
6 changes: 5 additions & 1 deletion src/cassio/table/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from typing import Any, Callable, Dict, Iterable

from cassandra.cluster import ResponseFuture
from cassandra.cluster import ResponseFuture, Session


async def call_wrapped_async(
Expand All @@ -21,6 +21,10 @@ def error_handler(exc: BaseException) -> None:
return await asyncio_future


async def execute_cql(session: Session, cql: Any, args: Any = None) -> ResponseFuture:
return await call_wrapped_async(session.execute_async, cql, args)


def handle_multicolumn_unpacking(
args_dict: Dict[str, Any],
key_name: str,
Expand Down
8 changes: 7 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""
fixtures for testing
"""

import os
from tempfile import TemporaryDirectory
from typing import Dict, Iterator, List, Optional, Tuple

import pytest
from blockbuster import BlockBuster, blockbuster_ctx
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session
from cassandra.protocol import ProtocolVersion
Expand All @@ -24,6 +24,12 @@
# Fixtures


@pytest.fixture(autouse=True)
def blockbuster() -> Iterator[BlockBuster]:
with blockbuster_ctx() as bb:
yield bb


@pytest.fixture(scope="session", autouse=True)
def cassandra_port(db_keyspace: str) -> Iterator[int]:
if os.getenv("TEST_DB_MODE", "LOCAL_CASSANDRA") == "TESTCONTAINERS_CASSANDRA":
Expand Down
17 changes: 9 additions & 8 deletions tests/integration/test_tableclasses_clusteredcassandratable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cassandra.cluster import Session

from cassio.table.tables import ClusteredCassandraTable
from cassio.table.utils import call_wrapped_async
from cassio.table.utils import execute_cql


@pytest.mark.usefixtures("db_session", "db_keyspace")
Expand Down Expand Up @@ -149,9 +149,8 @@ def test_crud_async(self, db_session: Session, db_keyspace: str) -> None:
@pytest.mark.asyncio
async def test_crud_asyncio(self, db_session: Session, db_keyspace: str) -> None:
table_name = "c_ct"
await call_wrapped_async(
db_session.execute_async,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name};",
await execute_cql(
db_session, f"DROP TABLE IF EXISTS {db_keyspace}.{table_name};"
)
#
t = ClusteredCassandraTable(
Expand Down Expand Up @@ -200,15 +199,16 @@ async def test_partition_ordering_asyncio(
self, db_session: Session, db_keyspace: str
) -> None:
table_name_asc = "c_ct"
await call_wrapped_async(
db_session.execute_async,
await execute_cql(
db_session,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_asc};",
)
t_asc = ClusteredCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_asc,
partition_id="my_part",
async_setup=True,
)
await t_asc.aput(row_id="row1", body_blob="blob1")
await t_asc.aput(row_id="row2", body_blob="blob1")
Expand All @@ -220,8 +220,8 @@ async def test_partition_ordering_asyncio(
await t_asc.aclear()
#
table_name_desc = "c_ct_desc"
await call_wrapped_async(
db_session.execute_async,
await execute_cql(
db_session,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_desc};",
)
t_desc = ClusteredCassandraTable(
Expand All @@ -230,6 +230,7 @@ async def test_partition_ordering_asyncio(
table=table_name_desc,
partition_id="my_part",
ordering_in_partition="desc",
async_setup=True,
)
await t_desc.aput(row_id="row1", body_blob="blob1")
await t_desc.aput(row_id="row2", body_blob="blob1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cassandra.cluster import Session

from cassio.table.tables import ClusteredMetadataCassandraTable
from cassio.table.utils import execute_cql


@pytest.mark.usefixtures("db_session", "db_keyspace")
Expand Down Expand Up @@ -157,13 +158,16 @@ async def test_find_and_delete_entries_async(
ONEFOURTH_N_ROWS = 128
FAD_MAX_COUNT = 30 # must be < ONEFOURTH_N_ROWS for full testing
FAD_BATCH_SIZE = 25 # must be < FAD_MAX_COUNT-1 for full testing
db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
await execute_cql(
db_session, f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};"
)
t_fad = ClusteredMetadataCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_fad,
primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
num_partition_keys=2,
async_setup=True,
)

coros = [
Expand Down
Loading

0 comments on commit bcb3ad1

Please sign in to comment.