diff --git a/extractors/sqldump_step.go b/extractors/sqldump_step.go index f246e0a..8557fe1 100644 --- a/extractors/sqldump_step.go +++ b/extractors/sqldump_step.go @@ -87,7 +87,8 @@ func (ml *SQLDumpStep) Start(task task.RuntimeTask) (chan map[string]interface{} case *sqlparser.Insert: //fmt.Printf("Inserting into: %s\n", stmt.Table.Name) - tableName := stmt.Table.Name.CompliantName() + t, _ := stmt.Table.TableName() + tableName := t.Name.CompliantName() if _, ok := tables[tableName]; ok || len(tables) == 0 { cols := tableColumns[tableName] diff --git a/transform/graph_build.go b/transform/graph_build.go index a01c8c8..14eceeb 100644 --- a/transform/graph_build.go +++ b/transform/graph_build.go @@ -28,7 +28,10 @@ type graphBuildProcess struct { sch graph.GraphSchema class string - edgeFix evaluate.Processor + edgeFix evaluate.Processor + objectCount int + vertexCount int + edgeCount int } func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) { @@ -60,7 +63,7 @@ func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) { edgeFix = c } } - return &graphBuildProcess{ts, task, sc, ts.Title, edgeFix}, nil + return &graphBuildProcess{ts, task, sc, ts.Title, edgeFix, 0, 0, 0}, nil } func (ts GraphBuildStep) GetConfigFields() []config.Variable { @@ -77,15 +80,22 @@ func (ts *graphBuildProcess) PoolReady() bool { return true } -func (ts *graphBuildProcess) Close() {} +func (ts *graphBuildProcess) Close() { + logger.Info("Graph Emit", + "objects", ts.objectCount, + "edges", ts.edgeCount, + "vertices", ts.vertexCount, + "class", ts.class) +} func (ts *graphBuildProcess) Process(i map[string]interface{}) []map[string]interface{} { out := []map[string]any{} - if o, err := ts.sch.Generate(ts.class, i, ts.config.Clean, map[string]any{}); err == nil { + ts.objectCount++ for _, j := range o { if j.Vertex != nil { + ts.vertexCount++ err := ts.task.Emit("vertex", ts.vertexToMap(j.Vertex), false) if err != nil { logger.Error("Emit Error: %s", err) @@ -101,6 +111,7 @@ func (ts *graphBuildProcess) Process(i map[string]interface{}) []map[string]inte edgeData = o } } + ts.edgeCount++ err := ts.task.Emit("edge", edgeData, false) if err != nil { logger.Error("Emit Error: %s", err)