From ba7dcde4b62a535b3663dcc4d71eccc26f9afbb9 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Wed, 8 Jan 2025 21:04:48 -0300 Subject: [PATCH] handle Elasticsearch errors and improve StreamRowsContext - Updated `StreamRowsContext` function to handle potential errors during Elasticsearch search execution more robustly. - Improved error handling by checking HTTP status codes and including response body details in error messages. - Changed function parameter `indexName` to `tableName` for better clarity and consistency. - Added schema parsing to handle table name resolution correctly. - Updated Datastream creation to use iop.Columns{} instead of iop.NewColumnsFromFields("data"). --- core/dbio/database/database_elasticsearch.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/dbio/database/database_elasticsearch.go b/core/dbio/database/database_elasticsearch.go index c5718c6f..aa1c1763 100644 --- a/core/dbio/database/database_elasticsearch.go +++ b/core/dbio/database/database_elasticsearch.go @@ -277,7 +277,7 @@ func (conn *ElasticsearchConn) BulkExportFlow(table Table) (df *iop.Dataflow, er return df, nil } -func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) { +func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, tableName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) { opts := getQueryOptions(Opts) Limit := int64(0) // infinite if val := cast.ToInt64(opts["limit"]); val > 0 { @@ -336,6 +336,8 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName } // Create the search request + table, _ := ParseTableName(tableName, conn.Type) + indexName := table.Schema res, err := conn.Client.Search( conn.Client.Search.WithContext(ctx), conn.Client.Search.WithIndex(indexName), @@ -344,6 +346,9 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName ) if err != nil { return nil, g.Error(err, "could not execute search") + } else if res.StatusCode >= 400 { + bytes, _ := io.ReadAll(res.Body) + return nil, g.Error("could not execute search (status %s) => %s", res.StatusCode, string(bytes)) } var searchResponse map[string]interface{} @@ -360,7 +365,7 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName } // Create base datastream - ds = iop.NewDatastreamContext(ctx, iop.NewColumnsFromFields("data")) + ds = iop.NewDatastreamContext(ctx, iop.Columns{}) // Handle flattening option flatten := true