Skip to content

Commit

Permalink
add HTTP import method for DuckDB
Browse files Browse the repository at this point in the history
- Added a new import method using a local HTTP server to improve performance and handle large datasets.
- The HTTP server serves data in CSV format, allowing DuckDB to efficiently import the data using `read_csv`.
- Implemented error handling for server startup and data streaming.
- Improved logging to track import progress and handle potential issues.
- Added support for configuring the CSV import parameters.
  • Loading branch information
flarco committed Dec 25, 2024
1 parent 0ba1b92 commit 0a1c836
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 13 deletions.
118 changes: 108 additions & 10 deletions core/dbio/database/database_duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package database
import (
"context"
"database/sql"
"io"
"net/http"
"os"
"os/exec"
"path"
Expand All @@ -11,6 +13,7 @@ import (

"github.com/flarco/g"
"github.com/flarco/g/net"
"github.com/labstack/echo/v4"
"github.com/samber/lo"
"github.com/slingdata-io/sling-cli/core/dbio"
"github.com/slingdata-io/sling-cli/core/dbio/filesys"
Expand Down Expand Up @@ -186,16 +189,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) (
return
}

config := iop.DefaultStreamConfig()
config.FileMaxRows = 250000
config.Header = true
config.Delimiter = ","
config.Escape = `"`
config.Quote = `"`
config.NullAs = `\N`
config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout")

_, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, config)
_, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, conn.defaultCsvConfig())
if err != nil {
df.Context.CaptureErr(g.Error(err, "Error writing dataflow to disk: "+folderPath))
return
Expand Down Expand Up @@ -242,6 +236,110 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) (
return df.Count(), nil
}

func (conn *DuckDbConn) importViaHTTP(tableFName string, df *iop.Dataflow) (count uint64, err error) {

table, err := ParseTableName(tableFName, conn.GetType())
if err != nil {
err = g.Error(err, "could not get table name for import")
return
}

// start local http server
port, err := g.GetPort("localhost:0")
if err != nil {
err = g.Error(err, "could not acquire local port for duck http import")
return 0, err
}
// create reader channel to pass between handler and main flow
readerCh := make(chan io.Reader, 1)

// create http server to serve data
importContext := g.NewContext(conn.context.Ctx)
httpURL := g.F("http://localhost:%d/data", port)
server := echo.New()
{
server.HidePort = true
server.HideBanner = true
// server.Use(middleware.Logger())
server.Add(http.MethodGet, "/data", func(c echo.Context) (err error) {
select {
case reader := <-readerCh:
if reader != nil {
return c.Stream(200, "text/csv", reader)
}
default:
}
return c.NoContent(http.StatusOK)
})

server.Add(http.MethodHead, "/data", func(c echo.Context) error {
c.Response().Header().Set("Content-Type", "text/csv")
return c.NoContent(http.StatusOK)
})

// start server in background, wait for it to start
importContext.Wg.Read.Add()
go func() {
g.Debug("started %s for duckdb direct import", httpURL)
importContext.Wg.Read.Done()
if err := server.Start(g.F("localhost:%d", port)); err != http.ErrServerClosed {
g.Error(err, "duckdb import http server error")
}
}()
}

// wait for local server startup
importContext.Wg.Read.Wait()
defer server.Shutdown(conn.Context().Ctx)

sc := conn.defaultCsvConfig()
df.SetBatchLimit(sc.BatchLimit)
ds := iop.MergeDataflow(df)

for batchR := range ds.NewCsvReaderChnl(sc) {
g.Trace("processing duckdb batch %s", batchR.Batch.ID())
readerCh <- batchR.Reader

columnNames := lo.Map(batchR.Columns.Names(), func(col string, i int) string {
return `"` + col + `"`
})

sqlLines := []string{
g.F(`insert into %s (%s) select * from read_csv('%s', delim=',', header=True, columns=%s, max_line_size=134217728, parallel=false, quote='"', escape='"', nullstr='\N');`, table.FDQN(), strings.Join(columnNames, ", "), httpURL, conn.generateCsvColumns(batchR.Columns)),
}

sql := strings.Join(sqlLines, ";\n")

result, err := conn.duck.ExecContext(conn.Context().Ctx, sql)
if err != nil {
return df.Count(), g.Error(err, "could not insert into %s", tableFName)
}

if err = importContext.Err(); err != nil {
return df.Count(), g.Error(err, "error importing insert into %s", tableFName)
}

if result != nil {
inserted, _ := result.RowsAffected()
g.Trace("inserted %d rows into temp duckdb", inserted)
}
}

return df.Count(), nil
}

