diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index d4d9cbc..f715628 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -280,7 +280,6 @@ func (edm *dnstapMinimiser) reverseLabelsBounded(labels []string, maxLen int) [] func (edm *dnstapMinimiser) diskCleaner(wg *sync.WaitGroup, sentDir string) { // We will scan the directory each tick for sent files to remove. - wg.Add(1) defer wg.Done() ticker := time.NewTicker(time.Second * 60) @@ -682,15 +681,20 @@ func Run() { labelLimit := 10 // Start record writers and data senders in the background + wg.Add(1) go edm.sessionWriter(dataDir, &wg) + wg.Add(1) go edm.histogramWriter(labelLimit, outboxDir, &wg) if !edm.histogramSenderDisabled { + wg.Add(1) go edm.histogramSender(outboxDir, sentDir, &wg) } if !edm.mqttDisabled { + wg.Add(1) go edm.newQnamePublisher(&wg) } + wg.Add(1) go edm.diskCleaner(&wg, sentDir) dawgFile := viper.GetString("well-known-domains") @@ -733,6 +737,7 @@ func Run() { } // Start data collector + wg.Add(1) go edm.dataCollector(&wg, wkdTracker, dawgFile) var minimiserWg sync.WaitGroup @@ -978,7 +983,6 @@ func (wkd *wellKnownDomainsTracker) lookup(ipBytes []byte, msg *dns.Msg) (int, b } func (wkd *wellKnownDomainsTracker) updateRetryer(edm *dnstapMinimiser, wg *sync.WaitGroup) { - wg.Add(1) defer wg.Done() for wu := range wkd.retryCh { @@ -1345,7 +1349,6 @@ func (edm *dnstapMinimiser) newSession(dt *dnstap.Dnstap, msg *dns.Msg, isQuery } func (edm *dnstapMinimiser) sessionWriter(dataDir string, wg *sync.WaitGroup) { - wg.Add(1) defer wg.Done() edm.log.Info("sessionStructWriter: starting") @@ -1361,7 +1364,6 @@ func (edm *dnstapMinimiser) sessionWriter(dataDir string, wg *sync.WaitGroup) { } func (edm *dnstapMinimiser) histogramWriter(labelLimit int, outboxDir string, wg *sync.WaitGroup) { - wg.Add(1) defer wg.Done() edm.log.Info("histogramWriter: starting") @@ -1436,7 +1438,6 @@ func (edm *dnstapMinimiser) createFile(dst string) (*os.File, error) { } func (edm *dnstapMinimiser) histogramSender(outboxDir string, sentDir string, wg *sync.WaitGroup) { - wg.Add(1) defer wg.Done() edm.log.Info("histogramSender: starting") @@ -1509,7 +1510,6 @@ func timestampsFromFilename(name string) (time.Time, time.Time, error) { } func (edm *dnstapMinimiser) newQnamePublisher(wg *sync.WaitGroup) { - wg.Add(1) defer wg.Done() edm.log.Info("newQnamePublisher: starting") @@ -1918,7 +1918,6 @@ func timeUntilNextMinute() time.Duration { // runMinimiser generates data and it is collected into datasets here func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDomainsTracker, dawgFile string) { - wg.Add(1) defer wg.Done() // Keep track of if we have recorded any dnstap packets in session data @@ -1927,6 +1926,7 @@ func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDoma // Start retryer, handles instances where the received update has a // dawgModTime that is no longer valid becuase it has been rotated. var retryerWg sync.WaitGroup + retryerWg.Add(1) go wkd.updateRetryer(edm, &retryerWg) sessions := []*sessionData{}