Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(send): _NoDefault serializer sentinel introduced in order to make… #280

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 106 additions & 14 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@ By default, this is what kstream does.

As you can see the ConsumerRecord's `value` is bytes.

In order to keep your code pythonic, we provide a mechanism to serialize/deserialize
these bytes, into something more useful.
In order to keep your code pythonic, we provide a mechanism to `serialize/deserialize` these `bytes`, into something more useful.
This way, you can work with other data structures, like a `dict` or `dataclasses`.

## Serializers

Sometimes it is easier to work with a `dict` in your app, give it to `kstreams`, and let it transform it into `bytes` to be delivered to Kafka. For this situation, you need to implement `kstreams.serializers.Serializer`.

::: kstreams.serializers.Serializer
options:
show_root_heading: true
docstring_section_style: table
show_bases: false
members:
-

## Deserializers

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`, you may want to receive in your function the `dict` ready to be used.
For those cases, we need to use [middlewares](https://kpn.github.io/kstreams/middleware/).

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`,
you may want to receive in your function the `dict` ready to be used. For those cases, we need to use [middleware](https://kpn.github.io/kstreams/middleware/). For example, we can implement a `JsonMiddleware`:
### Deserializers Middleware

For example, we can implement a `JsonMiddleware`:

```python
from kstreams import middleware, ConsumerRecord
Expand All @@ -37,47 +45,131 @@ class JsonDeserializerMiddleware(middleware.BaseMiddleware):
return await self.next_call(cr)
```

It is also possble to use `kstreams.serializers.Deserializer` for deserialization, but this will be deprecated
### Old Deserializers

The old fashion way is to use `Deserializers`, which has been deprecated (but still maintained) in favor of [middleware](https://kpn.github.io/kstreams/middleware/)

::: kstreams.serializers.Deserializer
options:
show_root_heading: true
show_root_heading: false
docstring_section_style: table
show_bases: false
members:
-

!!! warning
`kstreams.serializers.Deserializer` will be deprecated, use [middlewares](https://kpn.github.io/kstreams/middleware/) instead

## Usage

Once you have written your serializer or deserializer, there are 2 ways of using them, in a
generic fashion or per stream.
Once you have written your `serializer` and `middleware/deserializer`, there are two ways to use them:

### Initialize the engine with your serializers
- `Globally`: When `Serializer` and/or `Deserializer` is set to the `StreamEngine` instance
- `Per case`: When a `Serializer` is used with the `send coroutine` or a `Middleware/Deserializer` is set to a `stream`

By doing this all the streams will use these serializers by default.
### Globally

The engine is initialized with serializers. By doing this all the streams will use these deserializers by default and every time that an event is produced
then the default `serializer` is used.

```python title="Json events example"
from kstreams import create_engine, middleware, ConsumerRecord

topic = "local--kstreams"

```python
stream_engine = create_engine(
title="my-stream-engine",
serializer=JsonSerializer(),
deserializer=JsonDeserializer(), # old fashion way and it will be deprecated
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
# remember event.value is now a dict
print(cr.value["message"])
save_to_db(cr)
assert cr.value == {"message": "test"}


await stream_engine.send(
topic,
value={"message": "test"}
headers={"content-type": consts.APPLICATION_JSON,}
key="1",
)
```

### Initilize `streams` with a `deserializer` and produce events with `serializers`
### Per case

This is when `streams` are initialized with a `deserializer` (preferably a `middleware`) and we produce events with `serializers` in the send function.

- If a `global serializer` is set but we call `send(serializer=...)`, then the local `serializer` is used, not the global one.
- If a `global deserializer` is set but a `stream` has a local one, then the local `deserializer` is used. In other words, the most specific `deserializer` will be used

```python
from kstreams import middleware, ConsumerRecord
from kstreams import create_engine, middleware, ConsumerRecord

topic = "local--kstreams"

# stream_engine created without a `serializer/deserializer`
stream_engine = create_engine(
title="my-stream-engine",
)

# Here deserializer=JsonDeserializer() instead, but it will be deprecated
@stream_engine.stream(topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)])
async def hello_stream(cr: ConsumerRecord):
# remember event.value is now a dict
print(cr.value["message"])
save_to_db(cr)


# send with a serializer
await stream_engine.send(
topic,
value={"message": "test"}
headers={"content-type": consts.APPLICATION_JSON,}
serializer=JsonSerializer() # in this case the Global Serializer is not used if there was one
key="1",
)
```

```python
## Forcing raw data

There is a situation when a `global` serializer is being used but still we want to produce raw data, for example when producing to a `DLQ`.
For this case, when must set the `serialzer` option to `None`:

```python title="DLQ example"
from kstreams import create_engine, middleware, ConsumerRecord


topic = "local--kstreams"
dlq_topic = "dlq--kstreams"

stream_engine = create_engine(
title="my-stream-engine",
serializer=JsonSerializer(), # Global serializer
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
try:
# remember event.value is now a dict
save_to_db(cr)
assert cr.value == {"message": "test"}
except DeserializationException:
await stream_engine.send(
dlq_topic,
value=cr.value
headers=cr.headers
key=cr.key,
serializer=None, # force raw data
)


# this will produce Json
await stream_engine.send(
topic,
value={"message": "test"}
Expand Down
39 changes: 30 additions & 9 deletions examples/json_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@

import aiorun

from kstreams import ConsumerRecord, Stream, consts, create_engine, middleware
from kstreams import ConsumerRecord, consts, create_engine, middleware
from kstreams.types import Headers

logger = logging.getLogger(__name__)


json_data = {"message": "Hello world!"}
raw_data = b"Hello world!"
raw_topic = "local--kstreams"
json_topic = "local--kstreams-json"


class JsonSerializer:
async def serialize(
self,
Expand Down Expand Up @@ -38,33 +44,48 @@ async def __call__(self, cr: ConsumerRecord):
serializer=JsonSerializer(),
)

data = {"message": "Hello world!"}
topic = "local--kstreams-json"

@stream_engine.stream(
raw_topic,
group_id="my-group-raw-data",
)
async def consume_raw(cr: ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
assert cr.value == raw_data


@stream_engine.stream(
topic,
group_id="my-group",
json_topic,
group_id="my-group-json-data",
middlewares=[middleware.Middleware(JsonDeserializerMiddleware)],
)
async def consume(cr: ConsumerRecord, stream: Stream):
async def consume_json(cr: ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
assert cr.value == data
assert cr.value == json_data


async def produce():
for _ in range(5):
# Serialize the data with APPLICATION_JSON
metadata = await stream_engine.send(
topic,
value=data,
json_topic,
value=json_data,
headers={
"content-type": consts.APPLICATION_JSON,
},
)
logger.info(f"Message sent: {metadata}")
await asyncio.sleep(3)

# send raw data to show that it is possible to send data without serialization
metadata = await stream_engine.send(
raw_topic,
value=raw_data,
serializer=None,
)

logger.info(f"Message sent: {metadata}")


async def main():
await stream_engine.start()
Expand Down
7 changes: 4 additions & 3 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .middleware.udf_middleware import UdfHandler
from .prometheus.monitor import PrometheusMonitor
from .rebalance_listener import MetricsRebalanceListener, RebalanceListener
from .serializers import Deserializer, Serializer
from .serializers import NO_DEFAULT, Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
Expand Down Expand Up @@ -96,7 +96,7 @@ async def send(
partition: typing.Optional[int] = None,
timestamp_ms: typing.Optional[int] = None,
headers: typing.Optional[Headers] = None,
serializer: typing.Optional[Serializer] = None,
serializer: typing.Optional[Serializer] = NO_DEFAULT,
serializer_kwargs: typing.Optional[typing.Dict] = None,
):
"""
Expand All @@ -114,7 +114,8 @@ async def send(
if self._producer is None:
raise EngineNotStartedException()

serializer = serializer or self.serializer
if serializer is NO_DEFAULT:
marcosschroh marked this conversation as resolved.
Show resolved Hide resolved
serializer = self.serializer

# serialize only when value and serializer are present
if value is not None and serializer is not None:
Expand Down
76 changes: 54 additions & 22 deletions kstreams/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,22 @@


class Deserializer(Protocol):
woile marked this conversation as resolved.
Show resolved Hide resolved
"""Protocol used by the Stream to deserialize.

A Protocol is similar to other languages features like an interface or a trait.

End users should provide their own class implementing this protocol.

For example a `JsonDeserializer`

```python
import json
from kstreams import ConsumerRecord

class JsonDeserializer:

async def deserialize(
self, consumer_record: ConsumerRecord, **kwargs
) -> ConsumerRecord:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
return consumer_record
```
"""Deserializers must implement the Deserializer Protocol

!!! Example
```python
import json
from kstreams import ConsumerRecord

class JsonDeserializer:

async def deserialize(
self, consumer_record: ConsumerRecord, **kwargs
) -> ConsumerRecord:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
return consumer_record
```
"""

async def deserialize(
Expand Down Expand Up @@ -75,4 +70,41 @@ async def serialize(
"""
Implement this method to deserialize the data received from the topic.
"""
...
... # pragma: no cover
marcosschroh marked this conversation as resolved.
Show resolved Hide resolved


class _NoDefault:
"""
This class is used as sentinel to indicate that no default serializer
value is provided when calling StreamEngine.send(...).

The sentinel helps to make a distintion between `None` and `_NoDefault` to solve
the case when there is a global serializer and `send(serializer=None)` is called
to indicate that `binary` must be send rather a serialized payload.

If we do not have this sentinel, then we can't distinguish when the global
marcosschroh marked this conversation as resolved.
Show resolved Hide resolved
serializer should be used or not.

Example:
StreamEngine(...).send("topic", value) # use global serializer if set
StreamEngine(...).send("topic", value, serializer=None) # send binary
StreamEngine(...).send(
"topic", value, serializer=CustomSerializer()) # use custom serializer

* If a global serializer is not set, then binary is always send.

"""

async def serialize(
self,
payload: Any,
headers: Optional[Headers] = None,
serializer_kwargs: Optional[Dict] = None,
) -> bytes:
"""
Implement this method to deserialize the data received from the topic.
"""
return payload # pragma: no cover


NO_DEFAULT = _NoDefault()
4 changes: 2 additions & 2 deletions kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from kstreams import Consumer, Producer
from kstreams.engine import StreamEngine
from kstreams.prometheus.monitor import PrometheusMonitor
from kstreams.serializers import Serializer
from kstreams.serializers import NO_DEFAULT, Serializer
from kstreams.streams import Stream
from kstreams.types import ConsumerRecord, Headers

Expand Down Expand Up @@ -116,7 +116,7 @@ async def send(
partition: int = 0,
timestamp_ms: Optional[int] = None,
headers: Optional[Headers] = None,
serializer: Optional[Serializer] = None,
serializer: Optional[Serializer] = NO_DEFAULT,
serializer_kwargs: Optional[Dict] = None,
) -> RecordMetadata:
return await self.stream_engine.send(
Expand Down
1 change: 1 addition & 0 deletions scripts/cluster/start
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

docker-compose up -d
scripts/cluster/topics/create "local--kstreams"
scripts/cluster/topics/create "local--kstreams-json"
scripts/cluster/topics/create "local--hello-world"
scripts/cluster/topics/create "local--sse"
scripts/cluster/topics/create "local--avro-user"
Expand Down
Loading
Loading