
fix: improve consumer layers usage
floydspace committed Oct 27, 2024
1 parent 0ed6ed8 commit 5f5b329
Showing 8 changed files with 67 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-hats-play.md
@@ -0,0 +1,5 @@
---
"effect-kafka": patch
---

fix consumer layers usage
13 changes: 9 additions & 4 deletions README.md
@@ -68,7 +68,7 @@ services:
Now, we can run our `effect-kafka` application:
```typescript
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Random, Schedule, Stream } from "effect";
import { Console, Effect, Layer, Random, Schedule, Stream } from "effect";
import { ConfluentKafkaJSInstance, Consumer, Producer } from "effect-kafka";
const producer = Stream.repeatEffect(Random.nextInt).pipe(
@@ -83,15 +83,20 @@ const producer = Stream.repeatEffect(Random.nextInt).pipe(
),
);
const consumer = Consumer.serveStream("random", { groupId: "group" }).pipe(
Stream.tap((record) => Console.log(record.value?.toString())),
const consumer = Consumer.serveStream("random").pipe(
Stream.tap((record) => Console.log(record.value?.toString()))
);
const program = Stream.merge(producer, consumer).pipe(Stream.runDrain);
const ProducerLive = Producer.layer({ allowAutoTopicCreation: true });
const ConsumerLive = Consumer.layer({ groupId: "group" });
const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:29092"] });
const MainLive = Effect.scoped(program).pipe(Effect.provide(ProducerLive), Effect.provide(KafkaLive));
const MainLive = program.pipe(
Effect.provide(Layer.merge(ProducerLive, ConsumerLive)),
Effect.provide(KafkaLive)
);
NodeRuntime.runMain(MainLive);
```
5 changes: 3 additions & 2 deletions examples/streamConsumer.ts
@@ -2,7 +2,7 @@ import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Stream } from "effect";
import { Consumer, KafkaJSInstance } from "../src";

const program = Consumer.serveStream("test-topic", { groupId: "group" }).pipe(
const program = Consumer.serveStream("test-topic").pipe(
Stream.runForEach(({ topic, partition, ...message }) =>
Console.log({
topic,
@@ -13,7 +13,8 @@ const program = Consumer.serveStream("test-topic", { groupId: "group" }).pipe(
),
);

const ConsumerLive = Consumer.layer({ groupId: "group" });
const KafkaLive = KafkaJSInstance.layer({ brokers: ["localhost:19092"] });
const MainLive = Effect.scoped(program).pipe(Effect.provide(KafkaLive));
const MainLive = Effect.scoped(program).pipe(Effect.provide(ConsumerLive), Effect.provide(KafkaLive));

NodeRuntime.runMain(MainLive);
8 changes: 5 additions & 3 deletions examples/streamProducer.ts
@@ -1,5 +1,5 @@
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Console, Effect, Schedule, Stream } from "effect";
import { Clock, Console, Effect, Layer, Schedule, Stream } from "effect";
import { ConfluentKafkaJSInstance, Consumer, Producer } from "../src";

const p = Stream.repeatEffect(Clock.currentTimeMillis).pipe(
@@ -14,7 +14,7 @@ const p = Stream.repeatEffect(Clock.currentTimeMillis).pipe(
),
);

const c = Consumer.serveStream("test-topic", { groupId: "group" }).pipe(
const c = Consumer.serveStream("test-topic").pipe(
Stream.tap(({ topic, partition, ...message }) =>
Console.log({
topic,
@@ -29,7 +29,9 @@ const c = Consumer.serveStream("test-topic", { groupId: "group" }).pipe(
const program = Stream.merge(p, c).pipe(Stream.runDrain);

const ProducerLive = Producer.layer({ allowAutoTopicCreation: true });
const ConsumerLive = Consumer.layer({ groupId: "group" });

const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:19092"] });
const MainLive = Effect.scoped(program.pipe(Effect.provide(ProducerLive))).pipe(Effect.provide(KafkaLive));
const MainLive = program.pipe(Effect.provide(Layer.merge(ProducerLive, ConsumerLive)), Effect.provide(KafkaLive));

NodeRuntime.runMain(MainLive);
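
For context, `Layer.merge` above simply combines the producer and consumer layers so both requirements are satisfied in a single `Effect.provide`; providing the layers one after another is equivalent. A minimal, self-contained sketch with stand-in services (the tags and layers below are illustrative, not part of effect-kafka):

```typescript
import { Context, Effect, Layer } from "effect";

// Stand-in services playing the role of effect-kafka's Producer and Consumer.
class ProducerService extends Context.Tag("ProducerService")<ProducerService, { send: () => void }>() {}
class ConsumerService extends Context.Tag("ConsumerService")<ConsumerService, { poll: () => void }>() {}

const ProducerLive = Layer.succeed(ProducerService, { send: () => {} });
const ConsumerLive = Layer.succeed(ConsumerService, { poll: () => {} });

// A program that needs both services.
const program = Effect.all([ProducerService, ConsumerService]).pipe(Effect.asVoid);

// One provide with a merged layer…
const viaMerge = program.pipe(Effect.provide(Layer.merge(ProducerLive, ConsumerLive)));

// …behaves the same as providing the layers separately.
const viaSeparateProvides = program.pipe(Effect.provide(ProducerLive), Effect.provide(ConsumerLive));
```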
13 changes: 8 additions & 5 deletions examples/streams.ts
@@ -14,11 +14,7 @@ class KafkaStreamConsumerTopic extends Context.Tag("@superwall/open-revenue-tran
const makeKafkaStreamConsumer = Effect.gen(function* () {
const { topic, groupId } = yield* KafkaStreamConsumerTopic;

const stream = Consumer.serveStream(topic, {
autoCommit: false,
groupId,
fromBeginning: true,
}).pipe(
const stream = Consumer.serveStream(topic).pipe(
Stream.zipWithPrevious,
Stream.mapEffect(([previous, current]) =>
Effect.gen(function* () {
@@ -38,6 +34,13 @@ const makeKafkaStreamConsumer = Effect.gen(function* () {
return current;
}),
),
Stream.provideSomeLayer(
Consumer.layer({
autoCommit: false,
groupId,
fromBeginning: true,
}),
),
);
return {
stream,
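
The example above now scopes the consumer options to the stream itself via `Stream.provideSomeLayer`, so only the Kafka instance has to be provided at the edge. A reduced sketch of that pattern, following the style of the examples in this commit (topic, group id, and broker address are illustrative):

```typescript
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Stream } from "effect";
import { Consumer, KafkaJSInstance } from "effect-kafka";

// Consumer options travel with this stream rather than with the whole app.
const stream = Consumer.serveStream("some-topic").pipe(
  Stream.tap((record) => Console.log(record.value?.toString())),
  Stream.provideSomeLayer(Consumer.layer({ groupId: "local-group", autoCommit: false, fromBeginning: true })),
);

// The KafkaInstance is still provided once, at the edge.
const KafkaLive = KafkaJSInstance.layer({ brokers: ["localhost:19092"] });
const main = Effect.scoped(Stream.runDrain(stream)).pipe(Effect.provide(KafkaLive));

NodeRuntime.runMain(main);
```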
23 changes: 17 additions & 6 deletions src/Consumer.ts
@@ -64,6 +64,15 @@ export const make: (options: {
) => Stream.Stream<ConsumerRecord.ConsumerRecord, never, Scope.Scope>;
}) => Consumer = internal.make;

/**
* @since 0.3.1
* @category constructors
*/
export const makeConsumer: (
options: Consumer.ConsumerOptions,
) => Effect.Effect<Consumer, Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =
internal.makeConsumer;

/**
* @since 0.1.0
* @category accessors
@@ -134,9 +143,11 @@ export const serveEffect: {
*/
export const serveStream: (
path: MessageRouter.Route.Path,
options: Consumer.ConsumerOptions,
) => Stream.Stream<
ConsumerRecord.ConsumerRecord,
Error.ConnectionException,
KafkaInstance.KafkaInstance | Scope.Scope
> = internal.serveStream;
) => Stream.Stream<ConsumerRecord.ConsumerRecord, Error.ConnectionException, Consumer | Scope.Scope> =
internal.serveStream;

/**
* @since 0.3.1
* @category layers
*/
export const layer = (options: Consumer.ConsumerOptions) => Layer.scoped(Consumer, makeConsumer(options));
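
With the options moved from `serveStream` to the new `layer` constructor, a consumer application wires up roughly like this (a sketch following the updated README; topic, group id, and broker address are illustrative):

```typescript
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Stream } from "effect";
import { ConfluentKafkaJSInstance, Consumer } from "effect-kafka";

// serveStream no longer takes consumer options; it requires a Consumer from context.
const program = Consumer.serveStream("random").pipe(
  Stream.tap((record) => Console.log(record.value?.toString())),
  Stream.runDrain,
);

// The options now live in the Consumer layer, alongside the Kafka instance layer.
const ConsumerLive = Consumer.layer({ groupId: "group" });
const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:29092"] });

const MainLive = program.pipe(Effect.provide(ConsumerLive), Effect.provide(KafkaLive));

NodeRuntime.runMain(MainLive);
```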
18 changes: 9 additions & 9 deletions src/Producer.ts
@@ -98,15 +98,6 @@ export const make: (options: {
readonly sendBatch: (batch: Producer.ProducerBatch) => Effect.Effect<Producer.RecordMetadata[]>;
}) => Producer = internal.make;

/**
* @since 0.2.0
* @category accessors
*/
export const send: (
record: Producer.ProducerRecord,
) => Effect.Effect<Producer.RecordMetadata[], Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =
internal.send;

/**
* @since 0.2.0
* @category constructors
@@ -116,6 +107,15 @@ export const makeProducer: (
) => Effect.Effect<Producer, Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =
internal.makeProducer;

/**
* @since 0.2.0
* @category accessors
*/
export const send: (
record: Producer.ProducerRecord,
) => Effect.Effect<Producer.RecordMetadata[], Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =
internal.send;

/**
* @since 0.2.0
* @category layers
17 changes: 11 additions & 6 deletions src/internal/consumer.ts
@@ -24,6 +24,15 @@ export const make = (options: {
) => Stream.Stream<ConsumerRecord.ConsumerRecord, never, Scope.Scope>;
}): Consumer.Consumer => Object.assign(Object.create(consumerProto), options);

/** @internal */
export const makeConsumer = (
options: Consumer.Consumer.ConsumerOptions,
): Effect.Effect<Consumer.Consumer, Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =>
Effect.gen(function* () {
const instance = yield* KafkaInstance.KafkaInstance;
return yield* instance.consumer(options);
});

/** @internal */
export const serve = dual<
{
@@ -109,12 +118,8 @@ export const serveEffect = dual<
/** @internal */
export const serveStream = (
path: MessageRouter.Route.Path,
options: Consumer.Consumer.ConsumerOptions,
): Stream.Stream<ConsumerRecord.ConsumerRecord, Error.ConnectionException, KafkaInstance.KafkaInstance | Scope.Scope> =>
Effect.gen(function* () {
const instance = yield* KafkaInstance.KafkaInstance;
return yield* instance.consumer(options);
}).pipe(
): Stream.Stream<ConsumerRecord.ConsumerRecord, Error.ConnectionException, Consumer.Consumer | Scope.Scope> =>
consumerTag.pipe(
Effect.map((consumer) => consumer.runStream(path)),
Stream.flatten(),
);
