Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use blockbuster to detect blocking calls #175

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 228 additions & 190 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ types-requests = "^2.31.0.20240311"
testcontainers = "~3.7.1"
python-dotenv = "~1.0.1"
isort = "^5.13.2"
blockbuster = "^1.2.0"

[tool.poetry.scripts]
cassio-create-init-string = "cassio.config.bundle_management:create_init_string_utility"
Expand Down
81 changes: 39 additions & 42 deletions src/cassio/table/base_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
normalize_type_desc,
)
from cassio.table.utils import (
call_wrapped_async,
handle_multicolumn_packing,
handle_multicolumn_unpacking,
execute_cql,
)


Expand Down Expand Up @@ -192,7 +192,7 @@ def _normalize_row(self, raw_row: Any) -> Dict[str, Any]:
)
return repacked_row

def _delete(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
def _get_delete_cql(self, **kwargs: Any) -> Tuple[str, Tuple[Any, ...]]:
n_kwargs = self._normalize_kwargs(kwargs, is_write=False)
(
rest_kwargs,
Expand All @@ -204,49 +204,45 @@ def _delete(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
delete_cql = DELETE_CQL_TEMPLATE.format(
where_clause=where_clause,
)
if is_async:
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)
return None
return delete_cql, delete_cql_vals

def delete(self, **kwargs: Any) -> None:
self._ensure_db_setup()
self._delete(is_async=False, **kwargs)
return None
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)

def delete_async(self, **kwargs: Any) -> ResponseFuture:
self._ensure_db_setup()
return self._delete(is_async=True, **kwargs)
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

async def adelete(self, **kwargs: Any) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.delete_async, **kwargs)

def _clear(self, is_async: bool) -> Union[None, ResponseFuture]:
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
if is_async:
return self.execute_cql_async(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)
else:
self.execute_cql(truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE)
return None
delete_cql, delete_cql_vals = self._get_delete_cql(**kwargs)
await self.aexecute_cql(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

def clear(self) -> None:
self._ensure_db_setup()
self._clear(is_async=False)
return None
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
self.execute_cql(truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE)

def clear_async(self) -> ResponseFuture:
self._ensure_db_setup()
return self._clear(is_async=True)
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
return self.execute_cql_async(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)

async def aclear(self) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.clear_async)
truncate_table_cql = TRUNCATE_TABLE_CQL_TEMPLATE.format()
await self.aexecute_cql(
truncate_table_cql, args=tuple(), op_type=CQLOpType.WRITE
)

def _has_index_analyzers(self) -> bool:
if not self._body_index_options:
Expand Down Expand Up @@ -360,7 +356,7 @@ async def aget(self, **kwargs: Any) -> Optional[RowType]:
)
return self._normalize_result_set(result_set)

def _put(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
def _get_put_cql(self, **kwargs: Any) -> Tuple[str, Tuple[Any, ...]]:
n_kwargs = self._normalize_kwargs(kwargs, is_write=True)
primary_key = self._schema_primary_key()
assert set(col for col, _ in primary_key) - set(n_kwargs.keys()) == set()
Expand All @@ -385,27 +381,26 @@ def _put(self, is_async: bool, **kwargs: Any) -> Union[None, ResponseFuture]:
value_placeholders=value_placeholders,
ttl_spec=ttl_spec,
)
#
if is_async:
return self.execute_cql_async(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE)
return None
return insert_cql, insert_cql_args

def put(self, **kwargs: Any) -> None:
self._ensure_db_setup()
self._put(is_async=False, **kwargs)
return None
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
self.execute_cql(insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE)

def put_async(self, **kwargs: Any) -> ResponseFuture:
self._ensure_db_setup()
return self._put(is_async=True, **kwargs)
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
return self.execute_cql_async(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)

async def aput(self, **kwargs: Any) -> None:
await self._aensure_db_setup()
await call_wrapped_async(self.put_async, **kwargs)
insert_cql, insert_cql_args = self._get_put_cql(**kwargs)
await self.aexecute_cql(
insert_cql, args=insert_cql_args, op_type=CQLOpType.WRITE
)

