Skip to content

Commit

Permalink
add support log to kafka
Browse files Browse the repository at this point in the history
add support log to kafka
  • Loading branch information
qclaogui committed Aug 11, 2019
1 parent 05fdae1 commit d604b73
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 2 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ module github.com/qclaogui/lg
go 1.12

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/Shopify/sarama v1.23.1
github.com/pkg/errors v0.8.1 // indirect
github.com/stretchr/testify v1.3.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
Expand Down
35 changes: 35 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
160 changes: 160 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package lg

import (
"io"
"os"
"strings"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/Shopify/sarama"
)

// kafkaConfig contains information
type kafkaConfig struct {
Hosts string
Topic string
}

// write2Kafka is used for sending logs to kafka.
type write2Kafka struct {
config *kafkaConfig
syncProducer sarama.SyncProducer
}

func (w *write2Kafka) Write(b []byte) (n int, err error) {
if _, _, err = w.syncProducer.SendMessage(&sarama.ProducerMessage{
Topic: w.config.Topic,
Value: sarama.ByteEncoder(b),
}); err != nil {
return
}
n = len(b)
return
}

func newLog2Kafka(cfg *kafkaConfig) (*write2Kafka, error) {
config := sarama.NewConfig()
// SyncProducer
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
// Producer.Return.Successes must be true to be used in a SyncProducer
config.Producer.Return.Successes = true

var brokerList []string
for _, broker := range strings.Split(cfg.Hosts, ",") {
if strings.Index(broker, ":") == -1 {
broker += ":9092"
}
brokerList = append(brokerList, broker)
}

var producer sarama.SyncProducer
var err error
if producer, err = sarama.NewSyncProducer(brokerList, config); err != nil {
return nil, err
}

return &write2Kafka{config: cfg, syncProducer: producer}, nil
}

// Init initializes log by input parameters
// lvl - global log level: Debug(-1), Info(0), Warn(1), Error(2), DPanic(3), Panic(4), Fatal(5)
// timeFormat - custom time format for logger of empty string to use default
func InitOnlyKafka(lvl int, project, kafkaTopic, brokers string) (err error) {
onceInit.Do(func() {
globalLevel := zapcore.Level(lvl)

KafkaPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= globalLevel
})
var ws io.Writer
if ws, err = newLog2Kafka(&kafkaConfig{Hosts: brokers, Topic: kafkaTopic}); err != nil {
return
}

// Configure console output.
cfg := zap.NewProductionEncoderConfig()
if len(TimeFormat) > 0 {
cfg.TimeKey = "tsp"
cfg.EncodeTime = customTimeEncoder
}

// Optimize the Kafka output for machine consumption and the console output
// for human operators.
core := zapcore.NewTee(zapcore.NewCore(zapcore.NewJSONEncoder(cfg), zapcore.Lock(zapcore.AddSync(ws)), KafkaPriority))

// ErrorLevel 堆栈跟踪
stackTrace := zap.AddStacktrace(zap.ErrorLevel)
// 设置初始化字段
fields := zap.Fields(zap.Object("info", &commonInfo{project, getHostName()}))

// From a zapcore.Core, it's easy to construct a Logger.
APPLog = zap.New(core, fields, stackTrace)
})
return err
}

// Init initializes log by input parameters
// lvl - global log level: Debug(-1), Info(0), Warn(1), Error(2), DPanic(3), Panic(4), Fatal(5)
// timeFormat - custom time format for logger of empty string to use default
func InitWithKafka(lvl int, project, kafkaTopic, brokers string) (err error) {
onceInit.Do(func() {
// First, define our level-handling logic.
globalLevel := zapcore.Level(lvl)
// High-priority output should also go to standard error, and low-priority
// output should also go to standard out.
// It is usefull for Kubernetes deployment.
// Kubernetes interprets os.Stdout log items as INFO and os.Stderr log items
// as ERROR by default.
highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= zapcore.ErrorLevel
})
lowPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= globalLevel && lvl < zapcore.ErrorLevel
})

KafkaPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= globalLevel
})
// Assume that we have clients for two Kafka topics. The clients implement
// zapcore.WriteSyncer and are safe for concurrent use. (If they only
// implement io.Writer, we can use zapcore.AddSync to add a no-op Sync
// method. If they're not safe for concurrent use, we can add a protecting
// mutex with zapcore.Lock.)
var ws io.Writer
if ws, err = newLog2Kafka(&kafkaConfig{Hosts: brokers, Topic: kafkaTopic}); err != nil {
return
}

// Configure console output.
cfg := zap.NewProductionEncoderConfig()
if len(TimeFormat) > 0 {
cfg.TimeKey = "tsp"
cfg.EncodeTime = customTimeEncoder
}

// Optimize the Kafka output for machine consumption and the console output
// for human operators.
kafkaEncoder := zapcore.NewJSONEncoder(cfg)
consoleEncoder := zapcore.NewJSONEncoder(cfg)
// Join the outputs, encoders, and level-handling functions into
// zapcore.Cores, then tee the four cores together.
core := zapcore.NewTee(
zapcore.NewCore(kafkaEncoder, zapcore.Lock(zapcore.AddSync(ws)), KafkaPriority),
// 同时写一份到标准输出
zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), lowPriority),
zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), highPriority),
)

// ErrorLevel 堆栈跟踪
stackTrace := zap.AddStacktrace(zap.ErrorLevel)
// 设置初始化字段
fields := zap.Fields(zap.Object("info", &commonInfo{project, getHostName()}))

// From a zapcore.Core, it's easy to construct a Logger.
APPLog = zap.New(core, fields, stackTrace)
})
return err
}

0 comments on commit d604b73

Please sign in to comment.