diff --git a/analytic/worker/worker.go b/analytic/worker/worker.go index 41cb020..3ea54a0 100644 --- a/analytic/worker/worker.go +++ b/analytic/worker/worker.go @@ -41,7 +41,7 @@ func (w *analyticWorker) OnSuccess(f func(*sarama.ConsumerMessage)) { } func (w *analyticWorker) successReadMessage(message *sarama.ConsumerMessage) { - log.Infof("\nTopic: %s, Partition: %d, Offset: %d, Key: %s, MessageVal: %s,\n", + log.Debugf("\nTopic: %s, Partition: %d, Offset: %d, Key: %s, MessageVal: %s,\n", message.Topic, message.Partition, message.Offset, message.Key, message.Value) if w.onSuccessFunc != nil { w.onSuccessFunc(message) @@ -88,13 +88,12 @@ func (w *analyticWorker) consumeMessage() { func (w *analyticWorker) onNewMessage(message *sarama.ConsumerMessage) { messageVal := make(map[string]interface{}) _ = json.Unmarshal(message.Value, &messageVal) - roundedTime := roundTime(messageVal["@timestamp"]) data := buffer.IncomingLog{ Level: fmt.Sprint(messageVal["lvl"]), Method: fmt.Sprint(messageVal["method"]), Path: fmt.Sprint(messageVal["path"]), Code: fmt.Sprint(messageVal["code"]), - Timestamp: fmt.Sprint(roundedTime), + Timestamp: roundTime(messageVal["@timestamp"]).UTC().Format(time.RFC3339), } buffer.GetBuffer().Add(w.subscribedTopic, data) } diff --git a/buffer/log_buffer.go b/buffer/log_buffer.go index f6722a4..f5c1857 100644 --- a/buffer/log_buffer.go +++ b/buffer/log_buffer.go @@ -76,11 +76,11 @@ func (s *buffer) Add(topic string, log IncomingLog) { } func (s *buffer) Flush() { - log.Debug("Flushing data to database") + log.Info("Flushing data to database") toBeFlushed := s.buff s.buff = make(map[string]map[IncomingLog]int) for k, v := range toBeFlushed { - log.Println("Flushing data", k, v) + log.Debug("Flushing data", k, v) col := s.db.GetCollection(k) for kk, vv := range v { sl := createStoreLog(kk, vv)