support local duckdb computation for remote filesystems
- Added support for copying remote files to a local temporary directory before processing with DuckDB (see the standalone sketch after this list).
- Improved error handling during remote file copy process.
- Added concurrency control for efficient parallel file copying.
- Implemented a function to find the deepest common parent path for better file organization during copying.
- Enhanced file copy process to handle directories recursively.
- Modified `GetDataflowViaDuckDB` to handle remote files by copying them locally.
- Updated `FileStreamConfig` to include `GetProp` and `SetProp` methods for managing properties.
- Added a `working_dir` property in `DuckDb` to specify the working directory for DuckDB processes.
- Adjusted DuckDB query generation to handle paths relative to the working directory.
- Added support for setting properties in `FileStreamConfig` and retrieving them in `DuckDb`.
- Implemented a mechanism to automatically clean up temporary local files after processing.
- Improved logging to provide more detailed information during file copy and processing.
- Modified `GetDataflowViaDuckDB` to use relative paths when `working_dir` is set.
- Changed the way incremental keys are handled in `MakeScanQuery`, excluding the reserved column `_sling_loaded_at`.
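
To make the mechanism concrete, here is a minimal standalone sketch of the copy-then-compute pattern, using only the Go standard library and the duckdb CLI. The `duckdb` binary on PATH and the sample CSV payload are assumptions; sling's own types are not used here.

package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
)

func main() {
	// 1. create a local temp dir, analogous to path.Join(env.GetTempFolder(), g.NewTsID("duck.temp"))
	tempDir, err := os.MkdirTemp("", "duck.temp.*")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(tempDir) // 3. cleanup, analogous to df.Defer(...) in this commit

	// 2. copy a "remote" payload locally; stands in for fs.GetReader + io.Copy in the diff
	remote := strings.NewReader("id\n1\n2\n")
	local, err := os.Create(filepath.Join(tempDir, "part1.csv"))
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(local, remote); err != nil {
		panic(err)
	}
	local.Close()

	// run duckdb with tempDir as its working directory, so the query can use a
	// relative path, mirroring the new working_dir property / duck.Proc.WorkDir
	cmd := exec.Command("duckdb", "-csv", "-c", "select count(*) as n from read_csv_auto('part1.csv')")
	cmd.Dir = tempDir
	out, err := cmd.CombinedOutput()
	fmt.Println(string(out), err)
}

Setting `cmd.Dir` plays the same role as the new `working_dir` property: the query references `part1.csv` without an absolute path.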

handle errors during remote file copy and duckdb processing

- Improved error handling for file copying, with more specific error messages.
- Added error checks throughout the remote file copy process for robustness.
- Errors now carry more context and detail for easier debugging.
- Improved error messages and logging to pinpoint issues during DuckDB processing.

♻️ refactor(dbio): optimize duckdb file processing and improve code structure

- Refactored code for readability and maintainability.
- Optimized file handling for performance and efficiency.
- Reorganized code structure for clarity.
- Updated function signatures and variable names for readability.
- Simplified the logic for processing files with DuckDB.
- Restructured the code to follow a more consistent style.
flarco committed Jan 13, 2025
1 parent 9cef79c commit 25c6e43
Showing 3 changed files with 145 additions and 3 deletions.
121 changes: 119 additions & 2 deletions core/dbio/filesys/fs.go
@@ -583,7 +583,30 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfig)
 	}
 
 	if g.In(Cfg.Format, dbio.FileTypeParquet) && Cfg.ComputeWithDuckDB() {
-		df, err = GetDataflowViaDuckDB(fs.Self(), url, nodes, Cfg)
+		if g.In(fs.FsType(), dbio.TypeFileLocal, dbio.TypeFileS3, dbio.TypeFileAzure) {
+			// duckdb read natively
+			df, err = GetDataflowViaDuckDB(fs.Self(), url, nodes, Cfg)
+		} else {
+			localRoot := path.Join(env.GetTempFolder(), g.NewTsID("duck.temp"))
+
+			// copy to local first
+			_, localNodes, err := CopyFromRemoteNodes(fs.Self(), url, nodes, localRoot)
+			if err != nil {
+				return nil, g.Error(err, "could not copy files locally to use duckdb compute")
+			}
+
+			localFs, _ := NewFileSysClientContext(fs.context.Ctx, dbio.TypeFileLocal, g.MapToKVArr(fs.properties)...)
+			Cfg.SetProp("working_dir", localRoot) // for duckdb working dir
+			df, err = GetDataflowViaDuckDB(localFs, url, localNodes, Cfg)
+			if err != nil {
+				err = g.Error(err, "error getting dataflow")
+				return df, err
+			}
+
+			df.Defer(func() { env.RemoveAllLocalTempFile(localRoot) })
+
+			return df, err
+		}
+	} else {
 		df, err = GetDataflow(fs.Self(), nodes, Cfg)
 	}
@@ -1105,7 +1128,7 @@ func GetDataflowViaDuckDB(fs FileSysClient, uri string, nodes FileNodes, cfg iop.FileStreamConfig)
 	uris := strings.Join(nodes.Files().URIs(), iop.DuckDbURISeparator)
 
 	// no reader needed for iceberg, delta, duckdb will handle it
-	cfg.Props = map[string]string{"fs_props": g.Marshal(fs.Props())}
+	cfg.SetProp("fs_props", g.Marshal(fs.Props()))
 	switch cfg.Format {
 	case dbio.FileTypeParquet:
 		err = ds.ConsumeParquetReaderDuckDb(uris, cfg)
@@ -1501,6 +1524,100 @@ func CopyFromLocalRecursive(fs FileSysClient, localPath string, remotePath string)
 	return totalBytes, nil
 }
 
+// CopyFromRemoteNodes copies the nodes to local path maintaining the same folder structure
+func CopyFromRemoteNodes(fs FileSysClient, url string, nodes FileNodes, localRoot string) (totalBytes int64, localNodes FileNodes, err error) {
+	if len(nodes) == 0 {
+		return 0, nil, g.Error("No files provided in nodes")
+	}
+
+	// Create context for managing concurrency
+	concurrency := cast.ToInt(fs.GetProp("concurrency"))
+	if concurrency == 0 {
+		concurrency = 10
+	}
+	copyContext := g.NewContext(fs.Context().Ctx, concurrency)
+
+	// Find the deepest common parent path
+	commonParent := GetDeepestParent(url)
+
+	// Create base local directory if it doesn't exist
+	err = os.MkdirAll(localRoot, 0755)
+	if err != nil {
+		return 0, nil, g.Error(err, "Error creating local directory: "+localRoot)
+	}
+
+	// Copy function for each file
+	copyFile := func(node FileNode) {
+		defer copyContext.Wg.Read.Done()
+
+		if node.IsDir {
+			return
+		}
+
+		// Get relative path from the common parent
+		relPath := strings.TrimPrefix(node.Path(), commonParent)
+		relPath = strings.TrimPrefix(relPath, "/")
+
+		// Construct local file path
+		localFilePath := filepath.Join(localRoot, relPath)
+
+		// Create parent directory if it doesn't exist
+		err = os.MkdirAll(filepath.Dir(localFilePath), 0755)
+		if err != nil {
+			copyContext.CaptureErr(g.Error(err, "Error creating local directory: "+filepath.Dir(localFilePath)))
+			return
+		}
+
+		// Get reader from remote file
+		reader, err := fs.GetReader(node.URI)
+		if err != nil {
+			copyContext.CaptureErr(g.Error(err, "Error getting reader for: "+node.URI))
+			return
+		}
+
+		// Create local file
+		file, err := os.Create(localFilePath)
+		if err != nil {
+			copyContext.CaptureErr(g.Error(err, "Error creating local file: "+localFilePath))
+			return
+		}
+		defer file.Close()
+
+		// Copy the content
+		written, err := io.Copy(file, reader)
+		if err != nil {
+			copyContext.CaptureErr(g.Error(err, "Error copying file content for: "+node.URI))
+			return
+		}
+
+		atomic.AddInt64(&totalBytes, written)
+
+		// Add to localNodes with updated local path
+		localNode := node
+		localNode.Size = cast.ToUint64(written)
+		localNode.URI = "file://" + localFilePath
+		copyContext.Mux.Lock()
+		localNodes = append(localNodes, localNode)
+		copyContext.Mux.Unlock()
+
+		g.Trace("copied %s to %s [%d bytes]", node.URI, localFilePath, written)
+	}
+
+	// Process each file concurrently
+	g.Debug("copying %d files from remote to %s for local processing", len(nodes), localRoot)
+	for _, node := range nodes {
+		copyContext.Wg.Read.Add()
+		go copyFile(node)
+	}
+
+	copyContext.Wg.Read.Wait()
+	if err = copyContext.Err(); err != nil {
+		return totalBytes, localNodes, g.Error(err, "Error during remote nodes copy")
+	}
+
+	return totalBytes, localNodes, nil
+}
+
 // CopyFromRemoteRecursive copies a remote file or directory recursively to a local filesystem
 func CopyFromRemoteRecursive(fs FileSysClient, remotePath string, localPath string) (totalBytes int64, err error) {
 	// Check if source exists by listing files
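`GetDeepestParent` is called above but defined outside this diff. As an assumption about its behavior only, not the actual sling implementation, such a helper could return the deepest directory portion of the URL that precedes any wildcard segment:

// getDeepestParentSketch is a hypothetical stand-in for GetDeepestParent:
// it cuts the URL at the first glob segment (or the trailing file segment)
// and returns everything before it as the common parent.
func getDeepestParentSketch(uri string) string {
	parts := strings.Split(uri, "/")
	end := len(parts) - 1 // by default, drop the last segment (file name or pattern)
	for i, p := range parts {
		if strings.ContainsAny(p, "*?[") {
			end = i // stop at the first wildcard segment
			break
		}
	}
	return strings.Join(parts[:end], "/")
}

// e.g. getDeepestParentSketch("s3://bucket/data/2024/*.parquet") => "s3://bucket/data/2024"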
14 changes: 14 additions & 0 deletions core/dbio/iop/datastream.go
@@ -100,6 +100,20 @@ func (sc *FileStreamConfig) ShouldUseDuckDB() bool {
 	return g.In(sc.Format, dbio.FileTypeIceberg, dbio.FileTypeDelta) || sc.SQL != ""
 }
 
+func (sc *FileStreamConfig) GetProp(key string) string {
+	if sc.Props == nil {
+		sc.Props = map[string]string{}
+	}
+	return sc.Props[key]
+}
+
+func (sc *FileStreamConfig) SetProp(key, val string) {
+	if sc.Props == nil {
+		sc.Props = map[string]string{}
+	}
+	sc.Props[key] = val
+}
+
 type KeyValue struct {
 	Key   string `json:"key"`
 	Value any    `json:"value"`
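A quick usage sketch of the new accessors (values are hypothetical). Both methods lazily initialize `Props`, so a zero-value `FileStreamConfig` never triggers a nil-map write in `SetProp`:

cfg := iop.FileStreamConfig{}                    // Props starts as a nil map
cfg.SetProp("working_dir", "/tmp/duck.temp.001") // initializes Props, then sets the key
fmt.Println(cfg.GetProp("working_dir"))          // "/tmp/duck.temp.001"
fmt.Println(cfg.GetProp("missing"))              // "" (absent keys return the zero value)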
13 changes: 12 additions & 1 deletion core/dbio/iop/duckdb.go
@@ -240,6 +240,10 @@ func (duck *DuckDb) Open(timeOut ...int) (err error) {
 		args = append(args, "-readonly")
 	}
 
+	if workingDir := duck.GetProp("working_dir"); workingDir != "" {
+		duck.Proc.WorkDir = workingDir
+	}
+
 	if instance := duck.GetProp("instance"); instance != "" {
 		args = append(args, instance)
 	}
@@ -984,11 +988,18 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig)
 	incrementalWhereCond := "1=1"
 
 	uris := strings.Split(uri, DuckDbURISeparator)
+	workDir := duck.GetProp("working_dir")
 	for i, val := range uris {
+		if workDir != "" {
+			val = strings.TrimPrefix(strings.TrimPrefix(val, workDir), "/")
+		}
 		uris[i] = g.F("'%s'", val) // add quotes
 	}
 
-	if fsc.IncrementalKey != "" && fsc.IncrementalValue != "" {
+	// reserved word to use for timestamp comparison (when listing)
+	const slingLoadedAtColumn = "_sling_loaded_at"
+	if fsc.IncrementalKey != "" && fsc.IncrementalKey != slingLoadedAtColumn &&
+		fsc.IncrementalValue != "" {
 		incrementalWhereCond = g.F("%s > %s", dbio.TypeDbDuckDb.Quote(fsc.IncrementalKey), fsc.IncrementalValue)
 		where = g.F("where %s", incrementalWhereCond)
 	}
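To make the path handling concrete, here is a small self-contained sketch of the same trim logic with hypothetical paths, using `fmt.Sprintf` in place of `g.F`:

workDir := "/tmp/duck.temp.1736700000"
uris := []string{
	"/tmp/duck.temp.1736700000/part1.parquet",
	"/tmp/duck.temp.1736700000/sub/part2.parquet",
}
for i, val := range uris {
	val = strings.TrimPrefix(strings.TrimPrefix(val, workDir), "/")
	uris[i] = fmt.Sprintf("'%s'", val) // add quotes, as g.F("'%s'", val) does above
}
// uris is now ["'part1.parquet'", "'sub/part2.parquet'"] — paths relative to the
// DuckDB working directory set via the working_dir property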
