Skip to content

Commit

Permalink
fix: ignore views in ignoreupstream (#46)
Browse files Browse the repository at this point in the history
* Fix: fix ignoreupstream helper for big query view

* fix: fix failing build

Co-authored-by: Sumit Agrawal <[email protected]>
  • Loading branch information
sbchaos and BitWielderSumit authored Aug 25, 2022
1 parent 221796c commit 66e09f1
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 0 deletions.
21 changes: 21 additions & 0 deletions task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
SecretName = "TASK_BQ2BQ"
BqServiceAccount = "BQ_SERVICE_ACCOUNT"

TimeoutDuration = time.Second * 180
MaxBQApiRetries = 3
FakeSelectStmt = "SELECT * from `%s` WHERE FALSE LIMIT 1"

Expand Down Expand Up @@ -437,6 +438,26 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request models.Generat
return response, err
}

timeoutCtx, cancel := context.WithTimeout(ctx, TimeoutDuration)
defer cancel()

// try to resolve referenced tables for ignoredDependencies
var ignoredDependenciesReferencedTables []string
for _, tableName := range ignoredDependencies {
// ignore the tables with :
if strings.Contains(tableName, ":") { // project:dataset.table
continue
}
// find referenced tables and add it to ignoredDependenciesReferencedTables
fakeQuery := fmt.Sprintf(FakeSelectStmt, tableName)
deps, err := b.FindDependenciesWithRetryableDryRun(timeoutCtx, fakeQuery, svcAcc)
if err != nil {
return response, err
}
ignoredDependenciesReferencedTables = append(ignoredDependenciesReferencedTables, deps...)
}
ignoredDependencies = append(ignoredDependencies, ignoredDependenciesReferencedTables...)

// try to resolve referenced tables directly from BQ APIs
response.Dependencies, err = b.FindDependenciesWithRetryableDryRun(spanCtx, queryData.Value, svcAcc)
if err != nil {
Expand Down
135 changes: 135 additions & 0 deletions task/bq2bq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,20 +811,155 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`
})
defer job.AssertExpectations(t)

jobTableReferenceTables := new(bqJob)
jobTableReferenceTables.On("LastStatus").Return(&bigquery.JobStatus{
Errors: nil,
Statistics: &bigquery.JobStatistics{
Details: &bigquery.QueryStatistics{
ReferencedTables: []*bigquery.Table{
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "table1",
},
},
},
},
})
defer jobTableReferenceTables.AssertExpectations(t)

qry := new(bqQuery)
qry.On("Run", mock.Anything).Return(job, nil)
qry.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once()
defer qry.AssertExpectations(t)

qryTableReferenceTables := new(bqQuery)
qryTableReferenceTables.On("Run", mock.Anything).Return(jobTableReferenceTables, nil)
qryTableReferenceTables.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once()
defer qry.AssertExpectations(t)

client := new(bqClientMock)
qf, _ := data.Assets.Get(QueryFileName)
client.On("Query", qf.Value).Return(qry)
client.On("Query", "SELECT * from `proj.dataset.table1` WHERE FALSE LIMIT 1").Return(qryTableReferenceTables)
defer client.AssertExpectations(t)

bqClientFac := new(bqClientFactoryMock)
bqClientFac.On("New", mock.Anything, "BQ_ACCOUNT_SECRET").Return(client, nil)
defer bqClientFac.AssertExpectations(t)

b := &BQ2BQ{
ClientFac: bqClientFac,
}
got, err := b.GenerateDependencies(context.Background(), data)
if err != nil {
t.Errorf("error = %v", err)
return
}
if !reflect.DeepEqual(got.Dependencies, expectedDeps) {
t.Errorf("got = %v, want %v", got, expectedDeps)
}
})
t.Run("should generate dependencies using BQ APIs for select statements but ignore if asked explicitly for view", func(t *testing.T) {
expectedDeps := []string{"bigquery://proj:dataset.table1"}
data := models.GenerateDependenciesRequest{
Assets: models.PluginAssets{}.FromJobSpec(*models.JobAssets{}.New([]models.JobSpecAsset{
{
Name: QueryFileName,
Value: "Select * from proj.dataset.table1 t1 left join /* @ignoreupstream */ proj.dataset.view1 v1 on t1.date=v1.date",
},
})),
Config: models.PluginConfigs{}.FromJobSpec(models.JobSpecConfigs{
{
Name: "PROJECT",
Value: "proj",
},
{
Name: "DATASET",
Value: "datas",
},
{
Name: "TABLE",
Value: "tab",
},
}),
Project: models.ProjectSpec{Secret: models.ProjectSecrets{
{
Name: SecretName,
Value: "some_secret",
},
}},
}

job := new(bqJob)
job.On("LastStatus").Return(&bigquery.JobStatus{
Errors: nil,
Statistics: &bigquery.JobStatistics{
Details: &bigquery.QueryStatistics{
ReferencedTables: []*bigquery.Table{
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "table1",
},
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "viewtable1",
},
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "viewtable2",
},
},
},
},
})

jobViewReferenceTables := new(bqJob)
jobViewReferenceTables.On("LastStatus").Return(&bigquery.JobStatus{
Errors: nil,
Statistics: &bigquery.JobStatistics{
Details: &bigquery.QueryStatistics{
ReferencedTables: []*bigquery.Table{
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "viewtable1",
},
{
ProjectID: "proj",
DatasetID: "dataset",
TableID: "viewtable2",
},
},
},
},
})

defer job.AssertExpectations(t)

qry := new(bqQuery)
qry.On("Run", mock.Anything).Return(job, nil)
qry.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once()
defer qry.AssertExpectations(t)

qryViewReferenceTables := new(bqQuery)
qryViewReferenceTables.On("Run", mock.Anything).Return(jobViewReferenceTables, nil)
qryViewReferenceTables.On("SetQueryConfig", mock.AnythingOfType("bqiface.QueryConfig")).Once()
defer qry.AssertExpectations(t)

client := new(bqClientMock)
qf, _ := data.Assets.Get(QueryFileName)
client.On("Query", qf.Value).Return(qry)
client.On("Query", "SELECT * from `proj.dataset.view1` WHERE FALSE LIMIT 1").Return(qryViewReferenceTables)
defer client.AssertExpectations(t)

bqClientFac := new(bqClientFactoryMock)
bqClientFac.On("New", mock.Anything, "some_secret").Return(client, nil)
defer bqClientFac.AssertExpectations(t)

b := &BQ2BQ{
ClientFac: bqClientFac,
}
Expand Down

0 comments on commit 66e09f1

Please sign in to comment.