Skip to content

Commit

Permalink
handle binary errors and shutdowns more elegantly
Browse files Browse the repository at this point in the history
  • Loading branch information
steezeburger committed Apr 2, 2024
1 parent 786e51b commit 13c33b2
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 115 deletions.
35 changes: 12 additions & 23 deletions cmd/devtools/run.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package devtools

import (
"context"
"os"
"path/filepath"

"github.com/astria/astria-cli-go/cmd"
"github.com/astria/astria-cli-go/internal/processrunner"
"github.com/astria/astria-cli-go/internal/ui"
log "github.com/sirupsen/logrus"
Expand All @@ -16,17 +16,19 @@ var runCmd = &cobra.Command{
Use: "run",
Short: "Run all the Astria services locally.",
Long: `Run all the Astria services locally. This will start the sequencer, cometbft, composer, and conductor.`,
Run: func(cmd *cobra.Command, args []string) {
runall()
},
Run: runall,
}

// init attaches runCmd to devCmd via AddCommand when the package is initialized.
func init() {
devCmd.AddCommand(runCmd)
}

func runall() {
ctx := cmd.RootCmd.Context()
func runall(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

log.WithField("ctx", ctx).Info("Running all services")

homePath, err := os.UserHomeDir()
if err != nil {
Expand Down Expand Up @@ -77,41 +79,28 @@ func runall() {
}
condRunner := processrunner.NewProcessRunner(ctx, conductorOpts)

// cleanup function to stop processes if there is an error starting another process
// FIXME - this isn't good enough. need to use context to stop all processes.
cleanup := func() {
seqRunner.Stop()
cometRunner.Stop()
compRunner.Stop()
condRunner.Stop()
}

// shouldStart acts as a control channel to start this first process
shouldStart := make(chan bool)
close(shouldStart)
err = seqRunner.Start(shouldStart)
if err != nil {
log.WithError(err).Error("Error running sequencer")
cleanup()
panic(err)
cancel()
}
err = cometRunner.Start(seqRunner.GetDidStart())
if err != nil {
log.WithError(err).Error("Error running cometbft")
cleanup()
panic(err)
cancel()
}
err = compRunner.Start(cometRunner.GetDidStart())
if err != nil {
log.WithError(err).Error("Error running composer")
cleanup()
panic(err)
cancel()
}
err = condRunner.Start(compRunner.GetDidStart())
if err != nil {
log.WithError(err).Error("Error running conductor")
cleanup()
panic(err)
cancel()
}

runners := []processrunner.ProcessRunner{seqRunner, cometRunner, compRunner, condRunner}
Expand Down
9 changes: 8 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cmd

import (
"context"
"os"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -15,8 +17,13 @@ var RootCmd = &cobra.Command{
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the RootCmd.
func Execute() {
err := RootCmd.Execute()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := RootCmd.ExecuteContext(ctx)
if err != nil {
log.WithError(err).Error("Error executing root command")
cancel()
os.Exit(1)
}
}
Expand Down
146 changes: 84 additions & 62 deletions internal/processrunner/processrunner.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
package processrunner

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os/exec"
"sync"
"syscall"

log "github.com/sirupsen/logrus"
)

// ProcessRunner is an interface that represents a process to be run.
type ProcessRunner interface {
// Start starts the process. depStarted is a channel that's closed when the
// dependency starts, which allows the order of process startup to be controlled.
Start(depStarted <-chan bool) error
// Wait waits for the process to finish.
Wait() error
// Stop stops the process.
Stop()
// GetDidStart returns a channel that's closed when the process starts.
GetDidStart() <-chan bool
// GetTitle returns the title of the process.
GetTitle() string
// GetScanner returns a scanner for the process's stdout.
GetScanner() *bufio.Scanner
// GetOutput returns the combined stdout and stderr output of the process.
GetOutput() string
// SetExitStatusString sets the exit status string of the process.
SetExitStatusString(status string)
// GetExitStatusString returns the exit status string of the process.
GetExitStatusString() string
}

// processRunner is a struct that represents a process to be run.
Expand All @@ -27,12 +30,15 @@ type processRunner struct {
// Title is the title of the process
title string

didStart chan bool
stdout io.ReadCloser
stderr io.ReadCloser
ctx context.Context
cancel context.CancelFunc
scanner *bufio.Scanner
exitStatusString string
exitStatusStringLock *sync.Mutex

didStart chan bool
stdout io.ReadCloser
stderr io.ReadCloser
ctx context.Context
cancel context.CancelFunc
outputBuf *bytes.Buffer
}

type NewProcessRunnerOpts struct {
Expand All @@ -47,83 +53,92 @@ type NewProcessRunnerOpts struct {
func NewProcessRunner(ctx context.Context, opts NewProcessRunnerOpts) ProcessRunner {
ctx, cancel := context.WithCancel(ctx)

cmd := exec.Command(opts.BinPath, opts.Args...)
cmd := exec.CommandContext(ctx, opts.BinPath, opts.Args...)
cmd.Env = opts.Env
return &processRunner{
cmd: cmd,
title: opts.Title,
didStart: make(chan bool),
ctx: ctx,
cancel: cancel,
cmd: cmd,
title: opts.Title,
didStart: make(chan bool),
outputBuf: new(bytes.Buffer),
ctx: ctx,
cancel: cancel,
exitStatusStringLock: &sync.Mutex{},
}
}

// Start starts the process and returns an error if it could not be started.
// It takes a channel that's closed when the dependency starts.
// This allows us to control the order of process startup.
func (pr *processRunner) Start(depStarted <-chan bool) error {
var wg sync.WaitGroup
wg.Add(1)
select {
case <-depStarted:
// continue if the dependency has started.
case <-pr.ctx.Done():
log.Info("Context cancelled before starting process", pr.title)
return pr.ctx.Err()
}

go func() {
defer wg.Done()
select {
// wait for the dependency to start
case <-depStarted:
case <-pr.ctx.Done():
fmt.Println("context cancelled before starting process", pr.title)
return
}
// get stdout and stderr
stdout, err := pr.cmd.StdoutPipe()
if err != nil {
log.WithError(err).Errorf("Error obtaining stdout for process %s", pr.title)
return err
}
pr.stdout = stdout

stdout, err := pr.cmd.StdoutPipe()
if err != nil {
fmt.Println("error obtaining stdout:", err)
return
}
pr.stdout = stdout
stderr, err := pr.cmd.StderrPipe()
if err != nil {
log.WithError(err).Errorf("Error obtaining stderr for process %s", pr.title)
return err
}
pr.stderr = stderr

stderr, err := pr.cmd.StderrPipe()
if err != nil {
fmt.Println("error obtaining stderr:", err)
return
}
pr.stderr = stderr
// multiwriter to write both stdout and stderr to the same buffer
mw := io.MultiWriter(pr.outputBuf)
go io.Copy(mw, stdout)
go io.Copy(mw, stderr)

err = pr.cmd.Start()
if err != nil {
fmt.Println("error starting process:", err)
return
}
// actually start the process
if err := pr.cmd.Start(); err != nil {
log.WithError(err).Errorf("error starting process %s", pr.title)
return err
}

pr.scanner = bufio.NewScanner(pr.stdout)
// signal that this process has started.
close(pr.didStart)

// signal that this process started
close(pr.didStart)
// asynchronously monitor process
go func() {
err = pr.cmd.Wait()
if err != nil {
err = fmt.Errorf("process exited with error: %w", err)
log.Error(err)
pr.SetExitStatusString(err.Error())
} else {
s := fmt.Sprintf("process exited cleanly")
log.Infof(s)
pr.SetExitStatusString(s)
}
}()

wg.Wait()

if pr.ctx.Err() != nil {
// the context was cancelled, return the context's error
return pr.ctx.Err()
}

return nil
}

// Wait waits for the process to finish.
func (pr *processRunner) Wait() error {
return pr.cmd.Wait()
// SetExitStatusString records status as the runner's exit status string and
// then cancels the runner's context.
//
// An empty status is not stored, but the context is cancelled in either case.
func (pr *processRunner) SetExitStatusString(status string) {
	// Nothing to record: only cancel the context.
	if status == "" {
		pr.cancel()
		return
	}

	// The lock stays held until the function returns, i.e. across the cancel.
	pr.exitStatusStringLock.Lock()
	defer pr.exitStatusStringLock.Unlock()

	pr.exitStatusString = status
	pr.cancel()
}

// Stop stops the process.
//
// It first sends SIGINT to the process, then cancels the runner's context.
// If the process was never started, the signal step is skipped and only the
// context is cancelled, so Stop is safe to call on a runner in any state.
func (pr *processRunner) Stop() {
	// cmd.Process is nil until the command has been started; signalling
	// through a nil Process would panic.
	if proc := pr.cmd.Process; proc != nil {
		// send SIGINT to the process
		if err := proc.Signal(syscall.SIGINT); err != nil {
			log.WithError(err).Errorf("Error sending SIGINT for process %s", pr.title)
		}
	}
	// this will terminate the process if it's running
	pr.cancel()
}

// GetDidStart returns a channel that's closed when the process starts.
Expand All @@ -136,7 +151,14 @@ func (pr *processRunner) GetTitle() string {
return pr.title
}

// GetScanner returns a scanner for the process's stdout.
func (pr *processRunner) GetScanner() *bufio.Scanner {
return pr.scanner
// GetExitStatusString reports the exit status string currently stored on the
// runner. The value is read while holding exitStatusStringLock.
func (pr *processRunner) GetExitStatusString() string {
	mu := pr.exitStatusStringLock

	mu.Lock()
	status := pr.exitStatusString
	mu.Unlock()

	return status
}

// GetOutput returns the current contents of the runner's output buffer, i.e.
// the combined stdout and stderr output of the process, as a string.
//
// NOTE(review): outputBuf is read here without any locking, while Start feeds
// it from background io.Copy goroutines — this looks like a potential data
// race; confirm and guard the buffer if so.
func (pr *processRunner) GetOutput() string {
	combined := pr.outputBuf.String()
	return combined
}
18 changes: 10 additions & 8 deletions internal/testutils/mocks.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package testutils

import (
"bufio"

"github.com/stretchr/testify/mock"
)

Expand All @@ -14,14 +12,13 @@ func (m *MockProcessRunner) Start(depStarted <-chan bool) error {
args := m.Called(depStarted)
return args.Error(0)
}
func (m *MockProcessRunner) Wait() error {
args := m.Called()
return args.Error(0)
}

// Stop is a no-op in the mock; the call is not recorded through m.Called.
func (m *MockProcessRunner) Stop() {
}

// SetExitStatusString is a no-op in the mock; status is ignored and the call
// is not recorded through m.Called.
func (m *MockProcessRunner) SetExitStatusString(status string) {
}

func (m *MockProcessRunner) GetDidStart() <-chan bool {
args := m.Called()
return args.Get(0).(<-chan bool)
Expand All @@ -32,7 +29,12 @@ func (m *MockProcessRunner) GetTitle() string {
return args.Get(0).(string)
}

func (m *MockProcessRunner) GetScanner() *bufio.Scanner {
func (m *MockProcessRunner) GetExitStatusString() string {
args := m.Called()
return args.Get(0).(*bufio.Scanner)
return args.Get(0).(string)
}

// GetOutput registers the call on the mock and returns the first configured
// return value as a string. It panics if that value is not a string.
func (m *MockProcessRunner) GetOutput() string {
	ret := m.Called().Get(0)
	return ret.(string)
}
2 changes: 1 addition & 1 deletion internal/ui/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (a *App) Start() {
// run the tview application
if err := a.Application.Run(); err != nil {
fmt.Println("error running tview application:", err)
panic(err)
a.Exit()
}
}

Expand Down
Loading

0 comments on commit 13c33b2

Please sign in to comment.