Skip to content

Commit

Permalink
docs: add some documentations
Browse files Browse the repository at this point in the history
  • Loading branch information
floydspace committed Oct 27, 2024
1 parent 7ab310e commit 0ed6ed8
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
90 changes: 90 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,96 @@ Effect Docs: https://www.effect.website<br>
Effect Reference: https://effect-ts.github.io/effect<br>
Effect Kafka Reference: https://floydspace.github.io/effect-kafka

# Installation

Choose your preferred package manager and run one of the following commands in your terminal:

- **Using npm:**

```sh
npm install effect-kafka
```

- **Using pnpm:**

```sh
pnpm add effect-kafka
```

- **Using yarn:**
```sh
yarn add effect-kafka
```

Next install one of kafka engine packages:
- [KafkaJS](https://github.com/tulios/kafkajs?tab=readme-ov-file#-getting-started) - Fully JavaScript implementation.
- [@confluentinc/kafka-javascript](https://github.com/confluentinc/confluent-kafka-javascript?tab=readme-ov-file#requirements) - JavaScript interface for C++ librdkafka implementation, which is more performant, but requires native bindings.

_**Note:** You can use any of the above Kafka engine packages, depending on your preference._

# Usage

Let's write a simple Kafka producer and consumer using `effect-kafka`. Before everything, we need a running instance of Kafka. We can do that by saving the following docker-compose script in the `docker-compose.yml` file and run `docker-compose up`:

```yaml
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Now, we can run our `effect-kafka` application:
```typescript
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Random, Schedule, Stream } from "effect";
import { ConfluentKafkaJSInstance, Consumer, Producer } from "effect-kafka";
const producer = Stream.repeatEffect(Random.nextInt).pipe(
Stream.schedule(Schedule.fixed("2 seconds")),
Stream.flatMap((random) =>
Effect.flatMap(Producer.Producer, (p) =>
p.send({
topic: "random",
messages: [{ key: String(random % 4), value: random.toString() }],
}),
),
),
);
const consumer = Consumer.serveStream("random", { groupId: "group" }).pipe(
Stream.tap((record) => Console.log(record.value?.toString())),
);
const program = Stream.merge(producer, consumer).pipe(Stream.runDrain);
const ProducerLive = Producer.layer({ allowAutoTopicCreation: true });
const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:29092"] });
const MainLive = Effect.scoped(program).pipe(Effect.provide(ProducerLive), Effect.provide(KafkaLive));
NodeRuntime.runMain(MainLive);
```

See more examples in the [examples](./examples) directory.

# Roadmap

- [x] Consumer
Expand Down
35 changes: 35 additions & 0 deletions examples/streamProducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Console, Effect, Schedule, Stream } from "effect";
import { ConfluentKafkaJSInstance, Consumer, Producer } from "../src";

const p = Stream.repeatEffect(Clock.currentTimeMillis).pipe(
Stream.schedule(Schedule.spaced("1 second")),
Stream.flatMap((time) =>
Effect.flatMap(Producer.Producer, (producer) =>
producer.send({
topic: "test-topic",
messages: [{ value: "Hello, effect-kafka user!", timestamp: String(time) }],
}),
),
),
);

const c = Consumer.serveStream("test-topic", { groupId: "group" }).pipe(
Stream.tap(({ topic, partition, ...message }) =>
Console.log({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
time: message.timestamp,
}),
),
);

const program = Stream.merge(p, c).pipe(Stream.runDrain);

const ProducerLive = Producer.layer({ allowAutoTopicCreation: true });
const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:19092"] });
const MainLive = Effect.scoped(program.pipe(Effect.provide(ProducerLive))).pipe(Effect.provide(KafkaLive));

NodeRuntime.runMain(MainLive);

0 comments on commit 0ed6ed8

Please sign in to comment.