diff --git a/cmd/data-aggregation-api/main.go b/cmd/data-aggregation-api/main.go index 3075382..f46aaee 100644 --- a/cmd/data-aggregation-api/main.go +++ b/cmd/data-aggregation-api/main.go @@ -40,6 +40,20 @@ var ( builtBy = "unknown" ) +func dispatchSingleRequest(incoming <-chan struct{}) chan struct{} { + outgoing := make(chan struct{}) + + go func() { + defer close(outgoing) + for range incoming { + log.Info().Msg("Received new build request.") + outgoing <- struct{}{} + } + }() + + return outgoing +} + func run() error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -67,10 +81,11 @@ func run() error { deviceRepo := device.NewSafeRepository() reports := report.NewRepository() - // TODO: be able to close goroutine when the context is closed (graceful shutdown) - go job.StartBuildLoop(&deviceRepo, &reports) + newBuildRequest := make(chan struct{}) + triggerNewBuild := dispatchSingleRequest(newBuildRequest) - if err := router.NewManager(&deviceRepo, &reports).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil { + go job.StartBuildLoop(&deviceRepo, &reports, triggerNewBuild) + if err := router.NewManager(&deviceRepo, &reports, newBuildRequest).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil { return fmt.Errorf("webserver error: %w", err) } diff --git a/internal/api/router/endpoints.go b/internal/api/router/endpoints.go index 352a18f..91b35cc 100644 --- a/internal/api/router/endpoints.go +++ b/internal/api/router/endpoints.go @@ -130,3 +130,16 @@ func (m *Manager) getLastSuccessfulReport(w http.ResponseWriter, _ *http.Request w.Header().Set(contentType, applicationJSON) _, _ = w.Write(out) } + +// triggerBuild enables the user to trigger a new build. +// +// It only accepts one build request at a time. +func (m *Manager) triggerBuild(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) { + w.Header().Set(contentType, applicationJSON) + select { + case m.newBuildRequest <- struct{}{}: + _, _ = w.Write([]byte("{\"message\": \"new build request received\"")) + default: + _, _ = w.Write([]byte("{\"message\": \"a build request is already pending\"")) + } +} diff --git a/internal/api/router/manager.go b/internal/api/router/manager.go index 1d60791..b921d64 100644 --- a/internal/api/router/manager.go +++ b/internal/api/router/manager.go @@ -27,18 +27,20 @@ type DevicesRepository interface { } type Manager struct { - devices DevicesRepository - reports *report.Repository + devices DevicesRepository + reports *report.Repository + newBuildRequest chan<- struct{} } // NewManager creates and initializes a new API manager. -func NewManager(deviceRepo DevicesRepository, reports *report.Repository) *Manager { - return &Manager{devices: deviceRepo, reports: reports} +func NewManager(deviceRepo DevicesRepository, reports *report.Repository, restartRequest chan<- struct{}) *Manager { + return &Manager{devices: deviceRepo, reports: reports, newBuildRequest: restartRequest} } // ListenAndServe starts to serve Web API requests. func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) error { defer func() { + close(m.newBuildRequest) log.Warn().Msg("Shutdown.") }() @@ -57,6 +59,7 @@ func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) router.GET("/v1/report/last", withAuth.Wrap(m.getLastReport)) router.GET("/v1/report/last/complete", withAuth.Wrap(m.getLastCompleteReport)) router.GET("/v1/report/last/successful", withAuth.Wrap(m.getLastSuccessfulReport)) + router.POST("/v1/build/trigger", withAuth.Wrap(m.triggerBuild)) listenSocket := fmt.Sprint(address, ":", port) log.Info().Msgf("Start webserver - listening on %s", listenSocket) diff --git a/internal/job/job.go b/internal/job/job.go index 46cc279..67ff503 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -144,7 +144,9 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.S } // StartBuildLoop starts the build in an infinite loop. -func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository) { +// +// Closing the triggerNewBuild channel will stop the loop. +func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository, triggerNewBuild <-chan struct{}) { metricsRegistry := metrics.NewRegistry() for { var wg sync.WaitGroup @@ -189,6 +191,13 @@ func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Reposit close(reportCh) wg.Wait() - time.Sleep(config.Cfg.Build.Interval) + select { + case <-time.After(config.Cfg.Build.Interval): + case _, ok := <-triggerNewBuild: + if !ok { + log.Info().Msg("triggerNewBuild channel closed, stopping build loop") + return + } + } } } diff --git a/internal/report/report.go b/internal/report/report.go index 4238503..38367b9 100644 --- a/internal/report/report.go +++ b/internal/report/report.go @@ -43,7 +43,7 @@ func logMessage(msg Message) { // It ends when the channel is closed. // This function is concurrent-safe. func (r *Report) Watch(messageChan <-chan Message) { - log.Info().Msg("Starting report dispatcher") + log.Info().Msg("starting report dispatcher") r.StartTime = time.Now() for msg := range messageChan { logMessage(msg) @@ -52,7 +52,7 @@ func (r *Report) Watch(messageChan <-chan Message) { r.mutex.Unlock() } r.EndTime = time.Now() - log.Info().Msg("Stopping report dispatcher") + log.Info().Msg("stopping report dispatcher") } func (r *Report) ToJSON() ([]byte, error) {