diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py index 1b4f6925..e37e753c 100644 --- a/pandas_gbq/exceptions.py +++ b/pandas_gbq/exceptions.py @@ -35,3 +35,12 @@ class PerformanceWarning(RuntimeWarning): Such warnings can occur when dependencies for the requested feature aren't up-to-date. """ + + +class QueryTimeout(ValueError): + """ + Raised when the query request exceeds the timeoutMs value specified in the + BigQuery configuration. + """ + + pass diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index dbb9e5b5..2164c043 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -3,7 +3,6 @@ # license that can be found in the LICENSE file. import copy -import concurrent.futures from datetime import datetime import logging import re @@ -20,8 +19,9 @@ if typing.TYPE_CHECKING: # pragma: NO COVER import pandas -from pandas_gbq.exceptions import AccessDenied, GenericGBQException +from pandas_gbq.exceptions import GenericGBQException, QueryTimeout from pandas_gbq.features import FEATURES +import pandas_gbq.query import pandas_gbq.schema import pandas_gbq.timestamp @@ -130,15 +130,6 @@ class NotFoundException(ValueError): pass -class QueryTimeout(ValueError): - """ - Raised when the query request exceeds the timeoutMs value specified in the - BigQuery configuration. - """ - - pass - - class TableCreationError(ValueError): """ Raised when the create table method fails @@ -340,10 +331,6 @@ def __init__( self.client = self.get_client() self.use_bqstorage_api = use_bqstorage_api - # BQ Queries costs $5 per TB. First 1 TB per month is free - # see here for more: https://cloud.google.com/bigquery/pricing - self.query_price_for_TB = 5.0 / 2**40 # USD/TB - def _start_timer(self): self.start = time.time() @@ -355,16 +342,6 @@ def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6): if sec > overlong: logger.info("{} {} {}".format(prefix, sec, postfix)) - # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size - @staticmethod - def sizeof_fmt(num, suffix="B"): - fmt = "%3.1f %s%s" - for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: - if abs(num) < 1024.0: - return fmt % (num, unit, suffix) - num /= 1024.0 - return fmt % (num, "Y", suffix) - def get_client(self): import google.api_core.client_info from google.cloud import bigquery @@ -421,46 +398,10 @@ def download_table( user_dtypes=dtypes, ) - def _wait_for_query_job(self, query_reply, timeout_ms): - """Wait for query to complete, pausing occasionally to update progress. - - Args: - query_reply (QueryJob): - A query job which has started. - - timeout_ms (Optional[int]): - How long to wait before cancelling the query. - """ - # Wait at most 10 seconds so we can show progress. - # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327): - # Include a tqdm progress bar here instead of a stream of log messages. - timeout_sec = 10.0 - if timeout_ms: - timeout_sec = min(timeout_sec, timeout_ms / 1000.0) - - while query_reply.state != "DONE": - self.log_elapsed_seconds(" Elapsed", "s. Waiting...") - - if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000: - self.client.cancel_job( - query_reply.job_id, location=query_reply.location - ) - raise QueryTimeout("Query timeout: {} ms".format(timeout_ms)) - - try: - query_reply.result(timeout=timeout_sec) - except concurrent.futures.TimeoutError: - # Use our own timeout logic - pass - except self.http_error as ex: - self.process_http_error(ex) - def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): - from google.auth.exceptions import RefreshError from google.cloud import bigquery - import pandas - job_config = { + job_config_dict = { "query": { "useLegacySql": self.dialect == "legacy" @@ -470,74 +411,27 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): } config = kwargs.get("configuration") if config is not None: - job_config.update(config) + job_config_dict.update(config) - self._start_timer() - - try: - logger.debug("Requesting query... ") - query_reply = self.client.query( - query, - job_config=bigquery.QueryJobConfig.from_api_repr(job_config), - location=self.location, - project=self.project_id, - ) - logger.debug("Query running...") - except (RefreshError, ValueError) as ex: - if self.private_key: - raise AccessDenied( - f"The service account credentials are not valid: {ex}" - ) - else: - raise AccessDenied( - "The credentials have been revoked or expired, " - f"please re-run the application to re-authorize: {ex}" - ) - except self.http_error as ex: - self.process_http_error(ex) - - job_id = query_reply.job_id - logger.debug("Job ID: %s" % job_id) - - timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get( - "timeoutMs" - ) + timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[ + "query" + ].get("timeoutMs") timeout_ms = int(timeout_ms) if timeout_ms else None - self._wait_for_query_job(query_reply, timeout_ms) - if query_reply.cache_hit: - logger.debug("Query done.\nCache hit.\n") - else: - bytes_processed = query_reply.total_bytes_processed or 0 - bytes_billed = query_reply.total_bytes_billed or 0 - logger.debug( - "Query done.\nProcessed: {} Billed: {}".format( - self.sizeof_fmt(bytes_processed), - self.sizeof_fmt(bytes_billed), - ) - ) - logger.debug( - "Standard price: ${:,.2f} USD\n".format( - bytes_billed * self.query_price_for_TB - ) - ) + self._start_timer() + job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict) + rows_iter = pandas_gbq.query.query_and_wait( + self, + self.client, + query, + location=self.location, + project_id=self.project_id, + job_config=job_config, + max_results=max_results, + timeout_ms=timeout_ms, + ) dtypes = kwargs.get("dtypes") - - # Ensure destination is populated. - try: - query_reply.result() - except self.http_error as ex: - self.process_http_error(ex) - - # Avoid attempting to download results from DML queries, which have no - # destination. - if query_reply.destination is None: - return pandas.DataFrame() - - rows_iter = self.client.list_rows( - query_reply.destination, max_results=max_results - ) return self._download_results( rows_iter, max_results=max_results, diff --git a/pandas_gbq/query.py b/pandas_gbq/query.py new file mode 100644 index 00000000..bb8fae2b --- /dev/null +++ b/pandas_gbq/query.py @@ -0,0 +1,135 @@ +# Copyright (c) 2017 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +from __future__ import annotations + +import concurrent.futures +import logging +from typing import Optional + +from google.cloud import bigquery + +import pandas_gbq.exceptions + + +logger = logging.getLogger(__name__) + + +# On-demand BQ Queries costs $6.25 per TB. First 1 TB per month is free +# see here for more: https://cloud.google.com/bigquery/pricing +QUERY_PRICE_FOR_TB = 6.25 / 2**40 # USD/TB + + +# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size +def sizeof_fmt(num, suffix="B"): + fmt = "%3.1f %s%s" + for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: + if abs(num) < 1024.0: + return fmt % (num, unit, suffix) + num /= 1024.0 + return fmt % (num, "Y", suffix) + + +def _wait_for_query_job( + connector, + client: bigquery.Client, + query_reply: bigquery.QueryJob, + timeout_ms: Optional[float], +): + """Wait for query to complete, pausing occasionally to update progress. + + Args: + query_reply (QueryJob): + A query job which has started. + + timeout_ms (Optional[int]): + How long to wait before cancelling the query. + """ + # Wait at most 10 seconds so we can show progress. + # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327): + # Include a tqdm progress bar here instead of a stream of log messages. + timeout_sec = 10.0 + if timeout_ms: + timeout_sec = min(timeout_sec, timeout_ms / 1000.0) + + while query_reply.state != "DONE": + connector.log_elapsed_seconds(" Elapsed", "s. Waiting...") + + if timeout_ms and timeout_ms < connector.get_elapsed_seconds() * 1000: + client.cancel_job(query_reply.job_id, location=query_reply.location) + raise pandas_gbq.exceptions.QueryTimeout( + "Query timeout: {} ms".format(timeout_ms) + ) + + try: + query_reply.result(timeout=timeout_sec) + except concurrent.futures.TimeoutError: + # Use our own timeout logic + pass + except connector.http_error as ex: + connector.process_http_error(ex) + + +def query_and_wait( + connector, + client: bigquery.Client, + query: str, + *, + job_config: bigquery.QueryJobConfig, + location: Optional[str], + project_id: Optional[str], + max_results: Optional[int], + timeout_ms: Optional[int], +): + from google.auth.exceptions import RefreshError + + try: + logger.debug("Requesting query... ") + query_reply = client.query( + query, + job_config=job_config, + location=location, + project=project_id, + ) + logger.debug("Query running...") + except (RefreshError, ValueError) as ex: + if connector.private_key: + raise pandas_gbq.exceptions.AccessDenied( + f"The service account credentials are not valid: {ex}" + ) + else: + raise pandas_gbq.exceptions.AccessDenied( + "The credentials have been revoked or expired, " + f"please re-run the application to re-authorize: {ex}" + ) + except connector.http_error as ex: + connector.process_http_error(ex) + + job_id = query_reply.job_id + logger.debug("Job ID: %s" % job_id) + + _wait_for_query_job(connector, connector.client, query_reply, timeout_ms) + + if query_reply.cache_hit: + logger.debug("Query done.\nCache hit.\n") + else: + bytes_processed = query_reply.total_bytes_processed or 0 + bytes_billed = query_reply.total_bytes_billed or 0 + logger.debug( + "Query done.\nProcessed: {} Billed: {}".format( + sizeof_fmt(bytes_processed), + sizeof_fmt(bytes_billed), + ) + ) + logger.debug( + "Standard price: ${:,.2f} USD\n".format(bytes_billed * QUERY_PRICE_FOR_TB) + ) + + # As of google-cloud-bigquery 2.3.0, QueryJob.result() uses + # getQueryResults() instead of tabledata.list, which returns the correct + # response with DML/DDL queries. + try: + return query_reply.result(max_results=max_results) + except connector.http_error as ex: + connector.process_http_error(ex) diff --git a/setup.py b/setup.py index d0b16c2e..0d0e03ff 100644 --- a/setup.py +++ b/setup.py @@ -34,11 +34,6 @@ "google-api-core >= 2.10.2, <3.0.0dev", "google-auth >=2.13.0", "google-auth-oauthlib >=0.7.0", - # Require 1.27.* because it has a fix for out-of-bounds timestamps. See: - # https://github.com/googleapis/python-bigquery/pull/209 and - # https://github.com/googleapis/python-bigquery-pandas/issues/365 - # Exclude 2.4.* because it has a bug where waiting for the query can hang - # indefinitely. https://github.com/pydata/pandas-gbq/issues/343 "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*", "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev", "packaging >=20.0.0", diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index ba620686..06439d8e 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -4,12 +4,10 @@ # -*- coding: utf-8 -*- -import concurrent.futures import copy import datetime from unittest import mock -import freezegun import google.api_core.exceptions import numpy import pandas @@ -135,42 +133,6 @@ def test__transform_read_gbq_configuration_makes_copy(original, expected): assert did_change == should_change -def test__wait_for_query_job_exits_when_done(mock_bigquery_client): - connector = _make_connector() - connector.client = mock_bigquery_client - connector.start = datetime.datetime(2020, 1, 1).timestamp() - - mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob) - type(mock_query).state = mock.PropertyMock(side_effect=("RUNNING", "DONE")) - mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout") - - with freezegun.freeze_time("2020-01-01 00:00:00", tick=False): - connector._wait_for_query_job(mock_query, 60) - - mock_bigquery_client.cancel_job.assert_not_called() - - -def test__wait_for_query_job_cancels_after_timeout(mock_bigquery_client): - connector = _make_connector() - connector.client = mock_bigquery_client - connector.start = datetime.datetime(2020, 1, 1).timestamp() - - mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob) - mock_query.job_id = "a-random-id" - mock_query.location = "job-location" - mock_query.state = "RUNNING" - mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout") - - with freezegun.freeze_time( - "2020-01-01 00:00:00", auto_tick_seconds=15 - ), pytest.raises(gbq.QueryTimeout): - connector._wait_for_query_job(mock_query, 60) - - mock_bigquery_client.cancel_job.assert_called_with( - "a-random-id", location="job-location" - ) - - def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client): gbq._test_google_api_imports() pytest.importorskip("google.api_core.client_info") @@ -519,10 +481,10 @@ def test_read_gbq_with_max_results_zero(monkeypatch): assert df is None -def test_read_gbq_with_max_results_ten(monkeypatch, mock_bigquery_client): +def test_read_gbq_with_max_results_ten(monkeypatch, mock_query_job): df = gbq.read_gbq("SELECT 1", dialect="standard", max_results=10) assert df is not None - mock_bigquery_client.list_rows.assert_called_with(mock.ANY, max_results=10) + mock_query_job.result.assert_called_with(max_results=10) @pytest.mark.parametrize(["verbose"], [(True,), (False,)]) @@ -833,28 +795,6 @@ def test_read_gbq_with_list_rows_error_translates_exception( ) -@pytest.mark.parametrize( - ["size_in_bytes", "formatted_text"], - [ - (999, "999.0 B"), - (1024, "1.0 KB"), - (1099, "1.1 KB"), - (1044480, "1020.0 KB"), - (1048576, "1.0 MB"), - (1048576000, "1000.0 MB"), - (1073741824, "1.0 GB"), - (1.099512e12, "1.0 TB"), - (1.125900e15, "1.0 PB"), - (1.152922e18, "1.0 EB"), - (1.180592e21, "1.0 ZB"), - (1.208926e24, "1.0 YB"), - (1.208926e28, "10000.0 YB"), - ], -) -def test_query_response_bytes(size_in_bytes, formatted_text): - assert gbq.GbqConnector.sizeof_fmt(size_in_bytes) == formatted_text - - def test_run_query_with_dml_query(mock_bigquery_client, mock_query_job): """ Don't attempt to download results from a DML query / query with no results. diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py new file mode 100644 index 00000000..b9a64535 --- /dev/null +++ b/tests/unit/test_query.py @@ -0,0 +1,83 @@ +# Copyright (c) 2017 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +from __future__ import annotations + +import datetime +import concurrent.futures +from unittest import mock + +import freezegun +import google.cloud.bigquery +import pytest + +import pandas_gbq.exceptions +import pandas_gbq.gbq +import pandas_gbq.query as module_under_test + + +def _make_connector(project_id: str = "some-project", **kwargs): + return pandas_gbq.gbq.GbqConnector(project_id, **kwargs) + + +@pytest.mark.parametrize( + ["size_in_bytes", "formatted_text"], + [ + (999, "999.0 B"), + (1024, "1.0 KB"), + (1099, "1.1 KB"), + (1044480, "1020.0 KB"), + (1048576, "1.0 MB"), + (1048576000, "1000.0 MB"), + (1073741824, "1.0 GB"), + (1.099512e12, "1.0 TB"), + (1.125900e15, "1.0 PB"), + (1.152922e18, "1.0 EB"), + (1.180592e21, "1.0 ZB"), + (1.208926e24, "1.0 YB"), + (1.208926e28, "10000.0 YB"), + ], +) +def test_query_response_bytes(size_in_bytes, formatted_text): + assert module_under_test.sizeof_fmt(size_in_bytes) == formatted_text + + +def test__wait_for_query_job_exits_when_done(mock_bigquery_client): + connector = _make_connector() + connector.client = mock_bigquery_client + connector.start = datetime.datetime(2020, 1, 1).timestamp() + + mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob) + type(mock_query).state = mock.PropertyMock(side_effect=("RUNNING", "DONE")) + mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout") + + with freezegun.freeze_time("2020-01-01 00:00:00", tick=False): + module_under_test._wait_for_query_job( + connector, mock_bigquery_client, mock_query, 60 + ) + + mock_bigquery_client.cancel_job.assert_not_called() + + +def test__wait_for_query_job_cancels_after_timeout(mock_bigquery_client): + connector = _make_connector() + connector.client = mock_bigquery_client + connector.start = datetime.datetime(2020, 1, 1).timestamp() + + mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob) + mock_query.job_id = "a-random-id" + mock_query.location = "job-location" + mock_query.state = "RUNNING" + mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout") + + with freezegun.freeze_time( + "2020-01-01 00:00:00", auto_tick_seconds=15 + ), pytest.raises(pandas_gbq.exceptions.QueryTimeout): + module_under_test._wait_for_query_job( + connector, mock_bigquery_client, mock_query, 60 + ) + + mock_bigquery_client.cancel_job.assert_called_with( + "a-random-id", location="job-location" + )