Skip to content

Commit

Permalink
handle connection errors in replication
Browse files Browse the repository at this point in the history
- Improved error handling during replication to prevent cascading failures.
- Added `FailErr` field to `ReplicationConfig` to store the error encountered when a connection issue occurs.
- Modified `replicationRun` to set `FailErr` when a connection error is detected, stopping further execution.
  • Loading branch information
flarco committed Dec 17, 2024
1 parent 28c02f6 commit 27c75cf
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
11 changes: 8 additions & 3 deletions cmd/sling/sling_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,17 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error

if cast.ToBool(cfg.Env["SLING_DRY_RUN"]) || cast.ToBool(os.Getenv("SLING_DRY_RUN")) {
return nil
} else if replication.FailErr != "" {
task.Status = sling.ExecStatusError
task.Err = g.Error(replication.FailErr)
}

// set log sink
env.LogSink = func(ll *g.LogLine) {
task.AppendOutput(ll)
}

sling.StoreSet(task) // set into store
defer sling.StoreSet(task) // set into store after
sling.StoreSet(task) // set into store

if task.Err != nil {
err = g.Error(task.Err)
Expand All @@ -375,6 +377,9 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
// set context
task.Context = ctx

// set into store after
defer sling.StoreSet(task)

// run task
setTM()
err = task.Execute()
Expand Down Expand Up @@ -484,7 +489,7 @@ func replicationRun(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..

// if a connection issue, stop
if e, ok := err.(*g.ErrType); ok && strings.Contains(e.Debug(), "Could not connect to ") {
break
replication.FailErr = g.ErrMsg(e)
}
} else {
successes++
Expand Down
1 change: 1 addition & 0 deletions core/sling/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ReplicationConfig struct {
// Tasks are compiled tasks
Tasks []*Config `json:"tasks"`
Compiled bool `json:"compiled"`
FailErr string // error string to fail all (e.g. when the first tasks fails to connect)

streamsOrdered []string
originalCfg string
Expand Down

0 comments on commit 27c75cf

Please sign in to comment.