diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 1af3f92..a1ec434 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -321,7 +321,7 @@ timerLoop: } } } - case <-edm.stop: + case <-edm.ctx.Done(): break timerLoop } } @@ -546,6 +546,7 @@ func Run() { logger.Error("unable to init edm", "error", err) os.Exit(1) } + defer edm.stop() viperNotifyCh := make(chan fsnotify.Event) @@ -644,16 +645,6 @@ func Run() { os.Exit(1) } - // Exit gracefully on SIGINT or SIGTERM - go func() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) - <-sigs - - // We received a signal, tell runMinimiser() to stop - close(edm.stop) - }() - metricsServer := &http.Server{ Addr: "127.0.0.1:2112", ReadTimeout: 10 * time.Second, @@ -797,9 +788,10 @@ type dnstapMinimiser struct { newQnameDiscarded prometheus.Counter seenQnameLRUEvicted prometheus.Counter newQnameChannelLen prometheus.Gauge - stop chan struct{} // close this channel to gracefully stop runMinimiser() - debug bool // if we should print debug messages during operation - mutex sync.RWMutex // Mutex for protecting updates to fields at runtime + ctx context.Context + stop context.CancelFunc // call this to gracefully stop runMinimiser() + debug bool // if we should print debug messages during operation + mutex sync.RWMutex // Mutex for protecting updates to fields at runtime sessionWriterCh chan *prevSessions histogramWriterCh chan *wellKnownDomainsData newQnamePublisherCh chan *protocols.EventsMqttMessageNewQnameJson @@ -833,6 +825,9 @@ func newDnstapMinimiser(logger *slog.Logger, cryptopanKey string, cryptopanSalt return nil, fmt.Errorf("newDnstapMinimiser: %w", err) } + // Exit gracefully on SIGINT or SIGTERM + edm.ctx, edm.stop = signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + // Use separate prometheus registry for each edm instance, otherwise // trying to run tests where each test do their own call to // newDnstapMinimiser() will panic: @@ -882,7 +877,6 @@ func newDnstapMinimiser(logger *slog.Logger, cryptopanKey string, cryptopanSalt }) edm.promReg = promReg - edm.stop = make(chan struct{}) // Size 32 matches unexported "const outputChannelSize = 32" in // https://github.com/dnstap/golang-dnstap/blob/master/dnstap.go edm.inputChannel = make(chan []byte, 32) @@ -1152,7 +1146,7 @@ func (edm *dnstapMinimiser) qnameSeen(msg *dns.Msg, seenQnameLRU *lru.Cache[stri // runMinimiser reads frames from the inputChannel, doing any modifications and // then passes them on to a dnstap.Output. To gracefully stop -// runMinimiser() you need to close the edm.stop channel. +// runMinimiser() you can call edm.stop(). func (edm *dnstapMinimiser) runMinimiser(minimiserID int, wg *sync.WaitGroup, dawgFile string, seenQnameLRU *lru.Cache[string, struct{}], pdb *pebble.DB, newQnameBuffer int, disableSessionFiles bool, debugDnstapFile *os.File, disableHistogramSender bool, labelLimit int, wkdTracker *wellKnownDomainsTracker) { defer wg.Done() @@ -1246,7 +1240,7 @@ minimiserLoop: session := edm.newSession(dt, msg, isQuery, labelLimit, timestamp) edm.sessionCollectorCh <- session } - case <-edm.stop: + case <-edm.ctx.Done(): break minimiserLoop } } @@ -1486,7 +1480,7 @@ timerLoop: } } } - case <-edm.stop: + case <-edm.ctx.Done(): break timerLoop } }