Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Wait for Sequencer gRPC server to start before starting other components #65

Merged
merged 11 commits into from
Apr 29, 2024
100 changes: 76 additions & 24 deletions cmd/devtools/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package devtools

import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/astria/astria-cli-go/cmd"
"github.com/astria/astria-cli-go/internal/processrunner"
Expand Down Expand Up @@ -80,39 +83,51 @@ func runCmdHandler(c *cobra.Command, args []string) {
log.Debugf("Using binaries from %s", binDir)

// sequencer
seqRCOpts := processrunner.ReadyCheckerOpts{
CallBackName: "Sequencer gRPC server is OK",
Callback: gRPCServerIsOK(envPath),
RetryCount: 10,
RetryInterval: 100 * time.Millisecond,
HaltIfFailed: false,
}
seqReadinessCheck := processrunner.NewReadyChecker(seqRCOpts)
seqOpts := processrunner.NewProcessRunnerOpts{
Title: "Sequencer",
BinPath: sequencerPath,
EnvPath: envPath,
Args: nil,
Title: "Sequencer",
BinPath: sequencerPath,
EnvPath: envPath,
Args: nil,
ReadyCheck: &seqReadinessCheck,
}
seqRunner := processrunner.NewProcessRunner(ctx, seqOpts)

// cometbft
cometDataPath := filepath.Join(dataDir, ".cometbft")
cometOpts := processrunner.NewProcessRunnerOpts{
Title: "Comet BFT",
BinPath: cometbftPath,
EnvPath: envPath,
Args: []string{"node", "--home", cometDataPath},
Title: "Comet BFT",
BinPath: cometbftPath,
EnvPath: envPath,
Args: []string{"node", "--home", cometDataPath},
ReadyCheck: nil,
}
cometRunner := processrunner.NewProcessRunner(ctx, cometOpts)

// composer
composerOpts := processrunner.NewProcessRunnerOpts{
Title: "Composer",
BinPath: composerPath,
EnvPath: envPath,
Args: nil,
Title: "Composer",
BinPath: composerPath,
EnvPath: envPath,
Args: nil,
ReadyCheck: nil,
}
compRunner := processrunner.NewProcessRunner(ctx, composerOpts)

// conductor
conductorOpts := processrunner.NewProcessRunnerOpts{
Title: "Conductor",
BinPath: conductorPath,
EnvPath: envPath,
Args: nil,
Title: "Conductor",
BinPath: conductorPath,
EnvPath: envPath,
Args: nil,
ReadyCheck: nil,
}
condRunner := processrunner.NewProcessRunner(ctx, conductorOpts)

Expand Down Expand Up @@ -150,19 +165,21 @@ func runCmdHandler(c *cobra.Command, args []string) {

// composer
composerOpts := processrunner.NewProcessRunnerOpts{
Title: "Composer",
BinPath: composerPath,
EnvPath: envPath,
Args: nil,
Title: "Composer",
BinPath: composerPath,
EnvPath: envPath,
Args: nil,
ReadyCheck: nil,
}
compRunner := processrunner.NewProcessRunner(ctx, composerOpts)

// conductor
conductorOpts := processrunner.NewProcessRunnerOpts{
Title: "Conductor",
BinPath: conductorPath,
EnvPath: envPath,
Args: nil,
Title: "Conductor",
BinPath: conductorPath,
EnvPath: envPath,
Args: nil,
ReadyCheck: nil,
}
condRunner := processrunner.NewProcessRunner(ctx, conductorOpts)

Expand Down Expand Up @@ -221,3 +238,38 @@ func getFlagPathOrPanic(c *cobra.Command, flagName string, defaultValue string)
return defaultValue
}
}

