-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.go
45 lines (36 loc) · 998 Bytes
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package eventqueue
import (
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// KafkaConfig the config to kafka
type KafkaConfig struct {
Broker string
}
type kafkaProducer struct {
producer *kafka.Producer
deliveryChan chan kafka.Event
}
func (k *kafkaProducer) Produce(topic string, message string) error {
k.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(message),
}, k.deliveryChan)
e := <-k.deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
}
return nil
}
func (k *kafkaProducer) Close() {
close(k.deliveryChan)
}
// NewProducer constructor function to kafka producer
func NewProducer(config *KafkaConfig) (Producer, error) {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": config.Broker})
if err != nil {
return nil, err
}
k := kafkaProducer{p, make(chan kafka.Event)}
return &k, nil
}