def _get_db_setup_cql(self, schema: Dict[str, List[ColumnSpecType]]) -> str:
column_specs = [
Expand Down Expand Up @@ -612,10 +607,12 @@ async def aexecute_cql(
statement = SimpleStatement(final_cql)
logger.debug(f'aExecuting statement "{final_cql}" as simple (unprepared)')
else:
statement = self._obtain_prepared_statement(final_cql)
statement = await asyncio.to_thread(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the driver does not currently offer a way to prepare a statement asynchronously,
so the preparation is delegated to a thread.

self._obtain_prepared_statement, final_cql
)
logger.debug(f'aExecuting statement "{final_cql}" as prepared')
logger.trace(f'Statement "{final_cql}" has args: "{str(args)}"') # type: ignore
return cast(
Iterable[RowType],
await call_wrapped_async(self.session.execute_async, statement, args),
await execute_cql(self.session, statement, args),
)
29 changes: 14 additions & 15 deletions src/cassio/table/mixins/clustered.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from cassio.table.cql import DELETE_CQL_TEMPLATE, SELECT_CQL_TEMPLATE, CQLOpType
from cassio.table.table_types import ColumnSpecType, RowType, normalize_type_desc
from cassio.table.utils import (
call_wrapped_async,
handle_multicolumn_packing,
handle_multicolumn_unpacking,
)
Expand Down Expand Up @@ -48,9 +47,9 @@ def _schema_pk(self) -> List[ColumnSpecType]:
def _schema_cc(self) -> List[ColumnSpecType]:
return self._schema_row_id()

def _delete_partition(
self, is_async: bool, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> Union[None, ResponseFuture]:
def _get_delete_partition_cql(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> Tuple[str, Tuple[Any, ...]]:
_partition_id = self.partition_id if partition_id is None else partition_id
#
_pid_dict = handle_multicolumn_unpacking(
Expand All @@ -68,29 +67,29 @@ def _delete_partition(
delete_cql = DELETE_CQL_TEMPLATE.format(
where_clause=where_clause,
)
if is_async:
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)
else:
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)
return None
return delete_cql, delete_cql_vals

def delete_partition(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> None:
self._delete_partition(is_async=False, partition_id=partition_id)
return None
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
self.execute_cql(delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE)

def delete_partition_async(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> ResponseFuture:
return self._delete_partition(is_async=True, partition_id=partition_id)
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
return self.execute_cql_async(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

async def adelete_partition(
self, partition_id: Optional[PARTITION_ID_TYPE] = None
) -> None:
await call_wrapped_async(self.delete_partition_async, partition_id=partition_id)
delete_cql, delete_cql_vals = self._get_delete_partition_cql(partition_id)
await self.aexecute_cql(
delete_cql, args=delete_cql_vals, op_type=CQLOpType.WRITE
)

def _normalize_kwargs(
self, args_dict: Dict[str, Any], is_write: bool
Expand Down
2 changes: 1 addition & 1 deletion src/cassio/table/mixins/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(

@staticmethod
def _normalize_metadata_indexing_policy(
metadata_indexing: Union[Tuple[str, Iterable[str]], str]
metadata_indexing: Union[Tuple[str, Iterable[str]], str],
) -> MetadataIndexingPolicy:
mode: MetadataIndexingMode
fields: Set[str]
Expand Down
2 changes: 1 addition & 1 deletion src/cassio/table/mixins/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def _aschema_da(self) -> List[ColumnSpecType]:

@staticmethod
def _get_create_vector_index_cql(
vector_index_options: List[Tuple[str, Any]]
vector_index_options: List[Tuple[str, Any]],
) -> str:
index_name = "idx_vector"
index_column = "vector"
Expand Down
6 changes: 5 additions & 1 deletion src/cassio/table/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from typing import Any, Callable, Dict, Iterable

from cassandra.cluster import ResponseFuture
from cassandra.cluster import ResponseFuture, Session


async def call_wrapped_async(
Expand All @@ -21,6 +21,10 @@ def error_handler(exc: BaseException) -> None:
return await asyncio_future


async def execute_cql(session: Session, cql: Any, args: Any = None) -> ResponseFuture:
return await call_wrapped_async(session.execute_async, cql, args)


def handle_multicolumn_unpacking(
args_dict: Dict[str, Any],
key_name: str,
Expand Down
8 changes: 7 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""
fixtures for testing
"""

import os
from tempfile import TemporaryDirectory
from typing import Dict, Iterator, List, Optional, Tuple

import pytest
from blockbuster import BlockBuster, blockbuster_ctx
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session
from cassandra.protocol import ProtocolVersion
Expand All @@ -24,6 +24,12 @@
# Fixtures


@pytest.fixture(autouse=True)
def blockbuster() -> Iterator[BlockBuster]:
with blockbuster_ctx() as bb:
yield bb


@pytest.fixture(scope="session", autouse=True)
def cassandra_port(db_keyspace: str) -> Iterator[int]:
if os.getenv("TEST_DB_MODE", "LOCAL_CASSANDRA") == "TESTCONTAINERS_CASSANDRA":
Expand Down
17 changes: 9 additions & 8 deletions tests/integration/test_tableclasses_clusteredcassandratable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cassandra.cluster import Session

from cassio.table.tables import ClusteredCassandraTable
from cassio.table.utils import call_wrapped_async
from cassio.table.utils import execute_cql


@pytest.mark.usefixtures("db_session", "db_keyspace")
Expand Down Expand Up @@ -149,9 +149,8 @@ def test_crud_async(self, db_session: Session, db_keyspace: str) -> None:
@pytest.mark.asyncio
async def test_crud_asyncio(self, db_session: Session, db_keyspace: str) -> None:
table_name = "c_ct"
await call_wrapped_async(
db_session.execute_async,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name};",
await execute_cql(
db_session, f"DROP TABLE IF EXISTS {db_keyspace}.{table_name};"
)
#
t = ClusteredCassandraTable(
Expand Down Expand Up @@ -200,15 +199,16 @@ async def test_partition_ordering_asyncio(
self, db_session: Session, db_keyspace: str
) -> None:
table_name_asc = "c_ct"
await call_wrapped_async(
db_session.execute_async,
await execute_cql(
db_session,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_asc};",
)
t_asc = ClusteredCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_asc,
partition_id="my_part",
async_setup=True,
)
await t_asc.aput(row_id="row1", body_blob="blob1")
await t_asc.aput(row_id="row2", body_blob="blob1")
Expand All @@ -220,8 +220,8 @@ async def test_partition_ordering_asyncio(
await t_asc.aclear()
#
table_name_desc = "c_ct_desc"
await call_wrapped_async(
db_session.execute_async,
await execute_cql(
db_session,
f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_desc};",
)
t_desc = ClusteredCassandraTable(
Expand All @@ -230,6 +230,7 @@ async def test_partition_ordering_asyncio(
table=table_name_desc,
partition_id="my_part",
ordering_in_partition="desc",
async_setup=True,
)
await t_desc.aput(row_id="row1", body_blob="blob1")
await t_desc.aput(row_id="row2", body_blob="blob1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cassandra.cluster import Session

from cassio.table.tables import ClusteredMetadataCassandraTable
from cassio.table.utils import execute_cql


@pytest.mark.usefixtures("db_session", "db_keyspace")
Expand Down Expand Up @@ -157,13 +158,16 @@ async def test_find_and_delete_entries_async(
ONEFOURTH_N_ROWS = 128
FAD_MAX_COUNT = 30 # must be < ONEFOURTH_N_ROWS for full testing
FAD_BATCH_SIZE = 25 # must be < FAD_MAX_COUNT-1 for full testing
db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
await execute_cql(
db_session, f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};"
)
t_fad = ClusteredMetadataCassandraTable(
session=db_session,
keyspace=db_keyspace,
table=table_name_fad,
primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
num_partition_keys=2,
async_setup=True,
)

coros = [
Expand Down
Loading