// gRPCServerIsOK builds an anonymous function for use in a ProcessRunner
// ReadyChecker callback. The anonymous function checks if the gRPC server that
// is started by the sequencer is OK by making an HTTP request to the health
// endpoint. Being able to connect to the gRPC server is a requirement for both
// the Conductor and Composer services.
func gRPCServerIsOK(envPath string) func() bool {
sambukowski marked this conversation as resolved.
Show resolved Hide resolved
return func() bool {
// Get the sequencer gRPC address from the environment
seqEnv := processrunner.GetEnvironment(envPath)
var seqGRPCAddr string
for _, envVar := range seqEnv {
if strings.HasPrefix(envVar, "ASTRIA_SEQUENCER_GRPC_ADDR") {
seqGRPCAddr = strings.Split(envVar, "=")[1]
break
}
}

// Make the HTTP request
resp, err := http.Get("http://" + seqGRPCAddr + "/health")
if err != nil {
log.WithError(err).Debug("Startup callback check to sequencer gRPC /health did not succeed")
return false
}
defer resp.Body.Close()

// Check status code
if resp.StatusCode == 200 {
log.Debug("Sequencer gRPC server started")
return true
}

return false
}
}
2 changes: 1 addition & 1 deletion cmd/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ func CreateUILog(destDir string) {
DisableColors: true, // Disable ANSI color codes
FullTimestamp: true,
})
log.Info("New log file created successfully:", logPath)
log.Debug("New log file created successfully:", logPath)
}
59 changes: 41 additions & 18 deletions internal/processrunner/processrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ProcessRunner interface {
GetTitle() string
GetOutputAndClearBuf() string
GetInfo() string
GetEnvironment() []string
}

// ProcessRunner is a struct that represents a process to be run.
Expand All @@ -39,13 +40,16 @@ type processRunner struct {

didStart chan bool
outputBuf *safebuffer.SafeBuffer

readyChecker *ReadyChecker
}

type NewProcessRunnerOpts struct {
Title string
BinPath string
EnvPath string
Args []string
Title string
BinPath string
EnvPath string
Args []string
ReadyCheck *ReadyChecker
}

// NewProcessRunner creates a new ProcessRunner.
Expand All @@ -63,18 +67,20 @@ func NewProcessRunner(ctx context.Context, opts NewProcessRunnerOpts) ProcessRun
cmd := exec.CommandContext(ctx, opts.BinPath, opts.Args...)
cmd.Env = env
return &processRunner{
ctx: ctx,
cmd: cmd,
title: opts.Title,
didStart: make(chan bool),
outputBuf: &safebuffer.SafeBuffer{},
opts: opts,
env: env,
ctx: ctx,
cmd: cmd,
title: opts.Title,
didStart: make(chan bool),
outputBuf: &safebuffer.SafeBuffer{},
opts: opts,
env: env,
readyChecker: opts.ReadyCheck,
}
}

