Skip to content

Commit

Permalink
Fix timestamp format
Browse files Browse the repository at this point in the history
  • Loading branch information
kanisiuskenneth committed Aug 24, 2018
1 parent 289b69e commit 192c0fc
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
5 changes: 2 additions & 3 deletions analytic/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (w *analyticWorker) OnSuccess(f func(*sarama.ConsumerMessage)) {
}

func (w *analyticWorker) successReadMessage(message *sarama.ConsumerMessage) {
log.Infof("\nTopic: %s, Partition: %d, Offset: %d, Key: %s, MessageVal: %s,\n",
log.Debugf("\nTopic: %s, Partition: %d, Offset: %d, Key: %s, MessageVal: %s,\n",
message.Topic, message.Partition, message.Offset, message.Key, message.Value)
if w.onSuccessFunc != nil {
w.onSuccessFunc(message)
Expand Down Expand Up @@ -88,13 +88,12 @@ func (w *analyticWorker) consumeMessage() {
func (w *analyticWorker) onNewMessage(message *sarama.ConsumerMessage) {
messageVal := make(map[string]interface{})
_ = json.Unmarshal(message.Value, &messageVal)
roundedTime := roundTime(messageVal["@timestamp"])
data := buffer.IncomingLog{
Level: fmt.Sprint(messageVal["lvl"]),
Method: fmt.Sprint(messageVal["method"]),
Path: fmt.Sprint(messageVal["path"]),
Code: fmt.Sprint(messageVal["code"]),
Timestamp: fmt.Sprint(roundedTime),
Timestamp: roundTime(messageVal["@timestamp"]).UTC().Format(time.RFC3339),
}
buffer.GetBuffer().Add(w.subscribedTopic, data)
}
Expand Down
4 changes: 2 additions & 2 deletions buffer/log_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (s *buffer) Add(topic string, log IncomingLog) {
}

func (s *buffer) Flush() {
log.Debug("Flushing data to database")
log.Info("Flushing data to database")
toBeFlushed := s.buff
s.buff = make(map[string]map[IncomingLog]int)
for k, v := range toBeFlushed {
log.Println("Flushing data", k, v)
log.Debug("Flushing data", k, v)
col := s.db.GetCollection(k)
for kk, vv := range v {
sl := createStoreLog(kk, vv)
Expand Down

0 comments on commit 192c0fc

Please sign in to comment.