func (conn *DuckDbConn) defaultCsvConfig() (config iop.StreamConfig) {
config = iop.DefaultStreamConfig()
config.FileMaxRows = 250000
config.Header = true
config.Delimiter = ","
config.Escape = `"`
config.Quote = `"`
config.NullAs = `\N`
config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout")
return config
}

func (conn *DuckDbConn) generateCsvColumns(columns iop.Columns) (colStr string) {
// {'FlightDate': 'DATE', 'UniqueCarrier': 'VARCHAR', 'OriginCityName': 'VARCHAR', 'DestCityName': 'VARCHAR'}

Expand Down
10 changes: 8 additions & 2 deletions core/dbio/database/database_duckdb_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ import (
)

func (conn *DuckDbConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) {
if conn.GetProp("copy_method") == "named_pipes" {
switch conn.GetProp("copy_method") {
case "named_pipes":
return conn.importViaNamedPipe(tableFName, df)
case "csv_files":
return conn.importViaTempCSVs(tableFName, df)
case "http_server":
fallthrough
default:
return conn.importViaHTTP(tableFName, df)
}
return conn.importViaTempCSVs(tableFName, df)
}

func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow) (count uint64, err error) {
Expand Down
9 changes: 8 additions & 1 deletion core/dbio/database/database_duckdb_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@ import (
)

func (conn *DuckDbConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) {
return conn.importViaTempCSVs(tableFName, df)
switch conn.GetProp("copy_method") {
case "csv_files":
return conn.importViaTempCSVs(tableFName, df)
case "http_server":
fallthrough
default:
return conn.importViaHTTP(tableFName, df)
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/klauspost/compress v1.17.9
github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011
github.com/labstack/echo/v4 v4.10.2
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.12.0
github.com/mattn/go-sqlite3 v1.14.22
Expand Down Expand Up @@ -148,6 +149,7 @@ require (
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
Expand Down Expand Up @@ -184,6 +186,7 @@ require (
github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61 // indirect
github.com/labstack/gommon v0.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matoous/go-nanoid/v2 v2.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
Expand Down Expand Up @@ -419,8 +421,12 @@ github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011 h1:PNO6bcxsCMs
github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011/go.mod h1:oTL9FJaM6f+gPQyrBN/Dd274KKAEkHw9ATjZ+7GD86U=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M=
github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO2FzvI4J5k=
github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61 h1:FwuzbVh87iLiUQj1+uQUsuw9x5t9m5n5g7rG7o4svW4=
github.com/labstack/echo/v5 v5.0.0-20230722203903-ec5b858dab61/go.mod h1:paQfF1YtHe+GrGg5fOgjsjoCX/UKDr9bc1DoWpZfns8=
github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8=
github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
Expand All @@ -435,11 +441,13 @@ github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZ
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
Expand Down Expand Up @@ -660,6 +668,7 @@ github.com/trinodb/trino-go-client v0.318.0 h1:Vsyru4AkX2yyWg0MqnYt1WMYA+w5RKA+o
github.com/trinodb/trino-go-client v0.318.0/go.mod h1:F+7TZRD0+0M8XqYsgXT8+EJT1pSlbxTECVD1BDzCc70=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
Expand Down Expand Up @@ -814,7 +823,10 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -950,6 +962,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down

0 comments on commit 0a1c836

Please sign in to comment.