diff --git a/core/dbio/database/database_elasticsearch.go b/core/dbio/database/database_elasticsearch.go index c5718c6f..aa1c1763 100644 --- a/core/dbio/database/database_elasticsearch.go +++ b/core/dbio/database/database_elasticsearch.go @@ -277,7 +277,7 @@ func (conn *ElasticsearchConn) BulkExportFlow(table Table) (df *iop.Dataflow, er return df, nil } -func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) { +func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, tableName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error) { opts := getQueryOptions(Opts) Limit := int64(0) // infinite if val := cast.ToInt64(opts["limit"]); val > 0 { @@ -336,6 +336,8 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName } // Create the search request + table, _ := ParseTableName(tableName, conn.Type) + indexName := table.Schema res, err := conn.Client.Search( conn.Client.Search.WithContext(ctx), conn.Client.Search.WithIndex(indexName), @@ -344,6 +346,9 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName ) if err != nil { return nil, g.Error(err, "could not execute search") + } else if res.StatusCode >= 400 { + bytes, _ := io.ReadAll(res.Body) + return nil, g.Error("could not execute search (status %s) => %s", res.StatusCode, string(bytes)) } var searchResponse map[string]interface{} @@ -360,7 +365,7 @@ func (conn *ElasticsearchConn) StreamRowsContext(ctx context.Context, indexName } // Create base datastream - ds = iop.NewDatastreamContext(ctx, iop.NewColumnsFromFields("data")) + ds = iop.NewDatastreamContext(ctx, iop.Columns{}) // Handle flattening option flatten := true