From 66e09f100feb0ce003b590da45ff96d7f83b95d5 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Thu, 25 Aug 2022 19:57:18 +0530 Subject: [PATCH] fix: ignore views in ignoreupstream (#46) * Fix: fix ignoreupstream helper for big query view * fix: fix failing build Co-authored-by: Sumit Agrawal --- task/bq2bq/main.go | 21 +++++++ task/bq2bq/main_test.go | 135 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) diff --git a/task/bq2bq/main.go b/task/bq2bq/main.go index 70c2c7f..32673bf 100644 --- a/task/bq2bq/main.go +++ b/task/bq2bq/main.go @@ -64,6 +64,7 @@ var ( SecretName = "TASK_BQ2BQ" BqServiceAccount = "BQ_SERVICE_ACCOUNT" + TimeoutDuration = time.Second * 180 MaxBQApiRetries = 3 FakeSelectStmt = "SELECT * from `%s` WHERE FALSE LIMIT 1" @@ -437,6 +438,26 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request models.Generat return response, err } + timeoutCtx, cancel := context.WithTimeout(ctx, TimeoutDuration) + defer cancel() + + // try to resolve referenced tables for ignoredDependencies + var ignoredDependenciesReferencedTables []string + for _, tableName := range ignoredDependencies { + // ignore the tables with : + if strings.Contains(tableName, ":") { // project:dataset.table + continue + } + // find referenced tables and add it to ignoredDependenciesReferencedTables + fakeQuery := fmt.Sprintf(FakeSelectStmt, tableName) + deps, err := b.FindDependenciesWithRetryableDryRun(timeoutCtx, fakeQuery, svcAcc) + if err != nil { + return response, err + } + ignoredDependenciesReferencedTables = append(ignoredDependenciesReferencedTables, deps...) + } + ignoredDependencies = append(ignoredDependencies, ignoredDependenciesReferencedTables...) + // try to resolve referenced tables directly from BQ APIs response.Dependencies, err = b.FindDependenciesWithRetryableDryRun(spanCtx, queryData.Value, svcAcc) if err != nil { diff --git a/task/bq2bq/main_test.go b/task/bq2bq/main_test.go index 643879c..d7e917a 100644 --- a/task/bq2bq/main_test.go +++ b/task/bq2bq/main_test.go @@ -811,20 +811,155 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` }) defer job.AssertExpectations(t) + jobTableReferenceTables := new(bqJob) + jobTableReferenceTables.On("LastStatus").Return(&bigquery.JobStatus{ + Errors: nil, + Statistics: &bigquery.JobStatistics{ + Details: &bigquery.QueryStatistics{ + ReferencedTables: []*bigquery.Table{ + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "table1", + }, + }, + }, + }, + }) + defer jobTableReferenceTables.AssertExpectations(t) + qry := new(bqQuery) qry.On("Run", mock.Anything).Return(job, nil) qry.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once() defer qry.AssertExpectations(t) + qryTableReferenceTables := new(bqQuery) + qryTableReferenceTables.On("Run", mock.Anything).Return(jobTableReferenceTables, nil) + qryTableReferenceTables.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once() + defer qry.AssertExpectations(t) + client := new(bqClientMock) qf, _ := data.Assets.Get(QueryFileName) client.On("Query", qf.Value).Return(qry) + client.On("Query", "SELECT * from `proj.dataset.table1` WHERE FALSE LIMIT 1").Return(qryTableReferenceTables) defer client.AssertExpectations(t) bqClientFac := new(bqClientFactoryMock) bqClientFac.On("New", mock.Anything, "BQ_ACCOUNT_SECRET").Return(client, nil) defer bqClientFac.AssertExpectations(t) + b := &BQ2BQ{ + ClientFac: bqClientFac, + } + got, err := b.GenerateDependencies(context.Background(), data) + if err != nil { + t.Errorf("error = %v", err) + return + } + if !reflect.DeepEqual(got.Dependencies, expectedDeps) { + t.Errorf("got = %v, want %v", got, expectedDeps) + } + }) + t.Run("should generate dependencies using BQ APIs for select statements but ignore if asked explicitly for view", func(t *testing.T) { + expectedDeps := []string{"bigquery://proj:dataset.table1"} + data := models.GenerateDependenciesRequest{ + Assets: models.PluginAssets{}.FromJobSpec(*models.JobAssets{}.New([]models.JobSpecAsset{ + { + Name: QueryFileName, + Value: "Select * from proj.dataset.table1 t1 left join /* @ignoreupstream */ proj.dataset.view1 v1 on t1.date=v1.date", + }, + })), + Config: models.PluginConfigs{}.FromJobSpec(models.JobSpecConfigs{ + { + Name: "PROJECT", + Value: "proj", + }, + { + Name: "DATASET", + Value: "datas", + }, + { + Name: "TABLE", + Value: "tab", + }, + }), + Project: models.ProjectSpec{Secret: models.ProjectSecrets{ + { + Name: SecretName, + Value: "some_secret", + }, + }}, + } + + job := new(bqJob) + job.On("LastStatus").Return(&bigquery.JobStatus{ + Errors: nil, + Statistics: &bigquery.JobStatistics{ + Details: &bigquery.QueryStatistics{ + ReferencedTables: []*bigquery.Table{ + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "table1", + }, + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "viewtable1", + }, + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "viewtable2", + }, + }, + }, + }, + }) + + jobViewReferenceTables := new(bqJob) + jobViewReferenceTables.On("LastStatus").Return(&bigquery.JobStatus{ + Errors: nil, + Statistics: &bigquery.JobStatistics{ + Details: &bigquery.QueryStatistics{ + ReferencedTables: []*bigquery.Table{ + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "viewtable1", + }, + { + ProjectID: "proj", + DatasetID: "dataset", + TableID: "viewtable2", + }, + }, + }, + }, + }) + + defer job.AssertExpectations(t) + + qry := new(bqQuery) + qry.On("Run", mock.Anything).Return(job, nil) + qry.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once() + defer qry.AssertExpectations(t) + + qryViewReferenceTables := new(bqQuery) + qryViewReferenceTables.On("Run", mock.Anything).Return(jobViewReferenceTables, nil) + qryViewReferenceTables.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once() + defer qry.AssertExpectations(t) + + client := new(bqClientMock) + qf, _ := data.Assets.Get(QueryFileName) + client.On("Query", qf.Value).Return(qry) + client.On("Query", "SELECT * from `proj.dataset.view1` WHERE FALSE LIMIT 1").Return(qryViewReferenceTables) + defer client.AssertExpectations(t) + + bqClientFac := new(bqClientFactoryMock) + bqClientFac.On("New", mock.Anything, "some_secret").Return(client, nil) + defer bqClientFac.AssertExpectations(t) + b := &BQ2BQ{ ClientFac: bqClientFac, }