diff --git a/cmd/devtools/run.go b/cmd/devtools/run.go index 5530b121..7f32734e 100644 --- a/cmd/devtools/run.go +++ b/cmd/devtools/run.go @@ -1,10 +1,10 @@ package devtools import ( + "context" "os" "path/filepath" - "github.com/astria/astria-cli-go/cmd" "github.com/astria/astria-cli-go/internal/processrunner" "github.com/astria/astria-cli-go/internal/ui" log "github.com/sirupsen/logrus" @@ -16,17 +16,19 @@ var runCmd = &cobra.Command{ Use: "run", Short: "Run all the Astria services locally.", Long: `Run all the Astria services locally. This will start the sequencer, cometbft, composer, and conductor.`, - Run: func(cmd *cobra.Command, args []string) { - runall() - }, + Run: runall, } func init() { devCmd.AddCommand(runCmd) } -func runall() { - ctx := cmd.RootCmd.Context() +func runall(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + log.WithField("ctx", ctx).Info("Running all services") homePath, err := os.UserHomeDir() if err != nil { @@ -77,41 +79,28 @@ func runall() { } condRunner := processrunner.NewProcessRunner(ctx, conductorOpts) - // cleanup function to stop processes if there is an error starting another process - // FIXME - this isn't good enough. need to use context to stop all processes. - cleanup := func() { - seqRunner.Stop() - cometRunner.Stop() - compRunner.Stop() - condRunner.Stop() - } - // shouldStart acts as a control channel to start this first process shouldStart := make(chan bool) close(shouldStart) err = seqRunner.Start(shouldStart) if err != nil { log.WithError(err).Error("Error running sequencer") - cleanup() - panic(err) + cancel() } err = cometRunner.Start(seqRunner.GetDidStart()) if err != nil { log.WithError(err).Error("Error running cometbft") - cleanup() - panic(err) + cancel() } err = compRunner.Start(cometRunner.GetDidStart()) if err != nil { log.WithError(err).Error("Error running composer") - cleanup() - panic(err) + cancel() } err = condRunner.Start(compRunner.GetDidStart()) if err != nil { log.WithError(err).Error("Error running conductor") - cleanup() - panic(err) + cancel() } runners := []processrunner.ProcessRunner{seqRunner, cometRunner, compRunner, condRunner} diff --git a/cmd/root.go b/cmd/root.go index 3b6ff3d2..57f70616 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,8 +1,10 @@ package cmd import ( + "context" "os" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -15,8 +17,13 @@ var RootCmd = &cobra.Command{ // Execute adds all child commands to the root command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the RootCmd. func Execute() { - err := RootCmd.Execute() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := RootCmd.ExecuteContext(ctx) if err != nil { + log.WithError(err).Error("Error executing root command") + cancel() os.Exit(1) } } diff --git a/internal/processrunner/processrunner.go b/internal/processrunner/processrunner.go index 5d35bc63..cbb515be 100644 --- a/internal/processrunner/processrunner.go +++ b/internal/processrunner/processrunner.go @@ -1,23 +1,26 @@ package processrunner import ( - "bufio" + "bytes" "context" "fmt" "io" "os/exec" "sync" "syscall" + + log "github.com/sirupsen/logrus" ) // ProcessRunner is an interface that represents a process to be run. type ProcessRunner interface { Start(depStarted <-chan bool) error - Wait() error Stop() GetDidStart() <-chan bool GetTitle() string - GetScanner() *bufio.Scanner + GetOutput() string + SetExitStatusString(status string) + GetExitStatusString() string } // ProcessRunner is a struct that represents a process to be run. @@ -27,12 +30,15 @@ type processRunner struct { // Title is the title of the process title string - didStart chan bool - stdout io.ReadCloser - stderr io.ReadCloser - ctx context.Context - cancel context.CancelFunc - scanner *bufio.Scanner + exitStatusString string + exitStatusStringLock *sync.Mutex + + didStart chan bool + stdout io.ReadCloser + stderr io.ReadCloser + ctx context.Context + cancel context.CancelFunc + outputBuf *bytes.Buffer } type NewProcessRunnerOpts struct { @@ -47,14 +53,16 @@ type NewProcessRunnerOpts struct { func NewProcessRunner(ctx context.Context, opts NewProcessRunnerOpts) ProcessRunner { ctx, cancel := context.WithCancel(ctx) - cmd := exec.Command(opts.BinPath, opts.Args...) + cmd := exec.CommandContext(ctx, opts.BinPath, opts.Args...) cmd.Env = opts.Env return &processRunner{ - cmd: cmd, - title: opts.Title, - didStart: make(chan bool), - ctx: ctx, - cancel: cancel, + cmd: cmd, + title: opts.Title, + didStart: make(chan bool), + outputBuf: new(bytes.Buffer), + ctx: ctx, + cancel: cancel, + exitStatusStringLock: &sync.Mutex{}, } } @@ -62,68 +70,75 @@ func NewProcessRunner(ctx context.Context, opts NewProcessRunnerOpts) ProcessRun // It takes a channel that's closed when the dependency starts. // This allows us to control the order of process startup. func (pr *processRunner) Start(depStarted <-chan bool) error { - var wg sync.WaitGroup - wg.Add(1) + select { + case <-depStarted: + // continue if the dependency has started. + case <-pr.ctx.Done(): + log.Info("Context cancelled before starting process", pr.title) + return pr.ctx.Err() + } - go func() { - defer wg.Done() - select { - // wait for the dependency to start - case <-depStarted: - case <-pr.ctx.Done(): - fmt.Println("context cancelled before starting process", pr.title) - return - } + // get stdout and stderr + stdout, err := pr.cmd.StdoutPipe() + if err != nil { + log.WithError(err).Errorf("Error obtaining stdout for process %s", pr.title) + return err + } + pr.stdout = stdout - stdout, err := pr.cmd.StdoutPipe() - if err != nil { - fmt.Println("error obtaining stdout:", err) - return - } - pr.stdout = stdout + stderr, err := pr.cmd.StderrPipe() + if err != nil { + log.WithError(err).Errorf("Error obtaining stderr for process %s", pr.title) + return err + } + pr.stderr = stderr - stderr, err := pr.cmd.StderrPipe() - if err != nil { - fmt.Println("error obtaining stderr:", err) - return - } - pr.stderr = stderr + // multiwriter to write both stdout and stderr to the same buffer + mw := io.MultiWriter(pr.outputBuf) + go io.Copy(mw, stdout) + go io.Copy(mw, stderr) - err = pr.cmd.Start() - if err != nil { - fmt.Println("error starting process:", err) - return - } + // actually start the process + if err := pr.cmd.Start(); err != nil { + log.WithError(err).Errorf("error starting process %s", pr.title) + return err + } - pr.scanner = bufio.NewScanner(pr.stdout) + // signal that this process has started. + close(pr.didStart) - // signal that this process started - close(pr.didStart) + // asynchronously monitor process + go func() { + err = pr.cmd.Wait() + if err != nil { + err = fmt.Errorf("process exited with error: %w", err) + log.Error(err) + pr.SetExitStatusString(err.Error()) + } else { + s := fmt.Sprintf("process exited cleanly") + log.Infof(s) + pr.SetExitStatusString(s) + } }() - wg.Wait() - - if pr.ctx.Err() != nil { - // the context was cancelled, return the context's error - return pr.ctx.Err() - } - return nil } -// Wait waits for the process to finish. -func (pr *processRunner) Wait() error { - return pr.cmd.Wait() +func (pr *processRunner) SetExitStatusString(status string) { + if status != "" { + pr.exitStatusStringLock.Lock() + defer pr.exitStatusStringLock.Unlock() + pr.exitStatusString = status + } + pr.cancel() } // Stop stops the process. func (pr *processRunner) Stop() { // send SIGINT to the process if err := pr.cmd.Process.Signal(syscall.SIGINT); err != nil { - fmt.Println("Error sending SIGINT:", err) + log.WithError(err).Errorf("Error sending SIGINT for process %s", pr.title) } - // this will terminate the process if it's running - pr.cancel() } // GetDidStart returns a channel that's closed when the process starts. @@ -136,7 +151,14 @@ func (pr *processRunner) GetTitle() string { return pr.title } -// GetScanner returns a scanner for the process's stdout. -func (pr *processRunner) GetScanner() *bufio.Scanner { - return pr.scanner +// GetExitStatusString returns the exit status string of the process. +func (pr *processRunner) GetExitStatusString() string { + pr.exitStatusStringLock.Lock() + defer pr.exitStatusStringLock.Unlock() + return pr.exitStatusString +} + +// GetOutput returns the combined stdout and stderr output of the process. +func (pr *processRunner) GetOutput() string { + return pr.outputBuf.String() } diff --git a/internal/testutils/mocks.go b/internal/testutils/mocks.go index 778c8f89..a5b272e3 100644 --- a/internal/testutils/mocks.go +++ b/internal/testutils/mocks.go @@ -1,8 +1,6 @@ package testutils import ( - "bufio" - "github.com/stretchr/testify/mock" ) @@ -14,14 +12,13 @@ func (m *MockProcessRunner) Start(depStarted <-chan bool) error { args := m.Called(depStarted) return args.Error(0) } -func (m *MockProcessRunner) Wait() error { - args := m.Called() - return args.Error(0) -} func (m *MockProcessRunner) Stop() { } +func (m *MockProcessRunner) SetExitStatusString(status string) { +} + func (m *MockProcessRunner) GetDidStart() <-chan bool { args := m.Called() return args.Get(0).(<-chan bool) @@ -32,7 +29,12 @@ func (m *MockProcessRunner) GetTitle() string { return args.Get(0).(string) } -func (m *MockProcessRunner) GetScanner() *bufio.Scanner { +func (m *MockProcessRunner) GetExitStatusString() string { args := m.Called() - return args.Get(0).(*bufio.Scanner) + return args.Get(0).(string) +} + +func (m *MockProcessRunner) GetOutput() string { + args := m.Called() + return args.Get(0).(string) } diff --git a/internal/ui/app.go b/internal/ui/app.go index 0db71177..4456823d 100644 --- a/internal/ui/app.go +++ b/internal/ui/app.go @@ -65,7 +65,7 @@ func (a *App) Start() { // run the tview application if err := a.Application.Run(); err != nil { fmt.Println("error running tview application:", err) - panic(err) + a.Exit() } } diff --git a/internal/ui/processpane.go b/internal/ui/processpane.go index f8c0e14e..d0949eea 100644 --- a/internal/ui/processpane.go +++ b/internal/ui/processpane.go @@ -1,12 +1,13 @@ package ui import ( - "fmt" "io" + "time" "github.com/astria/astria-cli-go/internal/processrunner" "github.com/gdamore/tcell/v2" "github.com/rivo/tview" + log "github.com/sirupsen/logrus" ) // ProcessPane is a struct containing a tview.TextView and processrunner.ProcessRunner @@ -45,28 +46,38 @@ func NewProcessPane(tApp *tview.Application, pr processrunner.ProcessRunner) *Pr // StartScan starts scanning the stdout of the process and writes to the textView func (pp *ProcessPane) StartScan() { - // scan stdout and write using ansiWriter go func() { - stdoutScanner := pp.pr.GetScanner() - for stdoutScanner.Scan() { - line := stdoutScanner.Text() - pp.tApp.QueueUpdateDraw(func() { - _, err := pp.ansiWriter.Write([]byte(line + "\n")) - pp.lineCount++ + // initialize a ticker for periodic updates + ticker := time.NewTicker(250 * time.Millisecond) // adjust the duration as needed + defer ticker.Stop() + + var lastOutputSize int // tracks the last processed output size + + for range ticker.C { + currentOutput := pp.pr.GetOutput() // get the current full output + currentSize := len(currentOutput) + + if currentSize > lastOutputSize { + // new, unprocessed data. + newOutput := currentOutput[lastOutputSize:] // extract new data since last check + pp.tApp.QueueUpdateDraw(func() { + _, err := pp.ansiWriter.Write([]byte(newOutput)) + if err != nil { + log.WithError(err).Error("Error writing to textView") + } + }) + lastOutputSize = currentSize + } + + // check for exit status string and break out of infinite loop if found + if pp.pr.GetExitStatusString() != "" { + _, err := pp.ansiWriter.Write([]byte(pp.pr.GetExitStatusString() + "\n")) if err != nil { - fmt.Println("error writing to textView:", err) - panic(err) + log.WithError(err).Error("Error writing status to textView") } - }) - } - if err := stdoutScanner.Err(); err != nil { - fmt.Println("error reading stdout:", err) - panic(err) - } - // FIXME - do i need to wait?? - if err := pp.pr.Wait(); err != nil { - fmt.Println("error waiting for process:", err) - panic(err) + // break out of for loop when the exit status changes + break + } } }() } diff --git a/justfile b/justfile index 79d99540..e2fbc282 100644 --- a/justfile +++ b/justfile @@ -41,6 +41,9 @@ defaultargs := '' run args=defaultargs: go run main.go {{args}} > tview_log.txt 2>&1 +run-race args=defaultargs: + go run -race main.go {{args}} > tview_log.txt 2>&1 + # show any running Astria processes [no-exit-message] @pscheck: