Skip to content

Commit

Permalink
Merge pull request #188 from slingdata-io/v1.1.8
Browse files Browse the repository at this point in the history
V1.1.8
  • Loading branch information
flarco authored Feb 26, 2024
2 parents b5c8956 + cdf510c commit 3756bc6
Show file tree
Hide file tree
Showing 60 changed files with 9,553 additions and 2,850 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ jobs:
- name: Run Go Tests
run: |
export DEBUG=''
# Oracle env
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH
export PATH="$PATH:$ORACLE_HOME/bin"
bash scripts/prep.gomod.sh
bash scripts/test.sh
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ demo/sling_commands_demo.workflow
demo/sling.demo.2023.10.screenstudio
./sling
core/dbio/filesys/test/dataset1M.csv
core/dbio/filesys/test/dataset100k.csv
core/dbio/filesys/test/dataset100k.csv
cmd/sling/tests/suite/
18 changes: 11 additions & 7 deletions cmd/sling/sling_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var slingFolder embed.FS
var examples = ``
var ctx = g.NewContext(context.Background())
var telemetryMap = g.M("begin_time", time.Now().UnixMicro(), "run_mode", "task")
var telemetryMap = g.M("begin_time", time.Now().UnixMicro(), "run_mode", "cli")
var telemetry = true
var interrupted = false
var machineID = ""
Expand Down Expand Up @@ -222,6 +222,11 @@ var cliConns = &g.CliSC{
Type: "string",
Description: "discover streams in a specific schema (for database connections)",
},
{
Name: "stream",
Type: "string",
Description: "discover columns in a specific stream",
},
{
Name: "recursive",
ShortName: "",
Expand Down Expand Up @@ -320,7 +325,7 @@ func Track(event string, props ...map[string]interface{}) {
"application", "sling-cli",
"version", core.Version,
"package", getSlingPackage(),
"os", runtime.GOOS,
"os", runtime.GOOS+"/"+runtime.GOARCH,
"emit_time", time.Now().UnixMicro(),
"user_id", machineID,
)
Expand Down Expand Up @@ -380,6 +385,7 @@ func main() {
}()

exit := func() {
time.Sleep(50 * time.Millisecond) // so logger can flush
os.Exit(exitCode)
}

Expand Down Expand Up @@ -453,7 +459,9 @@ func cliInit() int {

// set transaction
taskMap, _ := g.UnmarshalMap(cast.ToString(telemetryMap["task"]))
evt.Transaction = cast.ToString(taskMap["type"])
sourceType := lo.Ternary(taskMap["source_type"] == nil, "unknown", cast.ToString(taskMap["source_type"]))
targetType := lo.Ternary(taskMap["target_type"] == nil, "unknown", cast.ToString(taskMap["target_type"]))
evt.Transaction = g.F("%s (%s => %s)", taskMap["type"], sourceType, targetType)

E, ok := err.(*g.ErrType)
if ok {
Expand All @@ -476,10 +484,6 @@ func cliInit() int {

sentry.ConfigureScope(func(scope *sentry.Scope) {
scope.SetUser(sentry.User{ID: machineID})

sourceType := lo.Ternary(taskMap["source_type"] == nil, "unknown", cast.ToString(taskMap["source_type"]))
targetType := lo.Ternary(taskMap["target_type"] == nil, "unknown", cast.ToString(taskMap["target_type"]))

scope.SetTag("source_type", sourceType)
scope.SetTag("target_type", targetType)
scope.SetTag("mode", cast.ToString(taskMap["mode"]))
Expand Down
68 changes: 53 additions & 15 deletions cmd/sling/sling_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/slingdata-io/sling-cli/core/dbio/connection"
"github.com/slingdata-io/sling-cli/core/dbio/database"
"github.com/slingdata-io/sling-cli/core/env"
"github.com/slingdata-io/sling-cli/core/sling"
"github.com/slingdata-io/sling-cli/core/store"
Expand Down Expand Up @@ -40,7 +41,7 @@ func processRun(c *g.CliSC) (ok bool, err error) {
ok = true
cfg := &sling.Config{}
replicationCfgPath := ""
cfgStr := ""
taskCfgStr := ""
showExamples := false
selectStreams := []string{}
iterate := 1
Expand All @@ -63,9 +64,11 @@ func processRun(c *g.CliSC) (ok bool, err error) {
for k, v := range c.Vals {
switch k {
case "replication":
telemetryMap["run_mode"] = "replication"
replicationCfgPath = cast.ToString(v)
case "config":
cfgStr = cast.ToString(v)
telemetryMap["run_mode"] = "task"
taskCfgStr = cast.ToString(v)
case "src-conn":
cfg.Source.Conn = cast.ToString(v)
case "src-stream", "src-table", "src-sql", "src-file":
Expand Down Expand Up @@ -169,6 +172,10 @@ func processRun(c *g.CliSC) (ok bool, err error) {
return ok, nil
}

if replicationCfgPath != "" && taskCfgStr != "" {
return ok, g.Error("cannot provide replication and task configuration. Choose one.")
}

os.Setenv("SLING_CLI", "TRUE")
os.Setenv("SLING_CLI_ARGS", g.Marshal(os.Args[1:]))

Expand All @@ -185,8 +192,8 @@ func processRun(c *g.CliSC) (ok bool, err error) {
}
} else {
// run task
if cfgStr != "" {
err = cfg.Unmarshal(cfgStr)
if taskCfgStr != "" {
err = cfg.Unmarshal(taskCfgStr)
if err != nil {
return ok, g.Error(err, "could not parse task configuration (see docs @ https://docs.slingdata.io/sling-cli)")
}
Expand Down Expand Up @@ -343,9 +350,13 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
streamCnt := lo.Ternary(len(selectStreams) > 0, len(selectStreams), len(replication.Streams))
g.Info("Sling Replication [%d streams] | %s -> %s", streamCnt, replication.Source, replication.Target)

streamsOrdered := replication.StreamsOrdered()
eG := g.ErrorGroup{}
succcess := 0
errors := make([]error, len(streamsOrdered))

counter := 0
for _, name := range replication.StreamsOrdered() {
for i, name := range streamsOrdered {
if interrupted {
break
}
Expand Down Expand Up @@ -395,26 +406,35 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
println()

if stream.Disabled {
g.Info("[%d / %d] skipping stream %s since it is disabled", counter, streamCnt, name)
g.Debug("[%d / %d] skipping stream %s since it is disabled", counter, streamCnt, name)
continue
} else {
g.Info("[%d / %d] running stream %s", counter, streamCnt, name)
}

telemetryMap["run_mode"] = "replication"
telemetryMap["replication_md5"] = replication.MD5()
err = runTask(&cfg, &replication)
if err != nil {
err = g.Error(err, "error for stream %s", name)
g.LogError(err)
eG.Capture(err)
errors[i] = g.Error(err, "error for stream %s", name)
eG.Capture(err, streamsOrdered[i])
} else {
succcess++
}
telemetryMap = g.M("begin_time", time.Now().UnixMicro()) // reset map
}

println()
delta := time.Since(startTime)
g.Info("Sling Replication Completed in %s | %s -> %s", g.DurationString(delta), replication.Source, replication.Target)

successStr := env.GreenString(g.F("%d Successes", succcess))
failureStr := g.F("%d Failures", len(eG.Errors))
if len(eG.Errors) > 0 {
failureStr = env.RedString(failureStr)
} else {
failureStr = env.GreenString(failureStr)
}

g.Info("Sling Replication Completed in %s | %s -> %s | %s | %s\n", g.DurationString(delta), replication.Source, replication.Target, successStr, failureStr)

return eG.Err()
}
Expand Down Expand Up @@ -487,20 +507,38 @@ func processConns(c *g.CliSC) (ok bool, err error) {

opt := connection.DiscoverOptions{
Schema: cast.ToString(c.Vals["schema"]),
Stream: cast.ToString(c.Vals["stream"]),
Folder: cast.ToString(c.Vals["folder"]),
Filter: cast.ToString(c.Vals["filter"]),
Recursive: cast.ToBool(c.Vals["recursive"]),
}

var streamNames []string
streamNames, err = ec.Discover(name, opt)
var schemata database.Schemata
streamNames, schemata, err = ec.Discover(name, opt)
if err != nil {
return ok, g.Error(err, "could not discover %s (See https://docs.slingdata.io/sling-cli/environment)", name)
}

g.Info("Found %d streams:", len(streamNames))
for _, sn := range streamNames {
println(g.F(" - %s", sn))
if tables := lo.Values(schemata.Tables()); len(tables) > 0 {
if opt.Stream != "" {
println(tables[0].Columns.PrettyTable())
} else {
header := []string{"ID", "Schema", "Name", "Type", "Columns"}
rows := lo.Map(tables, func(table database.Table, i int) []any {
tableType := lo.Ternary(table.IsView, "view", "table")
if table.Dialect.DBNameUpperCase() {
tableType = strings.ToUpper(tableType)
}
return []any{i + 1, table.Schema, table.Name, tableType, len(table.Columns)}
})
println(g.PrettyTable(header, rows))
}
} else {
g.Info("Found %d streams:", len(streamNames))
for _, sn := range streamNames {
println(g.F(" - %s", sn))
}
}

case "":
Expand Down
Loading

0 comments on commit 3756bc6

Please sign in to comment.