From d604b73375e2291bbcc211035ca6f3f14bad553d Mon Sep 17 00:00:00 2001 From: qclaogui Date: Sun, 11 Aug 2019 19:41:46 +0800 Subject: [PATCH] add support log to kafka add support log to kafka --- go.mod | 3 +- go.sum | 35 ++++++++++++ kafka.go | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 kafka.go diff --git a/go.mod b/go.mod index 06add96..07d5a14 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,8 @@ module github.com/qclaogui/lg go 1.12 require ( - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/Shopify/sarama v1.23.1 github.com/pkg/errors v0.8.1 // indirect - github.com/stretchr/testify v1.3.0 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 diff --git a/go.sum b/go.sum index 9804ce4..ddc647d 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,52 @@ +github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg= +github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs= +github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU= +github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..0a2f209 --- /dev/null +++ b/kafka.go @@ -0,0 +1,160 @@ +package lg + +import ( + "io" + "os" + "strings" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/Shopify/sarama" +) + +// kafkaConfig contains information +type kafkaConfig struct { + Hosts string + Topic string +} + +// write2Kafka is used for sending logs to kafka. +type write2Kafka struct { + config *kafkaConfig + syncProducer sarama.SyncProducer +} + +func (w *write2Kafka) Write(b []byte) (n int, err error) { + if _, _, err = w.syncProducer.SendMessage(&sarama.ProducerMessage{ + Topic: w.config.Topic, + Value: sarama.ByteEncoder(b), + }); err != nil { + return + } + n = len(b) + return +} + +func newLog2Kafka(cfg *kafkaConfig) (*write2Kafka, error) { + config := sarama.NewConfig() + // SyncProducer + config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack + config.Producer.Compression = sarama.CompressionSnappy // Compress messages + // Producer.Return.Successes must be true to be used in a SyncProducer + config.Producer.Return.Successes = true + + var brokerList []string + for _, broker := range strings.Split(cfg.Hosts, ",") { + if strings.Index(broker, ":") == -1 { + broker += ":9092" + } + brokerList = append(brokerList, broker) + } + + var producer sarama.SyncProducer + var err error + if producer, err = sarama.NewSyncProducer(brokerList, config); err != nil { + return nil, err + } + + return &write2Kafka{config: cfg, syncProducer: producer}, nil +} + +// Init initializes log by input parameters +// lvl - global log level: Debug(-1), Info(0), Warn(1), Error(2), DPanic(3), Panic(4), Fatal(5) +// timeFormat - custom time format for logger of empty string to use default +func InitOnlyKafka(lvl int, project, kafkaTopic, brokers string) (err error) { + onceInit.Do(func() { + globalLevel := zapcore.Level(lvl) + + KafkaPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { + return lvl >= globalLevel + }) + var ws io.Writer + if ws, err = newLog2Kafka(&kafkaConfig{Hosts: brokers, Topic: kafkaTopic}); err != nil { + return + } + + // Configure console output. + cfg := zap.NewProductionEncoderConfig() + if len(TimeFormat) > 0 { + cfg.TimeKey = "tsp" + cfg.EncodeTime = customTimeEncoder + } + + // Optimize the Kafka output for machine consumption and the console output + // for human operators. + core := zapcore.NewTee(zapcore.NewCore(zapcore.NewJSONEncoder(cfg), zapcore.Lock(zapcore.AddSync(ws)), KafkaPriority)) + + // ErrorLevel 堆栈跟踪 + stackTrace := zap.AddStacktrace(zap.ErrorLevel) + // 设置初始化字段 + fields := zap.Fields(zap.Object("info", &commonInfo{project, getHostName()})) + + // From a zapcore.Core, it's easy to construct a Logger. + APPLog = zap.New(core, fields, stackTrace) + }) + return err +} + +// Init initializes log by input parameters +// lvl - global log level: Debug(-1), Info(0), Warn(1), Error(2), DPanic(3), Panic(4), Fatal(5) +// timeFormat - custom time format for logger of empty string to use default +func InitWithKafka(lvl int, project, kafkaTopic, brokers string) (err error) { + onceInit.Do(func() { + // First, define our level-handling logic. + globalLevel := zapcore.Level(lvl) + // High-priority output should also go to standard error, and low-priority + // output should also go to standard out. + // It is usefull for Kubernetes deployment. + // Kubernetes interprets os.Stdout log items as INFO and os.Stderr log items + // as ERROR by default. + highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { + return lvl >= zapcore.ErrorLevel + }) + lowPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { + return lvl >= globalLevel && lvl < zapcore.ErrorLevel + }) + + KafkaPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { + return lvl >= globalLevel + }) + // Assume that we have clients for two Kafka topics. The clients implement + // zapcore.WriteSyncer and are safe for concurrent use. (If they only + // implement io.Writer, we can use zapcore.AddSync to add a no-op Sync + // method. If they're not safe for concurrent use, we can add a protecting + // mutex with zapcore.Lock.) + var ws io.Writer + if ws, err = newLog2Kafka(&kafkaConfig{Hosts: brokers, Topic: kafkaTopic}); err != nil { + return + } + + // Configure console output. + cfg := zap.NewProductionEncoderConfig() + if len(TimeFormat) > 0 { + cfg.TimeKey = "tsp" + cfg.EncodeTime = customTimeEncoder + } + + // Optimize the Kafka output for machine consumption and the console output + // for human operators. + kafkaEncoder := zapcore.NewJSONEncoder(cfg) + consoleEncoder := zapcore.NewJSONEncoder(cfg) + // Join the outputs, encoders, and level-handling functions into + // zapcore.Cores, then tee the four cores together. + core := zapcore.NewTee( + zapcore.NewCore(kafkaEncoder, zapcore.Lock(zapcore.AddSync(ws)), KafkaPriority), + // 同时写一份到标准输出 + zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), lowPriority), + zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), highPriority), + ) + + // ErrorLevel 堆栈跟踪 + stackTrace := zap.AddStacktrace(zap.ErrorLevel) + // 设置初始化字段 + fields := zap.Fields(zap.Object("info", &commonInfo{project, getHostName()})) + + // From a zapcore.Core, it's easy to construct a Logger. + APPLog = zap.New(core, fields, stackTrace) + }) + return err +}