This repository has been archived by the owner on Aug 9, 2023. It is now read-only.

Add shutdown handling (#47)
Currently we just exit the exporter when receiving a signal, without waiting for
the polling mechanisms and other components to shut down.

This change introduces a real shutdown sequence that waits for all components to
stop before exiting.
Bjørn authored Dec 8, 2020
1 parent 2ce9d6d commit f37c05d
Showing 2 changed files with 120 additions and 45 deletions.
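In outline, the change adopts the common context-plus-WaitGroup shutdown pattern: a cancellable context fans the stop request out to the workers, a buffered error channel lets any component trigger it, and sync.WaitGroup delays the final exit until every goroutine has returned. A minimal, self-contained sketch of that pattern (identifiers are illustrative, not the exporter's actual code):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// Cancelled when a signal arrives or a component reports a fatal error.
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	failed := make(chan error, 1) // components report fatal errors here
	var wg sync.WaitGroup

	// Shutdown coordinator: the first signal or failure cancels the context.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case sig := <-sigs:
			fmt.Printf("received signal %s, shutting down\n", sig)
		case err := <-failed:
			fmt.Printf("component failed: %v\n", err)
		}
		stop()
	}()

	// Example worker: a ticker loop that returns once the context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// one unit of periodic work goes here
			}
		}
	}()

	// Exit only after every goroutine above has returned.
	wg.Wait()
}

The main function in the diff below has the same shape, with the HTTP listener and the Snyk API poller as the components.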
135 changes: 98 additions & 37 deletions main.go
@@ -1,7 +1,7 @@
package main

import (
"errors"
"context"
"fmt"
"net/http"
"os"
@@ -96,38 +96,68 @@ func main() {
}
})

done := make(chan error, 1)
// context used to stop worker components on a signal or component failure
ctx, stop := context.WithCancel(context.Background())
defer stop()

// used to report errors from components
var exitCode int
componentFailed := make(chan error, 1)
var wg sync.WaitGroup

go func() {
log.Infof("Listening on %s", *listenAddress)
err := http.ListenAndServe(*listenAddress, nil)
if err != nil {
done <- err
componentFailed <- fmt.Errorf("http listener stopped: %v", err)
}
}()

// Goroutine responsible for starting the shutdown sequence based on signals
// or component failures
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
wg.Add(1)
go func() {
sig := <-sigs
log.Infof("Received os signal '%s'. Terminating...", sig)
done <- nil
defer wg.Done()
select {
case sig := <-sigs:
log.Infof("Received os signal '%s'. Terminating...", sig)
case err := <-componentFailed:
if err != nil {
log.Errorf("Component failed: %v", err)
exitCode = 1
}
}
stop()
}()

go runAPIPolling(done, *snykAPIURL, *snykAPIToken, *snykOrganizations, secondDuration(*snykInterval), secondDuration(*requestTimeout))
wg.Add(1)
go func() {
defer wg.Done()
log.Info("Snyk API scraper starting")
defer log.Info("Snyk API scraper stopped")
err := runAPIPolling(ctx, *snykAPIURL, *snykAPIToken, *snykOrganizations, secondDuration(*snykInterval), secondDuration(*requestTimeout))
if err != nil {
componentFailed <- fmt.Errorf("snyk api scraper: %w", err)
}
}()

reason := <-done
if reason != nil {
log.Errorf("Snyk exporter exited due to error: %v", reason)
os.Exit(1)
// wait for all components to stop
wg.Wait()
if exitCode != 0 {
log.Errorf("Snyk exporter exited with exit %d", exitCode)
os.Exit(exitCode)
} else {
log.Infof("Snyk exporter exited with exit 0")
}
log.Infof("Snyk exporter exited with exit 0")
}

func secondDuration(seconds int) time.Duration {
return time.Duration(seconds) * time.Second
}

func runAPIPolling(done chan error, url, token string, organizationIDs []string, requestInterval, requestTimeout time.Duration) {
func runAPIPolling(ctx context.Context, url, token string, organizationIDs []string, requestInterval, requestTimeout time.Duration) error {
client := client{
httpClient: &http.Client{
Timeout: requestTimeout,
@@ -137,36 +167,59 @@ func runAPIPolling(done chan error, url, token string, organizationIDs []string,
}
organizations, err := getOrganizations(&client, organizationIDs)
if err != nil {
done <- err
return
return err
}
log.Infof("Running Snyk API scraper for organizations: %v", strings.Join(organizationNames(organizations), ", "))

// kick off a poll right away to get metrics available right after startup
pollAPI(ctx, &client, organizations)

ticker := time.NewTicker(requestInterval)
defer ticker.Stop()
for {
var gaugeResults []gaugeResult
for _, organization := range organizations {
log.Infof("Collecting for organization '%s'", organization.Name)
results, err := collect(&client, organization)
if err != nil {
log.With("error", errors.Unwrap(err)).
With("organzationName", organization.Name).
With("organzationId", organization.ID).
Errorf("Collection failed for organization '%s': %v", organization.Name, err)
continue
}
log.Infof("Recorded %d results for organization '%s'", len(results), organization.Name)
gaugeResults = append(gaugeResults, results...)
select {
case <-ctx.Done():
return nil
case <-ticker.C:
pollAPI(ctx, &client, organizations)
}
log.Infof("Exposing %d results as metrics", len(gaugeResults))
scrapeMutex.Lock()
register(gaugeResults)
scrapeMutex.Unlock()
readyMutex.Lock()
ready = true
readyMutex.Unlock()
time.Sleep(requestInterval)
}
}

// pollAPI collects data from provided organizations and registers them in the
// prometheus registry.
func pollAPI(ctx context.Context, client *client, organizations []org) {
var gaugeResults []gaugeResult
for _, organization := range organizations {
log.Infof("Collecting for organization '%s'", organization.Name)
results, err := collect(ctx, client, organization)
if err != nil {
log.With("error", err).
With("organzationName", organization.Name).
With("organzationId", organization.ID).
Errorf("Collection failed for organization '%s': %v", organization.Name, err)
continue
}
log.Infof("Recorded %d results for organization '%s'", len(results), organization.Name)
gaugeResults = append(gaugeResults, results...)
// Stop right away if the context has been cancelled. This ensures that we
// don't wait for a complete collect run over all organizations before
// stopping.
select {
case <-ctx.Done():
return
default:
}
}
log.Infof("Exposing %d results as metrics", len(gaugeResults))
scrapeMutex.Lock()
register(gaugeResults)
scrapeMutex.Unlock()
readyMutex.Lock()
ready = true
readyMutex.Unlock()
}

func organizationNames(orgs []org) []string {
var names []string
for _, org := range orgs {
@@ -221,7 +274,7 @@ type gaugeResult struct {
results []aggregateResult
}

func collect(client *client, organization org) ([]gaugeResult, error) {
func collect(ctx context.Context, client *client, organization org) ([]gaugeResult, error) {
projects, err := client.getProjects(organization.ID)
if err != nil {
return nil, fmt.Errorf("get projects for organization: %w", err)
@@ -243,6 +296,14 @@ func collect(client *client, organization org) ([]gaugeResult, error) {
})
duration := time.Since(start)
log.Debugf("Collected data in %v for %s %s", duration, project.ID, project.Name)
// Stop right away if the context has been cancelled. This ensures that we
// don't wait for a complete collect run over all projects before stopping.
select {
case <-ctx.Done():
return nil, nil
default:
}
}
return gaugeResults, nil
}
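In the diff above, the HTTP listener is started with http.ListenAndServe and reports failures through the error channel. For reference, an http.Server can also take part in the same context-driven sequence through its Shutdown method; a rough sketch, with hypothetical names that are not part of this commit:

package shutdown

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// StartHTTP serves on addr until ctx is cancelled, then shuts the server down
// gracefully. Hypothetical helper, not part of this commit.
func StartHTTP(ctx context.Context, addr string, wg *sync.WaitGroup, failed chan<- error) {
	srv := &http.Server{Addr: addr}

	wg.Add(1)
	go func() {
		defer wg.Done()
		// ListenAndServe returns http.ErrServerClosed after Shutdown; that is
		// the normal way out, not a failure.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			failed <- fmt.Errorf("http listener stopped: %w", err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
		// Allow in-flight requests a short grace period before giving up.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			failed <- fmt.Errorf("http shutdown: %w", err)
		}
	}()
}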
30 changes: 22 additions & 8 deletions main_test.go
@@ -1,10 +1,12 @@
package main

import (
"context"
"net/http"
"net/http/httptest"
"reflect"
"sort"
"sync"
"testing"
"time"
)
@@ -169,16 +171,28 @@ func TestRunAPIPolling_issuesTimeout(t *testing.T) {
time.Sleep(1 * time.Second)
rw.WriteHeader(http.StatusOK)
}))
done := make(chan error, 1)

go runAPIPolling(done, server.URL, "token", nil, 20*time.Millisecond, 1*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

select {
case result := <-done:
if result != nil {
t.Errorf("unexpected error result: %v", result)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := runAPIPolling(ctx, server.URL, "token", nil, 20*time.Millisecond, 1*time.Millisecond)
if err != nil {
t.Errorf("unexpected error result: %v", err)
}
case <-time.After(100 * time.Millisecond):
// success path if timeout errors are suppressed
}()

// stop the polling again after 100ms
<-time.After(100 * time.Millisecond)
cancel()

// wait for the polling to stop
wg.Wait()

if !ready {
t.Fatalf("Ready not set but it should be")
}
}
