Commit

fix(send): _NoDefault serializer sentinel introduced in order to make possible to send raw data even when a global serializer has been set (#280)
marcosschroh authored Feb 3, 2025
1 parent e308740 commit cf235c3
Showing 7 changed files with 316 additions and 60 deletions.
120 changes: 106 additions & 14 deletions docs/serialization.md
@@ -9,21 +9,29 @@ By default, this is what kstream does.

As you can see the ConsumerRecord's `value` is bytes.

In order to keep your code pythonic, we provide a mechanism to serialize/deserialize
these bytes, into something more useful.
To keep your code pythonic, we provide a mechanism to `serialize/deserialize` these `bytes` into something more useful.
This way, you can work with other data structures, like a `dict` or `dataclasses`.

## Serializers

Sometimes it is easier to work with a `dict` in your app, give it to `kstreams`, and let it transform it into `bytes` to be delivered to Kafka. For this situation, you need to implement `kstreams.serializers.Serializer`.

::: kstreams.serializers.Serializer
options:
show_root_heading: true
docstring_section_style: table
show_bases: false
members:
-
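
As an illustration of the paragraph above, here is a minimal sketch of a JSON serializer. The `serialize` signature mirrors the one shown in `kstreams/serializers.py` in this commit; the `Dict[str, str]` type used for `headers` is an assumption standing in for `kstreams.types.Headers`, and the class is not tied to any particular `kstreams` version.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class JsonSerializer:
    """Sketch of a serializer: turns a JSON-compatible payload into bytes."""

    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,  # assumed shape of Headers
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        # Encode the payload as UTF-8 JSON bytes, ready to be produced
        return json.dumps(payload).encode()


# Standalone check that does not need a running engine or Kafka cluster
encoded = asyncio.run(JsonSerializer().serialize({"message": "test"}))
```

Because the serializer is a plain class with one coroutine, it can be exercised in isolation before being handed to the engine.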

## Deserializers

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`, you may want to receive in your function the `dict` ready to be used.
For those cases, we need to use [middlewares](https://kpn.github.io/kstreams/middleware/).

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`,
you may want to receive in your function the `dict` ready to be used. For those cases, we need to use [middleware](https://kpn.github.io/kstreams/middleware/). For example, we can implement a `JsonMiddleware`:
### Deserializers Middleware

For example, we can implement a `JsonMiddleware`:

```python
from kstreams import middleware, ConsumerRecord
Expand All @@ -37,47 +45,131 @@ class JsonDeserializerMiddleware(middleware.BaseMiddleware):
return await self.next_call(cr)
```
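
The body of the middleware is collapsed in the view above. The following standalone sketch shows the general idea only: decode the record value, then pass the record on to the next callable. `FakeRecord` and `JsonDecodeStep` are hypothetical stand-ins, not `kstreams` classes; a real middleware would subclass `middleware.BaseMiddleware` and receive a `ConsumerRecord`.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class FakeRecord:
    """Hypothetical stand-in for ConsumerRecord; only `value` is modelled."""

    value: Any


class JsonDecodeStep:
    """Decodes `value` from JSON bytes, then calls the next step in the chain."""

    def __init__(self, next_call: Callable[[FakeRecord], Awaitable[Any]]) -> None:
        self.next_call = next_call

    async def __call__(self, cr: FakeRecord) -> Any:
        if cr.value is not None:
            # bytes -> str -> Python object
            cr.value = json.loads(cr.value.decode())
        return await self.next_call(cr)


async def handler(cr: FakeRecord) -> Any:
    # By the time the handler runs, the value is already a dict
    return cr.value


decoded = asyncio.run(JsonDecodeStep(handler)(FakeRecord(value=b'{"message": "test"}')))
```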

It is also possble to use `kstreams.serializers.Deserializer` for deserialization, but this will be deprecated
### Old Deserializers

The old-fashioned way is to use `Deserializers`, which have been deprecated (but are still maintained) in favor of [middleware](https://kpn.github.io/kstreams/middleware/).

::: kstreams.serializers.Deserializer
options:
show_root_heading: true
show_root_heading: false
docstring_section_style: table
show_bases: false
members:
-

!!! warning
`kstreams.serializers.Deserializer` will be deprecated, use [middlewares](https://kpn.github.io/kstreams/middleware/) instead

## Usage

Once you have written your serializer or deserializer, there are 2 ways of using them, in a
generic fashion or per stream.
Once you have written your `serializer` and `middleware/deserializer`, there are two ways to use them:

### Initialize the engine with your serializers
- `Globally`: When `Serializer` and/or `Deserializer` is set to the `StreamEngine` instance
- `Per case`: When a `Serializer` is used with the `send coroutine` or a `Middleware/Deserializer` is set to a `stream`

By doing this all the streams will use these serializers by default.
### Globally

The engine is initialized with a `serializer` and/or a `deserializer`. All the streams then use this `deserializer` by default,
and every time an event is produced the default `serializer` is used.

```python title="Json events example"
from kstreams import ConsumerRecord, consts, create_engine, middleware

topic = "local--kstreams"

```python
stream_engine = create_engine(
title="my-stream-engine",
serializer=JsonSerializer(),
deserializer=JsonDeserializer(), # old fashion way and it will be deprecated
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
# remember event.value is now a dict
print(cr.value["message"])
save_to_db(cr)
assert cr.value == {"message": "test"}


await stream_engine.send(
topic,
value={"message": "test"},
headers={"content-type": consts.APPLICATION_JSON},
key="1",
)
```

### Initilize `streams` with a `deserializer` and produce events with `serializers`
### Per case

This is when `streams` are initialized with a `deserializer` (preferably a `middleware`) and we produce events with `serializers` in the send function.

- If a `global serializer` is set but we call `send(serializer=...)`, then the local `serializer` is used, not the global one.
- If a `global deserializer` is set but a `stream` has a local one, then the local `deserializer` is used. In other words, the most specific `deserializer` will be used

```python
from kstreams import middleware, ConsumerRecord
from kstreams import ConsumerRecord, consts, create_engine, middleware

topic = "local--kstreams"

# stream_engine created without a `serializer/deserializer`
stream_engine = create_engine(
title="my-stream-engine",
)

# deserializer=JsonDeserializer() could be used here instead, but it is deprecated
@stream_engine.stream(topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)])
async def hello_stream(cr: ConsumerRecord):
# remember event.value is now a dict
print(cr.value["message"])
save_to_db(cr)


# send with a serializer
await stream_engine.send(
topic,
value={"message": "test"},
headers={"content-type": consts.APPLICATION_JSON},
serializer=JsonSerializer(),  # in this case the global serializer is not used, if there was one
key="1",
)
```

```python
## Forcing raw data

Sometimes a `global` serializer is in use but we still want to produce raw data, for example when producing to a `DLQ`.
In this case, we must set the `serializer` option to `None`:

```python title="DLQ example"
from kstreams import create_engine, middleware, ConsumerRecord


topic = "local--kstreams"
dlq_topic = "dlq--kstreams"

stream_engine = create_engine(
title="my-stream-engine",
serializer=JsonSerializer(), # Global serializer
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
try:
# remember event.value is now a dict
save_to_db(cr)
assert cr.value == {"message": "test"}
except DeserializationException:
await stream_engine.send(
dlq_topic,
value=cr.value,
headers=cr.headers,
key=cr.key,
serializer=None, # force raw data
)


# this will produce Json
await stream_engine.send(
topic,
value={"message": "test"}
39 changes: 30 additions & 9 deletions examples/json_serialization.py
@@ -5,12 +5,18 @@

import aiorun

from kstreams import ConsumerRecord, Stream, consts, create_engine, middleware
from kstreams import ConsumerRecord, consts, create_engine, middleware
from kstreams.types import Headers

logger = logging.getLogger(__name__)


json_data = {"message": "Hello world!"}
raw_data = b"Hello world!"
raw_topic = "local--kstreams"
json_topic = "local--kstreams-json"


class JsonSerializer:
async def serialize(
self,
@@ -38,33 +44,48 @@ async def __call__(self, cr: ConsumerRecord):
serializer=JsonSerializer(),
)

data = {"message": "Hello world!"}
topic = "local--kstreams-json"

@stream_engine.stream(
raw_topic,
group_id="my-group-raw-data",
)
async def consume_raw(cr: ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
assert cr.value == raw_data


@stream_engine.stream(
topic,
group_id="my-group",
json_topic,
group_id="my-group-json-data",
middlewares=[middleware.Middleware(JsonDeserializerMiddleware)],
)
async def consume(cr: ConsumerRecord, stream: Stream):
async def consume_json(cr: ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
assert cr.value == data
assert cr.value == json_data


async def produce():
for _ in range(5):
# Serialize the data with APPLICATION_JSON
metadata = await stream_engine.send(
topic,
value=data,
json_topic,
value=json_data,
headers={
"content-type": consts.APPLICATION_JSON,
},
)
logger.info(f"Message sent: {metadata}")
await asyncio.sleep(3)

# send raw data to show that it is possible to send data without serialization
metadata = await stream_engine.send(
raw_topic,
value=raw_data,
serializer=None,
)

logger.info(f"Message sent: {metadata}")


async def main():
await stream_engine.start()
7 changes: 4 additions & 3 deletions kstreams/engine.py
@@ -15,7 +15,7 @@
from .middleware.udf_middleware import UdfHandler
from .prometheus.monitor import PrometheusMonitor
from .rebalance_listener import MetricsRebalanceListener, RebalanceListener
from .serializers import Deserializer, Serializer
from .serializers import NO_DEFAULT, Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
@@ -96,7 +96,7 @@ async def send(
partition: typing.Optional[int] = None,
timestamp_ms: typing.Optional[int] = None,
headers: typing.Optional[Headers] = None,
serializer: typing.Optional[Serializer] = None,
serializer: typing.Optional[Serializer] = NO_DEFAULT,
serializer_kwargs: typing.Optional[typing.Dict] = None,
):
"""
@@ -114,7 +114,8 @@ async def send(
if self._producer is None:
raise EngineNotStartedException()

serializer = serializer or self.serializer
if serializer is NO_DEFAULT:
serializer = self.serializer

# serialize only when value and serializer are present
if value is not None and serializer is not None:
76 changes: 54 additions & 22 deletions kstreams/serializers.py
@@ -4,27 +4,22 @@


class Deserializer(Protocol):
"""Protocol used by the Stream to deserialize.
A Protocol is similar to other languages features like an interface or a trait.
End users should provide their own class implementing this protocol.
For example a `JsonDeserializer`
```python
import json
from kstreams import ConsumerRecord
class JsonDeserializer:
async def deserialize(
self, consumer_record: ConsumerRecord, **kwargs
) -> ConsumerRecord:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
return consumer_record
```
"""Deserializers must implement the Deserializer Protocol
!!! Example
```python
import json
from kstreams import ConsumerRecord
class JsonDeserializer:
async def deserialize(
self, consumer_record: ConsumerRecord, **kwargs
) -> ConsumerRecord:
data = json.loads(consumer_record.value.decode())
consumer_record.value = data
return consumer_record
```
"""

async def deserialize(
@@ -75,4 +70,41 @@ async def serialize(
"""
Implement this method to serialize the payload before it is sent to the topic.
"""
...
... # pragma: no cover


class _NoDefault:
"""
This class is used as a sentinel to indicate that no serializer
value was provided when calling StreamEngine.send(...).
The sentinel makes a distinction between `None` and `_NoDefault` to solve
the case when there is a global serializer and `send(serializer=None)` is called
to indicate that `binary` must be sent rather than a serialized payload.
Without this sentinel, we cannot tell whether the global
serializer should be used or not.
Example:
StreamEngine(...).send("topic", value) # use global serializer if set
StreamEngine(...).send("topic", value, serializer=None) # send binary
StreamEngine(...).send(
"topic", value, serializer=CustomSerializer()) # use custom serializer
* If a global serializer is not set, then binary is always sent.
"""

async def serialize(
self,
payload: Any,
headers: Optional[Headers] = None,
serializer_kwargs: Optional[Dict] = None,
) -> bytes:
"""
Return the payload unchanged; the sentinel itself never serializes anything.
"""
return payload # pragma: no cover


NO_DEFAULT = _NoDefault()
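
The sentinel pattern described in the `_NoDefault` docstring can be sketched in isolation. This is an illustrative sketch, not the library code: `_Unset`, `UNSET`, `pick_encoder`, `encode` and `json_encoder` are hypothetical names, and plain callables stand in for serializer objects. It shows the three cases the docstring lists: argument omitted, argument explicitly `None`, and a custom value.

```python
import json
from typing import Any, Callable, Optional, Union

Encoder = Callable[[Any], bytes]


class _Unset:
    """Marker type meaning 'the caller did not pass this argument at all'."""


UNSET = _Unset()


def json_encoder(payload: Any) -> bytes:
    return json.dumps(payload).encode()


def pick_encoder(
    local: Union[Encoder, None, _Unset] = UNSET,
    global_encoder: Optional[Encoder] = None,
) -> Optional[Encoder]:
    # Only fall back to the global encoder when nothing was passed.
    # An explicit None is kept, which `local or global_encoder` would lose.
    if local is UNSET:
        return global_encoder
    return local


def encode(value: Any, local: Any = UNSET, global_encoder: Optional[Encoder] = None) -> Any:
    encoder = pick_encoder(local, global_encoder)
    # Encode only when both a value and an encoder are present
    if value is not None and encoder is not None:
        return encoder(value)
    return value
```

With `local or global_encoder`, an explicit `None` is falsy and silently falls back to the global encoder, which is the behavior the identity check against the sentinel avoids.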
4 changes: 2 additions & 2 deletions kstreams/test_utils/test_utils.py
@@ -4,7 +4,7 @@
from kstreams import Consumer, Producer
from kstreams.engine import StreamEngine
from kstreams.prometheus.monitor import PrometheusMonitor
from kstreams.serializers import Serializer
from kstreams.serializers import NO_DEFAULT, Serializer
from kstreams.streams import Stream
from kstreams.types import ConsumerRecord, Headers

@@ -116,7 +116,7 @@ async def send(
partition: int = 0,
timestamp_ms: Optional[int] = None,
headers: Optional[Headers] = None,
serializer: Optional[Serializer] = None,
serializer: Optional[Serializer] = NO_DEFAULT,
serializer_kwargs: Optional[Dict] = None,
) -> RecordMetadata:
return await self.stream_engine.send(
1 change: 1 addition & 0 deletions scripts/cluster/start
@@ -2,6 +2,7 @@

docker-compose up -d
scripts/cluster/topics/create "local--kstreams"
scripts/cluster/topics/create "local--kstreams-json"
scripts/cluster/topics/create "local--hello-world"
scripts/cluster/topics/create "local--sse"
scripts/cluster/topics/create "local--avro-user"