diff --git a/core/dbio/database/database_duckdb.go b/core/dbio/database/database_duckdb.go index a19a8234..da28bdbc 100644 --- a/core/dbio/database/database_duckdb.go +++ b/core/dbio/database/database_duckdb.go @@ -3,6 +3,8 @@ package database import ( "context" "database/sql" + "io" + "net/http" "os" "os/exec" "path" @@ -11,6 +13,7 @@ import ( "github.com/flarco/g" "github.com/flarco/g/net" + "github.com/labstack/echo/v4" "github.com/samber/lo" "github.com/slingdata-io/sling-cli/core/dbio" "github.com/slingdata-io/sling-cli/core/dbio/filesys" @@ -186,16 +189,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) ( return } - config := iop.DefaultStreamConfig() - config.FileMaxRows = 250000 - config.Header = true - config.Delimiter = "," - config.Escape = `"` - config.Quote = `"` - config.NullAs = `\N` - config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout") - - _, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, config) + _, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, conn.defaultCsvConfig()) if err != nil { df.Context.CaptureErr(g.Error(err, "Error writing dataflow to disk: "+folderPath)) return @@ -242,6 +236,110 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) ( return df.Count(), nil } +func (conn *DuckDbConn) importViaHTTP(tableFName string, df *iop.Dataflow) (count uint64, err error) { + + table, err := ParseTableName(tableFName, conn.GetType()) + if err != nil { + err = g.Error(err, "could not get table name for import") + return + } + + // start local http server + port, err := g.GetPort("localhost:0") + if err != nil { + err = g.Error(err, "could not acquire local port for duck http import") + return 0, err + } + // create reader channel to pass between handler and main flow + readerCh := make(chan io.Reader, 1) + + // create http server to serve data + importContext := g.NewContext(conn.context.Ctx) + httpURL := g.F("http://localhost:%d/data", port) + server := echo.New() + { + server.HidePort = true + server.HideBanner = true + // server.Use(middleware.Logger()) + server.Add(http.MethodGet, "/data", func(c echo.Context) (err error) { + select { + case reader := <-readerCh: + if reader != nil { + return c.Stream(200, "text/csv", reader) + } + default: + } + return c.NoContent(http.StatusOK) + }) + + server.Add(http.MethodHead, "/data", func(c echo.Context) error { + c.Response().Header().Set("Content-Type", "text/csv") + return c.NoContent(http.StatusOK) + }) + + // start server in background, wait for it to start + importContext.Wg.Read.Add() + go func() { + g.Debug("started %s for duckdb direct import", httpURL) + importContext.Wg.Read.Done() + if err := server.Start(g.F("localhost:%d", port)); err != http.ErrServerClosed { + g.Error(err, "duckdb import http server error") + } + }() + } + + // wait for local server startup + importContext.Wg.Read.Wait() + defer server.Shutdown(conn.Context().Ctx) + + sc := conn.defaultCsvConfig() + df.SetBatchLimit(sc.BatchLimit) + ds := iop.MergeDataflow(df) + + for batchR := range ds.NewCsvReaderChnl(sc) { + g.Trace("processing duckdb batch %s", batchR.Batch.ID()) + readerCh <- batchR.Reader + + columnNames := lo.Map(batchR.Columns.Names(), func(col string, i int) string { + return `"` + col + `"` + }) + + sqlLines := []string{ + g.F(`insert into %s (%s) select * from read_csv('%s', delim=',', header=True, columns=%s, max_line_size=134217728, parallel=false, quote='"', escape='"', nullstr='\N');`, table.FDQN(), strings.Join(columnNames, ", "), httpURL, conn.generateCsvColumns(batchR.Columns)), + } + + sql := strings.Join(sqlLines, ";\n") + + result, err := conn.duck.ExecContext(conn.Context().Ctx, sql) + if err != nil { + return df.Count(), g.Error(err, "could not insert into %s", tableFName) + } + + if err = importContext.Err(); err != nil { + return df.Count(), g.Error(err, "error importing insert into %s", tableFName) + } + + if result != nil { + inserted, _ := result.RowsAffected() + g.Trace("inserted %d rows into temp duckdb", inserted) + } + } + + return df.Count(), nil +} + +func (conn *DuckDbConn) defaultCsvConfig() (config iop.StreamConfig) { + config = iop.DefaultStreamConfig() + config.FileMaxRows = 250000 + config.Header = true + config.Delimiter = "," + config.Escape = `"` + config.Quote = `"` + config.NullAs = `\N` + config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout") + return config +} + func (conn *DuckDbConn) generateCsvColumns(columns iop.Columns) (colStr string) { // {'FlightDate': 'DATE', 'UniqueCarrier': 'VARCHAR', 'OriginCityName': 'VARCHAR', 'DestCityName': 'VARCHAR'} diff --git a/core/dbio/database/database_duckdb_unix.go b/core/dbio/database/database_duckdb_unix.go index df12ee61..335733e5 100644 --- a/core/dbio/database/database_duckdb_unix.go +++ b/core/dbio/database/database_duckdb_unix.go @@ -17,10 +17,16 @@ import ( ) func (conn *DuckDbConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) { - if conn.GetProp("copy_method") == "named_pipes" { + switch conn.GetProp("copy_method") { + case "named_pipes": return conn.importViaNamedPipe(tableFName, df) + case "csv_files": + return conn.importViaTempCSVs(tableFName, df) + case "http_server": + fallthrough + default: + return conn.importViaHTTP(tableFName, df) } - return conn.importViaTempCSVs(tableFName, df) } func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow) (count uint64, err error) { diff --git a/core/dbio/database/database_duckdb_windows.go b/core/dbio/database/database_duckdb_windows.go index c13b30b6..db80a8fd 100644 --- a/core/dbio/database/database_duckdb_windows.go +++ b/core/dbio/database/database_duckdb_windows.go @@ -7,5 +7,12 @@ import ( ) func (conn *DuckDbConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) { - return conn.importViaTempCSVs(tableFName, df) + switch conn.GetProp("copy_method") { + case "csv_files": + return conn.importViaTempCSVs(tableFName, df) + case "http_server": + fallthrough + default: + return conn.importViaHTTP(tableFName, df) + } } diff --git a/go.mod b/go.mod index a6f26da6..d82ba7c5 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/klauspost/compress v1.17.9 github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011 + github.com/labstack/echo/v4 v4.10.2 github.com/lib/pq v1.10.9 github.com/linkedin/goavro/v2 v2.12.0 github.com/mattn/go-sqlite3 v1.14.22 @@ -148,6 +149,7 @@ require ( github.com/goccy/go-json v0.10.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect @@ -184,6 +186,7 @@ require ( github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61 // indirect + github.com/labstack/gommon v0.4.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matoous/go-nanoid/v2 v2.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index 0d00451a..0adc3bc1 100644 --- a/go.sum +++ b/go.sum @@ -247,6 +247,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= @@ -419,8 +421,12 @@ github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011 h1:PNO6bcxsCMs github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011/go.mod h1:oTL9FJaM6f+gPQyrBN/Dd274KKAEkHw9ATjZ+7GD86U= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M= +github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO2FzvI4J5k= github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61 h1:FwuzbVh87iLiUQj1+uQUsuw9x5t9m5n5g7rG7o4svW4= github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61/go.mod h1:paQfF1YtHe+GrGg5fOgjsjoCX/UKDr9bc1DoWpZfns8= +github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8= +github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= @@ -435,11 +441,13 @@ github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= @@ -660,6 +668,7 @@ github.com/trinodb/trino-go-client v0.318.0 h1:Vsyru4AkX2yyWg0MqnYt1WMYA+w5RKA+o github.com/trinodb/trino-go-client v0.318.0/go.mod h1:F+7TZRD0+0M8XqYsgXT8+EJT1pSlbxTECVD1BDzCc70= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= @@ -814,7 +823,10 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -950,6 +962,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=