Skip to content

Commit

Permalink
feat: skip error when extract fail due to access (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman authored Nov 2, 2023
1 parent 9d7cb13 commit c0e6289
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 21 deletions.
31 changes: 22 additions & 9 deletions ext/extractor/bq_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,37 @@ func NewBQExtractor(client DDLViewGetter, l log.Logger) (*BQExtractor, error) {

// Extract returns a map from each urn to its query string, given a list of urns.
// It extracts the corresponding query only if the urn is considered a view.
func (e BQExtractor) Extract(ctx context.Context, resourceURNs []bigquery.ResourceURN) (urnToDDL map[bigquery.ResourceURN]string, err error) {
func (e BQExtractor) Extract(ctx context.Context, resourceURNs []bigquery.ResourceURN) (urnToDDLAll map[bigquery.ResourceURN]string, err error) {
// grouping
dsToNames := bigquery.ResourceURNs(resourceURNs).GroupByProjectDataset()

// fetch ddl for each resourceURN
urnToDDL = make(map[bigquery.ResourceURN]string, len(resourceURNs))
me := errors.NewMultiError("extract resourceURN to ddl errors")
urnToDDLAll = make(map[bigquery.ResourceURN]string)
const maxRetry = 3
for ds, names := range dsToNames {
urnToDDLView, err := bulkGetDDLViewWithRetry(e.client, e.l, maxRetry)(ctx, ds, names)
urnToDDL, err := bulkGetDDLViewWithRetry(e.client, e.l, maxRetry)(ctx, ds, names)
if err != nil {
return nil, err
if isIgnorableError(err) {
e.l.Error(err.Error())
} else {
me.Append(err)
}
}
for urn, ddl := range urnToDDLView {
urnToDDL[urn] = ddl
for urn, ddl := range urnToDDL {
urnToDDLAll[urn] = ddl
}
}

return urnToDDL, nil
return urnToDDLAll, me.ToErr()
}

func bulkGetDDLViewWithRetry(c DDLViewGetter, l log.Logger, retry int) func(context.Context, bigquery.ProjectDataset, []string) (map[bigquery.ResourceURN]string, error) {
return func(ctx context.Context, dataset bigquery.ProjectDataset, names []string) (map[bigquery.ResourceURN]string, error) {
var urnToDDL map[bigquery.ResourceURN]string
var err error
for try := 1; try <= retry; try++ {
urnToDDL, err := c.BulkGetDDLView(ctx, dataset, names)
urnToDDL, err = c.BulkGetDDLView(ctx, dataset, names)
if err != nil {
if strings.Contains(err.Error(), "net/http: TLS handshake timeout") ||
strings.Contains(err.Error(), "unexpected EOF") ||
Expand All @@ -74,10 +81,16 @@ func bulkGetDDLViewWithRetry(c DDLViewGetter, l log.Logger, retry int) func(cont
}

l.Error("error extracting upstreams", err)
return urnToDDL, err
}

return urnToDDL, nil
}
return nil, fmt.Errorf("bigquery api retries exhausted")
return urnToDDL, fmt.Errorf("bigquery api retries exhausted")
}
}

// isIgnorableError reports whether err is an access-related failure, i.e. one
// that Extract logs and skips instead of adding to its returned error.
//
// The check is a case-insensitive substring match on the error message: it
// returns true when the message contains "access denied" or
// "user does not have permission". A nil err is never ignorable and yields
// false.
func isIgnorableError(err error) bool {
	if err == nil {
		// Nothing failed, so there is nothing to ignore; this also prevents a
		// nil-interface panic on err.Error() below.
		return false
	}

	// Lower-case once so both phrases match regardless of the casing used in
	// the message.
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "access denied") ||
		strings.Contains(msg, "user does not have permission")
}
17 changes: 15 additions & 2 deletions ext/extractor/bq_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,32 @@ func TestBQExtractor(t *testing.T) {
assert.ErrorContains(t, err, "logger is nil")
assert.Nil(t, bqExtractor)
})
t.Run("should return no error if get ddl is fail", func(t *testing.T) {
t.Run("should return no error if get ddl is fail because of access denied", func(t *testing.T) {
resourceURNTable, _ := bigquery.NewResourceURN("project", "dataset", "name")
resourceURNs := []bigquery.ResourceURN{resourceURNTable}

client := new(Client)
defer client.AssertExpectations(t)

client.On("BulkGetDDLView", ctx, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("some error"))
client.On("BulkGetDDLView", ctx, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("access denied"))
bqExtractor, _ := extractor.NewBQExtractor(client, l)
urnToDDL, err := bqExtractor.Extract(ctx, resourceURNs)
assert.NoError(t, err)
assert.Empty(t, urnToDDL)
})
t.Run("should return error if get ddl is fail due to an error other than access related error", func(t *testing.T) {
resourceURNTable, _ := bigquery.NewResourceURN("project", "dataset", "name")
resourceURNs := []bigquery.ResourceURN{resourceURNTable}

client := new(Client)
defer client.AssertExpectations(t)

client.On("BulkGetDDLView", ctx, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("some error"))
bqExtractor, _ := extractor.NewBQExtractor(client, l)
urnToDDL, err := bqExtractor.Extract(ctx, resourceURNs)
assert.Error(t, err)
assert.Empty(t, urnToDDL)
})
t.Run("should return ddl given corresponding resourceURN", func(t *testing.T) {
resourceURNTable, _ := bigquery.NewResourceURN("project", "dataset", "name")
resourceURNView, _ := bigquery.NewResourceURN("project", "dataset", "view")
Expand Down
15 changes: 12 additions & 3 deletions ext/store/bigquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,24 @@ func (c *BqClient) ExternalTableHandleFrom(ds Dataset, name string) ResourceHand
}

func (c *BqClient) BulkGetDDLView(ctx context.Context, pd ProjectDataset, names []string) (map[ResourceURN]string, error) {
me := errors.NewMultiError("bulk get ddl view errors")
urnToDDL := make(map[ResourceURN]string, len(names))
for _, name := range names {
resourceURN, err := NewResourceURN(pd.Project, pd.Dataset, name)
if err != nil {
me.Append(err)
continue
}
urnToDDL[resourceURN] = ""
}

queryContent := buildGetDDLQuery(pd.Project, pd.Dataset, names...)
queryStatement := c.Client.Query(queryContent)
rowIterator, err := queryStatement.Read(ctx)
if err != nil {
return nil, err
return urnToDDL, err
}

urnToDDL := map[ResourceURN]string{}
me := errors.NewMultiError("bulk get ddl view errors")
for {
var values []bigquery.Value
if err := rowIterator.Next(&values); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion plugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,18 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c

// identify all upstream resource urns using every identifier on the given assets
resourceURNs := []string{}
me := errors.NewMultiError("identify upstream errors")
for _, upstreamIdentifier := range upstreamIdentifiers {
currentResourceURNs, err := upstreamIdentifier.IdentifyResources(ctx, assets)
if err != nil {
s.l.Error("error when identify upstream")
me.Append(err)
continue
}
resourceURNs = append(resourceURNs, currentResourceURNs...)
}

return resourceURNs, nil
return resourceURNs, me.ToErr()
}

func (s PluginService) ConstructDestinationURN(_ context.Context, taskName string, compiledConfig map[string]string) (string, error) {
Expand Down
10 changes: 6 additions & 4 deletions plugin/upstream_identifier/bq_upstream_identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@ func (g BQUpstreamIdentifier) IdentifyResources(ctx context.Context, assets map[
resourcesAccumulation := []*bigquery.ResourceURNWithUpstreams{}

// generate resource urn with upstream from each evaluator
me := errors.NewMultiError("identify resource errors")
for _, evaluatorFunc := range g.evaluatorFuncs {
query := evaluatorFunc(assets)
if query == "" {
return []string{}, nil
continue
}

visited := map[string][]*bigquery.ResourceURNWithUpstreams{}
paths := map[string]bool{}
resources, err := g.identifyResources(ctx, query, visited, paths)
if err != nil {
return nil, err
me.Append(err)
continue
}
resourcesAccumulation = append(resourcesAccumulation, resources...)
}
Expand All @@ -47,7 +49,7 @@ func (g BQUpstreamIdentifier) IdentifyResources(ctx context.Context, assets map[
for i, r := range flattenedResources {
resourceURNs[i] = r.ResourceURN.URN()
}
return resourceURNs, nil
return resourceURNs, me.ToErr()
}

func (g BQUpstreamIdentifier) identifyResources(ctx context.Context, query string, visited map[string][]*bigquery.ResourceURNWithUpstreams, paths map[string]bool) ([]*bigquery.ResourceURNWithUpstreams, error) {
Expand All @@ -61,7 +63,7 @@ func (g BQUpstreamIdentifier) identifyResources(ctx context.Context, query strin
urnToQuery, err := g.extractorFunc(ctx, resourceURNs)
if err != nil {
g.logger.Error(fmt.Sprintf("error when extract ddl resource: %s", err.Error()))
return resources, nil
return nil, err
}

for _, resourceURN := range resourceURNs {
Expand Down
4 changes: 2 additions & 2 deletions plugin/upstream_identifier/bq_upstream_identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestIdentifyResources(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, resourceURNs)
})
t.Run("should return empty resource and no error when extractor fail to extract", func(t *testing.T) {
t.Run("should return empty resource and error when extractor fail to extract", func(t *testing.T) {
evaluatorFunc := new(EvalAssetFunc)
defer evaluatorFunc.AssertExpectations(t)
parserFunc := new(ParserFunc)
Expand All @@ -92,7 +92,7 @@ func TestIdentifyResources(t *testing.T) {
assert.NotNil(t, bqUpstreamIdentifier)

resourceURNs, err := bqUpstreamIdentifier.IdentifyResources(ctx, assets)
assert.NoError(t, err)
assert.Error(t, err)
assert.Empty(t, resourceURNs)
})
t.Run("should skip the urn if parser passed with wrong urn formt", func(t *testing.T) {
Expand Down

0 comments on commit c0e6289

Please sign in to comment.