Skip to content

Commit

Permalink
fix(bigquery): Allow inserts to a target dataset/table in another project (#28097)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx authored Dec 13, 2024
1 parent e901260 commit 0430f6c
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 8 deletions.
5 changes: 3 additions & 2 deletions google-cloud-bigquery/lib/google/cloud/bigquery/dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2810,7 +2810,7 @@ def insert_async table_id, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10
ensure_service!

# Get table, don't use Dataset#table which handles NotFoundError
gapi = service.get_table dataset_id, table_id, metadata_view: view
gapi = service.get_project_table project_id, dataset_id, table_id, metadata_view: view
table = Table.from_gapi gapi, service, metadata_view: view
# Get the AsyncInserter from the table
table.insert_async skip_invalid: skip_invalid,
Expand Down Expand Up @@ -2865,7 +2865,8 @@ def insert_data table_id, rows, skip_invalid: nil, ignore_unknown: nil, insert_i
ensure_service!
gapi = service.insert_tabledata dataset_id, table_id, rows, skip_invalid: skip_invalid,
ignore_unknown: ignore_unknown,
insert_ids: insert_ids
insert_ids: insert_ids,
project_id: project_id
InsertResponse.from_gapi rows, gapi
end

Expand Down
11 changes: 7 additions & 4 deletions google-cloud-bigquery/lib/google/cloud/bigquery/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,17 @@ def list_tabledata dataset_id, table_id, max: nil, token: nil, start: nil
end
end

##
# Streams rows into `dataset_id.table_id` via the tabledata.insertAll API.
#
# Rows are plain Ruby hashes; each is converted to the JSON row format the
# API expects before delegating to insert_tabledata_json_rows.
#
# @param dataset_id [String] ID of the target dataset.
# @param table_id [String] ID of the target table.
# @param rows [Hash, Array<Hash>] one row or a list of rows to insert.
# @param insert_ids [Array] optional per-row dedup IDs, zipped with rows.
# @param ignore_unknown [Boolean, nil] drop values not matching the schema.
# @param skip_invalid [Boolean, nil] insert valid rows even if some are invalid.
# @param project_id [String, nil] project that owns the target dataset/table;
#   when nil the service's default project is used (see
#   insert_tabledata_json_rows, which falls back to `@project`). This enables
#   inserts into a dataset/table that lives in another project.
def insert_tabledata dataset_id, table_id, rows, insert_ids: nil, ignore_unknown: nil,
                     skip_invalid: nil, project_id: nil
  # Array() coerces a single row hash into a one-element list.
  json_rows = Array(rows).map { |row| Convert.to_json_row row }
  insert_tabledata_json_rows dataset_id, table_id, json_rows, insert_ids: insert_ids,
                             ignore_unknown: ignore_unknown,
                             skip_invalid: skip_invalid,
                             project_id: project_id
end

def insert_tabledata_json_rows dataset_id, table_id, json_rows, insert_ids: nil, ignore_unknown: nil,
skip_invalid: nil
skip_invalid: nil, project_id: nil
rows_and_ids = Array(json_rows).zip Array(insert_ids)
insert_rows = rows_and_ids.map do |json_row, insert_id|
if insert_id == :skip
Expand All @@ -286,9 +288,10 @@ def insert_tabledata_json_rows dataset_id, table_id, json_rows, insert_ids: nil,
}.to_json

# The insertAll with insertId operation is considered idempotent
project_id ||= @project
execute backoff: true do
service.insert_all_table_data(
@project, dataset_id, table_id, insert_req,
project_id, dataset_id, table_id, insert_req,
options: { skip_serialization: true }
)
end
Expand Down
3 changes: 2 additions & 1 deletion google-cloud-bigquery/lib/google/cloud/bigquery/table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2747,7 +2747,8 @@ def insert rows, insert_ids: nil, skip_invalid: nil, ignore_unknown: nil
rows,
skip_invalid: skip_invalid,
ignore_unknown: ignore_unknown,
insert_ids: insert_ids
insert_ids: insert_ids,
project_id: project_id
InsertResponse.from_gapi rows, gapi
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ def push_batch_request!
json_rows,
skip_invalid: @skip_invalid,
ignore_unknown: @ignore_unknown,
insert_ids: insert_ids
insert_ids: insert_ids,
project_id: @table.project_id

result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
rescue StandardError => e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,46 @@

mock.verify
end

# Verifies that Dataset#insert_async routes both the table lookup (get_table)
# and the row insert (insert_all_table_data) to the dataset's own project
# ("another-project"), not the client's default project.
it "insert rows into another project" do
mock = Minitest::Mock.new