// Restart stops the process and starts it again.
func (pr *processRunner) Restart() error {
log.Debug(fmt.Sprintf("Stopping process %s", pr.title))
pr.Stop()

// NOTE - you have to recreate the exec.Cmd. you can't just call cmd.Start() again.
Expand Down Expand Up @@ -110,6 +116,8 @@ func (pr *processRunner) Restart() error {
// It takes a channel that's closed when the dependency starts.
// This allows us to control the order of process startup.
func (pr *processRunner) Start(ctx context.Context, depStarted <-chan bool) error {
log.Debug(fmt.Sprintf("Starting process %s", pr.title))

select {
case <-depStarted:
// continue if the dependency has started.
Expand Down Expand Up @@ -146,27 +154,37 @@ func (pr *processRunner) Start(ctx context.Context, depStarted <-chan bool) erro

// actually start the process
if err := pr.cmd.Start(); err != nil {
log.WithError(err).Errorf("error starting process %s", pr.title)
log.WithError(err).Errorf("Error starting process %s", pr.title)
return err
}

// run the readiness check if present
if pr.readyChecker != nil {
err := pr.readyChecker.waitUntilReady()
if err != nil {
log.WithError(err).Errorf("Error when running readiness check for %s", pr.title)
}
}

// signal that this process has started.
close(pr.didStart)

// asynchronously monitor process
go func() {
err = pr.cmd.Wait()
if err != nil {
err = fmt.Errorf("[white:red][astria-go] %s process exited with error: %w[-:-]", pr.title, err)
log.Error(err)
_, err := pr.outputBuf.WriteString(err.Error())
logErr := fmt.Errorf("%s process exited with error: %w", pr.title, err)
outputErr := fmt.Errorf("[white:red][astria-go] %s[-:-]", logErr)
log.Error(logErr)
_, err := pr.outputBuf.WriteString(outputErr.Error())
if err != nil {
return
}
} else {
s := fmt.Sprintf("[black:white][astria-go] %s process exited cleanly[-:-]", pr.title)
log.Infof(s)
_, err := pr.outputBuf.WriteString(s)
exitStatusMessage := fmt.Sprintf("%s process exited cleanly", pr.title)
outputStatusMessage := fmt.Sprintf("[black:white][astria-go] %s[-:-]", exitStatusMessage)
log.Infof(exitStatusMessage)
_, err := pr.outputBuf.WriteString(outputStatusMessage)
if err != nil {
return
}
Expand Down Expand Up @@ -218,3 +236,8 @@ func (pr *processRunner) GetInfo() string {
output += fmt.Sprintf("%-*s", maxLen+1, environmentPathTitle) + pr.opts.EnvPath + "\n"
return output
}

// GetEnvironment returns the environment variables for the process.
func (pr *processRunner) GetEnvironment() []string {
return pr.env
}
71 changes: 71 additions & 0 deletions internal/processrunner/readychecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package processrunner

import (
"fmt"
"time"

log "github.com/sirupsen/logrus"
)

// ReadyChecker is a struct used within the ProcessRunner to check if the
// process being run has completed all its startup steps.
type ReadyChecker struct {
callBackName string
callback func() bool
retryCount int
retryInterval time.Duration
haltIfFailed bool
}

// ReadyCheckerOpts is a struct used to pass options into NewReadyChecker.
type ReadyCheckerOpts struct {
// CallBackName is the name of the callback function and is used for logging purposes.
CallBackName string
// Callback is the anonymous function that will be called to check if all
// startup requirements for the process have been completed. The function
// should return true if all startup checks are complete, and false if any
// startup checks have not completed.
Callback func() bool
RetryCount int
RetryInterval time.Duration
// HaltIfFailed is a flag that determines if the process should halt the app or
sambukowski marked this conversation as resolved.
Show resolved Hide resolved
// continue if all retries of the callback complete without success.
HaltIfFailed bool
}

// NewReadyChecker creates a new ReadyChecker.
func NewReadyChecker(opts ReadyCheckerOpts) ReadyChecker {
return ReadyChecker{
callBackName: opts.CallBackName,
callback: opts.Callback,
retryCount: opts.RetryCount,
retryInterval: opts.RetryInterval,
haltIfFailed: opts.HaltIfFailed,
}
}

// WaitUntilReady calls the ReadyChecker.callback function N number of times,
sambukowski marked this conversation as resolved.
Show resolved Hide resolved
// waiting M amount of time between retries, where N = ReadyChecker.retryCount
// and M = ReadyChecker.retryInterval.
// If the callback returns true, the function returns nil.
// If ReadyChecker.haltIfFailed is false, the function will return nil after all
// retries have been completed without success.
// If ReadyChecker.haltIfFailed is true, the function will panic if the callback
// does not succeed after all retries.
func (r *ReadyChecker) waitUntilReady() error {
for i := 0; i < r.retryCount-1; i++ {
complete := r.callback()
if complete {
log.Debug(fmt.Sprintf("ReadyChecker callback to '%s' completed successfully.", r.callBackName))
return nil
}
log.Debug(fmt.Sprintf("ReadyChecker callback to '%s': attempt %d, failed to complete. Retrying...", r.callBackName, i+1))
time.Sleep(r.retryInterval)
}
complete := r.callback()
if !complete && r.haltIfFailed {
err := fmt.Errorf("ReadyChecker callback to '%s' failed to complete after %d retries. Halting.", r.callBackName, r.retryCount)
panic(err)
}
return nil
}
Loading