Skip to content

Commit

Permalink
handle Elasticsearch errors and improve StreamRowsContext
Browse files Browse the repository at this point in the history
- Updated `StreamRowsContext` function to handle potential errors during Elasticsearch search execution more robustly.
- Improved error handling by checking HTTP status codes and including response body details in error messages.
- Changed function parameter `indexName` to `tableName` for better clarity and consistency.
- Added schema parsing to handle table name resolution correctly.
- Updated Datastream creation to use iop.Columns{} instead of iop.NewColumnsFromFields("data").
  • Loading branch information
flarco committed Jan 9, 2025
1 parent 60b361d commit ba7dcde
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions core/dbio/database/database_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (conn *ElasticsearchConn) BulkExportFlow(table Table) (df *iop.Dataflow, er
return df, nil
}

func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) {
func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, tableName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) {
opts := getQueryOptions(Opts)
Limit := int64(0) // infinite
if val := cast.ToInt64(opts["limit"]); val > 0 {
Expand Down Expand Up @@ -336,6 +336,8 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName
}

// Create the search request
table, _ := ParseTableName(tableName, conn.Type)
indexName := table.Schema
res, err := conn.Client.Search(
conn.Client.Search.WithContext(ctx),
conn.Client.Search.WithIndex(indexName),
Expand All @@ -344,6 +346,9 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName
)
if err != nil {
return nil, g.Error(err, "could not execute search")
} else if res.StatusCode >= 400 {
bytes, _ := io.ReadAll(res.Body)
return nil, g.Error("could not execute search (status %s) => %s", res.StatusCode, string(bytes))
}

var searchResponse map[string]interface{}
Expand All @@ -360,7 +365,7 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName
}

// Create base datastream
ds = iop.NewDatastreamContext(ctx, iop.NewColumnsFromFields("data"))
ds = iop.NewDatastreamContext(ctx, iop.Columns{})

// Handle flattening option
flatten := true
Expand Down

0 comments on commit ba7dcde

Please sign in to comment.