# Build a dataset reference and a table gapi that both live in another project.
another_project_id = "another-project"
dataset_another_project = Google::Cloud::Bigquery::Dataset.new_reference another_project_id, dataset_id, bigquery.service
table_another_project_hash = random_table_hash dataset_id, table_id
table_gapi_another_project = Google::Apis::BigqueryV2::Table.from_json table_another_project_hash.to_json
table_gapi_another_project.table_reference.project_id = another_project_id

insert_req = {
rows: [insert_rows.first], ignoreUnknownValues: nil, skipInvalidRows: nil
}.to_json
# Both API calls must be made against another_project_id.
mock.expect :get_table, table_gapi_another_project, [another_project_id, dataset_id, table_id], **patch_table_args
mock.expect :insert_all_table_data, success_table_insert_gapi,
[another_project_id, dataset_id, table_id, insert_req], options: { skip_serialization: true }
dataset_another_project.service.mocked_service = mock

inserter = dataset_another_project.insert_async table_id

# Pin the generated insertId so the request body is deterministic.
SecureRandom.stub :uuid, insert_id do
inserter.insert rows.first

_(inserter.batch.rows).must_equal [rows.first]

_(inserter).must_be :started?
_(inserter).wont_be :stopped?

# force the queued rows to be inserted
inserter.flush
inserter.stop.wait!

_(inserter).wont_be :started?
_(inserter).must_be :stopped?

_(inserter.batch).must_be :nil?
end

mock.verify
end
end

it "inserts three rows at the same time" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,29 @@
_(result.error_count).must_equal 0
end

it "can insert rows into another project" do
mock = Minitest::Mock.new
insert_req = {
rows: [insert_rows.first], ignoreUnknownValues: nil, skipInvalidRows: nil
}.to_json
another_project_id = "another-project"
mock.expect :insert_all_table_data, success_table_insert_gapi,
[another_project_id, dataset_id, table_id, insert_req], options: { skip_serialization: true }
dataset.service.mocked_service = mock
dataset.gapi.dataset_reference.project_id = another_project_id

result = nil
SecureRandom.stub :uuid, insert_id do
result = dataset.insert table_id, rows.first
end

mock.verify

_(result).must_be :success?
_(result.insert_count).must_equal 1
_(result.error_count).must_equal 0
end

describe "dataset reference" do
let(:dataset) {Google::Cloud::Bigquery::Dataset.new_reference project, dataset_id, bigquery.service }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,42 @@
mock.verify
end

# Verifies that Table#insert_async sends insert_all_table_data to the project
# id stored in the table's own reference ("another-project"), not the client's
# default project.
it "insert rows into another project" do
mock = Minitest::Mock.new
insert_req = {
rows: [insert_rows.first], ignoreUnknownValues: nil, skipInvalidRows: nil
}.to_json
another_project_id = "another-project"
# The mock expects the insert to be routed to another_project_id.
mock.expect :insert_all_table_data, success_table_insert_gapi,
[another_project_id, table.dataset_id, table.table_id, insert_req], options: { skip_serialization: true }
table.service.mocked_service = mock
# Repoint the table reference at the other project before inserting.
table.gapi.table_reference.project_id = another_project_id

inserter = table.insert_async do |result|
puts "table.insert_async: #{result.error.inspect}" if result.error
end

# Pin the generated insertId so the request body is deterministic.
SecureRandom.stub :uuid, insert_id do
inserter.insert rows.first

_(inserter.batch.rows).must_equal [rows.first]

_(inserter).must_be :started?
_(inserter).wont_be :stopped?

# force the queued rows to be inserted
inserter.flush
inserter.stop.wait!

_(inserter).wont_be :started?
_(inserter).must_be :stopped?

_(inserter.batch).must_be :nil?
end

mock.verify
end

it "inserts three rows at the same time" do
mock = Minitest::Mock.new
insert_req = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@
_(result.error_count).must_equal 0
end

# Verifies that Table#insert sends insert_all_table_data to the project id
# stored in the table's own reference ("another-project"), not the client's
# default project.
it "can insert rows into another project" do
mock = Minitest::Mock.new
insert_req = {
rows: [insert_rows.first], ignoreUnknownValues: nil, skipInvalidRows: nil
}.to_json
another_project_id = "another-project"
# Repoint the table reference at the other project before inserting.
table.gapi.table_reference.project_id = another_project_id
# The mock expects the insert to be routed to another_project_id.
mock.expect :insert_all_table_data, success_table_insert_gapi,
[another_project_id, table.dataset_id, table.table_id, insert_req], options: { skip_serialization: true }
table.service.mocked_service = mock

result = nil
# Pin the generated insertId so the request body is deterministic.
SecureRandom.stub :uuid, insert_id do
result = table.insert rows.first
end

mock.verify

_(result).must_be :success?
_(result.insert_count).must_equal 1
_(result.error_count).must_equal 0
end

it "can insert multiple rows" do
mock = Minitest::Mock.new
insert_req = {
Expand Down

0 comments on commit 0430f6c

Please sign in to comment.