-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathkafkaproducer.go
104 lines (84 loc) · 2.13 KB
/
kafkaproducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import "gopkg.in/Shopify/sarama.v1"
import "time"
import "fmt"
import "hash"
import "hash/fnv"
// KafkaProducer publishes messages to a Kafka cluster through a sarama
// asynchronous producer. Delivery errors are logged and, when Statsd is
// set, counted as a metric.
type KafkaProducer struct {
	Brokers   []string             // broker addresses used to bootstrap the client
	CommonKey sarama.Encoder       // partition key attached to every produced message (set in Init)
	producer  sarama.AsyncProducer // underlying async producer; created in Init
	Statsd    StatisticsSender     // optional metrics sink for produce errors; may be nil
}
// inconsistentHashPartitioner routes keyed messages by an FNV-1a hash of
// the key, and unkeyed messages randomly. It reports
// RequiresConsistency() == false, hence "inconsistent": sarama is free to
// remap keys when partition counts change.
type inconsistentHashPartitioner struct {
	random sarama.Partitioner // fallback partitioner for messages without a key
	hasher hash.Hash32        // FNV-1a; Reset/Write/Sum32 state is reused across calls, so not goroutine-safe
}
// NewInconsistentHashPartitioner builds a partitioner for topic that hashes
// keyed messages with FNV-1a and places unkeyed messages randomly. Its
// signature matches sarama.PartitionerConstructor so it can be assigned to
// Config.Producer.Partitioner.
func NewInconsistentHashPartitioner(topic string) sarama.Partitioner {
	p := new(inconsistentHashPartitioner)
	// Forward the topic instead of "" (the original ignored the parameter);
	// the random partitioner discards it today, but forwarding keeps the
	// constructor contract intact.
	p.random = sarama.NewRandomPartitioner(topic)
	p.hasher = fnv.New32a()
	return p
}
// Partition chooses a partition for message. Messages without a key are
// spread randomly; keyed messages are placed by the FNV-1a hash of the
// encoded key modulo numPartitions. Returns -1 and an error if the key
// cannot be encoded or hashed.
func (p *inconsistentHashPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	keyBytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}
	p.hasher.Reset()
	if _, err := p.hasher.Write(keyBytes); err != nil {
		return -1, err
	}
	// Take the modulo in uint32 space. The previous code converted Sum32()
	// to int32 and negated negative values, but int32(0x80000000) is
	// math.MinInt32, whose negation overflows back to itself — yielding a
	// negative (invalid) partition for that hash value.
	return int32(p.hasher.Sum32() % uint32(numPartitions)), nil
}
// RequiresConsistency reports false, telling sarama that this partitioner
// does not promise a stable key-to-partition mapping (hence the
// "inconsistent" in the type name).
func (p *inconsistentHashPartitioner) RequiresConsistency() bool {
	return false
}
// Init connects an asynchronous sarama producer to the given brokers and
// starts a goroutine that logs delivery errors (and counts them via Statsd
// when it is set). partitionKey becomes the key attached to every message
// sent through Produce. It returns any error from creating the underlying
// producer.
//
// Note: the parameter was renamed from partition_key to partitionKey to
// follow Go MixedCaps convention; callers are unaffected.
func (s *KafkaProducer) Init(brokers []string, partitionKey string) error {
	s.Brokers = brokers

	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = false // nothing reads the Successes channel
	conf.Producer.Return.Errors = true     // consumed by the goroutine below
	conf.Producer.Partitioner = NewInconsistentHashPartitioner
	conf.Producer.Flush.Messages = 1 // flush eagerly rather than batching
	conf.Producer.Flush.Frequency = 20 * time.Millisecond
	//conf.Producer.RequiredAcks = -1
	conf.Metadata.Retry.Max = 1

	s.CommonKey = sarama.ByteEncoder(partitionKey)

	kp, err := sarama.NewAsyncProducer(brokers, conf)
	if err != nil {
		return err
	}
	s.producer = kp

	// Drain the error channel for the lifetime of the producer; the range
	// ends (and this goroutine exits) when AsyncClose shuts the channel.
	go func() {
		for v := range kp.Errors() {
			fmt.Printf("errors: %+v\n", v.Msg)
			fmt.Printf("v: %+v\n", v)
			if s.Statsd != nil {
				s.Statsd.Inc("logs2kafka.produceErrors", 1, 1)
			}
		}
	}()
	return nil
}
// Produce enqueues m for asynchronous delivery to m.Topic, keyed with the
// producer-wide CommonKey. It does not block waiting for acknowledgement;
// delivery failures surface on the error channel drained in Init.
func (s *KafkaProducer) Produce(m Message) {
	payload := sarama.ByteEncoder(m.Container.Bytes())
	s.producer.Input() <- &sarama.ProducerMessage{
		Topic: m.Topic,
		Key:   s.CommonKey,
		Value: payload,
	}
}
// Close triggers an asynchronous shutdown of the underlying producer.
// AsyncClose eventually closes the Errors channel, which also terminates
// the error-draining goroutine started in Init.
func (s *KafkaProducer) Close() {
	//s.close <- true
	s.producer.AsyncClose()
}