diff --git a/ci/requirements-3.5-0.18.1.pip b/ci/requirements-3.5-0.18.1.pip
index dd33895c..dc651977 100644
--- a/ci/requirements-3.5-0.18.1.pip
+++ b/ci/requirements-3.5-0.18.1.pip
@@ -1,3 +1,3 @@
 google-auth==1.4.1
 google-auth-oauthlib==0.0.1
-google-cloud-bigquery==0.29.0
+google-cloud-bigquery==0.32.0
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 7864d81e..3d6fcedd 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -7,6 +7,8 @@ Changelog
 - Project ID parameter is optional in ``read_gbq`` and ``to_gbq`` when it can
   be inferred from the environment. Note: you must still pass in a project ID
   when using user-based authentication. (:issue:`103`)
+- Add location parameter to ``read_gbq`` and ``to_gbq`` so that pandas-gbq
+  can work with datasets in the Tokyo region. (:issue:`177`)
 - Progress bar added for ``to_gbq``, through an optional library `tqdm` as
   dependency. (:issue:`162`)
 
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 5fb53bba..db5c435c 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -34,7 +34,7 @@ def _check_google_client_version():
         raise ImportError('Could not import pkg_resources (setuptools).')
 
     # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
-    bigquery_minimum_version = pkg_resources.parse_version('0.29.0')
+    bigquery_minimum_version = pkg_resources.parse_version('0.32.0')
     BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
         'google-cloud-bigquery').parsed_version
 
@@ -152,12 +152,13 @@ class GbqConnector(object):
 
     def __init__(self, project_id, reauth=False,
                  private_key=None, auth_local_webserver=False,
-                 dialect='legacy'):
+                 dialect='legacy', location=None):
         from google.api_core.exceptions import GoogleAPIError
         from google.api_core.exceptions import ClientError
         from pandas_gbq import auth
         self.http_error = (ClientError, GoogleAPIError)
         self.project_id = project_id
+        self.location = location
         self.reauth = reauth
         self.private_key = private_key
         self.auth_local_webserver = auth_local_webserver
@@ -215,9 +216,9 @@ def process_http_error(ex):
         raise GenericGBQException("Reason: {0}".format(ex))
 
     def run_query(self, query, **kwargs):
-        from google.auth.exceptions import RefreshError
         from concurrent.futures import TimeoutError
-        import pandas_gbq.query
+        from google.auth.exceptions import RefreshError
+        from google.cloud import bigquery
 
         job_config = {
             'query': {
@@ -243,8 +244,8 @@ def run_query(self, query, **kwargs):
             logger.info('Requesting query... ')
             query_reply = self.client.query(
                 query,
-                job_config=pandas_gbq.query.query_config(
-                    job_config, BIGQUERY_INSTALLED_VERSION))
+                job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
+                location=self.location)
             logger.info('ok.\nQuery running...')
         except (RefreshError, ValueError):
             if self.private_key:
@@ -319,7 +320,7 @@ def load_data(
         try:
             chunks = load.load_chunks(self.client, dataframe, dataset_id,
                                       table_id, chunksize=chunksize,
-                                      schema=schema)
+                                      schema=schema, location=self.location)
             if progress_bar and tqdm:
                 chunks = tqdm.tqdm(chunks)
             for remaining_rows in chunks:
@@ -470,7 +471,8 @@ def _parse_data(schema, rows):
 
 def read_gbq(query, project_id=None, index_col=None, col_order=None,
              reauth=False, verbose=None, private_key=None,
-             auth_local_webserver=False, dialect='legacy', **kwargs):
+             auth_local_webserver=False, dialect='legacy', location=None,
+             configuration=None):
     r"""Load data from Google BigQuery using google-cloud-python
 
     The main method a user calls to execute a Query in Google BigQuery
@@ -520,10 +522,14 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         compliant with the SQL 2011 standard. For more information
         see `BigQuery SQL Reference
         <https://cloud.google.com/bigquery/sql-reference/>`__
-    verbose : None, deprecated
-
-    **kwargs : Arbitrary keyword arguments
-        configuration (dict): query config parameters for job processing.
+    location : str (optional)
+        Location where the query job should run. See the `BigQuery locations
+        <https://cloud.google.com/bigquery/docs/dataset-locations>
+        documentation`__ for a list of available locations. The location must
+        match that of any datasets used in the query.
+        .. versionadded:: 0.5.0
+    configuration : dict (optional)
+        Query config parameters for job processing.
         For example:
 
             configuration = {'query': {'useQueryCache': False}}
@@ -531,6 +537,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         For more information see `BigQuery SQL Reference
         <https://cloud.google.com/bigquery/sql-reference/>`__
 
+    verbose : None, deprecated
+
     Returns
     -------
     df: DataFrame
@@ -550,9 +558,9 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         raise ValueError("'{0}' is not valid for dialect".format(dialect))
 
     connector = GbqConnector(
-        project_id, reauth=reauth, private_key=private_key,
-        dialect=dialect, auth_local_webserver=auth_local_webserver)
-    schema, rows = connector.run_query(query, **kwargs)
+        project_id, reauth=reauth, private_key=private_key, dialect=dialect,
+        auth_local_webserver=auth_local_webserver, location=location)
+    schema, rows = connector.run_query(query, configuration=configuration)
     final_df = _parse_data(schema, rows)
 
     # Reindex the DataFrame on the provided column
@@ -595,7 +603,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
 def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
            verbose=None, reauth=False, if_exists='fail', private_key=None,
-           auth_local_webserver=False, table_schema=None, progress_bar=True):
+           auth_local_webserver=False, table_schema=None, location=None,
+           progress_bar=True):
     """Write a DataFrame to a Google BigQuery table.
 
     The main method a user calls to export pandas DataFrame contents to
@@ -648,9 +657,16 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
         of DataFrame columns. See BigQuery API documentation on available
         names of a field.
         .. versionadded:: 0.3.1
-    verbose : None, deprecated
+    location : str (optional)
+        Location where the load job should run. See the `BigQuery locations
+        <https://cloud.google.com/bigquery/docs/dataset-locations>
+        documentation`__ for a list of available locations. The location must
+        match that of the target dataset.
+        .. versionadded:: 0.5.0
     progress_bar : boolean, True by default. It uses the library `tqdm` to
             show the progress bar for the upload, chunk by chunk.
+        .. versionadded:: 0.5.0
+    verbose : None, deprecated
     """
 
     _test_google_api_imports()
@@ -670,7 +686,7 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
 
     connector = GbqConnector(
         project_id, reauth=reauth, private_key=private_key,
-        auth_local_webserver=auth_local_webserver)
+        auth_local_webserver=auth_local_webserver, location=location)
     dataset_id, table_id = destination_table.rsplit('.', 1)
 
     table = _Table(project_id, dataset_id, reauth=reauth,
diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py
index 2adb59f6..a7d53f89 100644
--- a/pandas_gbq/load.py
+++ b/pandas_gbq/load.py
@@ -44,7 +44,8 @@ def encode_chunks(dataframe, chunksize=None):
 
 
 def load_chunks(
-        client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
+        client, dataframe, dataset_id, table_id, chunksize=None, schema=None,
+        location=None):
     destination_table = client.dataset(dataset_id).table(table_id)
     job_config = bigquery.LoadJobConfig()
     job_config.write_disposition = 'WRITE_APPEND'
@@ -71,4 +72,5 @@ def load_chunks(
         client.load_table_from_file(
             chunk_buffer,
             destination_table,
-            job_config=job_config).result()
+            job_config=job_config,
+            location=location).result()
diff --git a/pandas_gbq/query.py b/pandas_gbq/query.py
deleted file mode 100644
index 864bbb37..00000000
--- a/pandas_gbq/query.py
+++ /dev/null
@@ -1,25 +0,0 @@
-
-import pkg_resources
-from google.cloud import bigquery
-
-
-# Version with query config breaking change.
-BIGQUERY_CONFIG_VERSION = pkg_resources.parse_version('0.32.0.dev1')
-
-
-def query_config_old_version(resource):
-    # Verify that we got a query resource. In newer versions of
-    # google-cloud-bigquery enough of the configuration is passed on to the
-    # backend that we can expect a backend validation error instead.
-    if len(resource) != 1:
-        raise ValueError("Only one job type must be specified, but "
-                         "given {}".format(','.join(resource.keys())))
-    if 'query' not in resource:
-        raise ValueError("Only 'query' job type is supported")
-    return bigquery.QueryJobConfig.from_api_repr(resource['query'])
-
-
-def query_config(resource, installed_version):
-    if installed_version < BIGQUERY_CONFIG_VERSION:
-        return query_config_old_version(resource)
-    return bigquery.QueryJobConfig.from_api_repr(resource)
diff --git a/setup.py b/setup.py
index 40cfa427..bac3e5fe 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ def readme():
     'pandas',
     'google-auth',
     'google-auth-oauthlib',
-    'google-cloud-bigquery>=0.29.0',
+    'google-cloud-bigquery>=0.32.0',
 ]
 
 extras = {
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 18132e9a..fdd4d3f6 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -6,13 +6,13 @@
 import pytest
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def project_id():
     return (os.environ.get('GBQ_PROJECT_ID')
             or os.environ.get('GOOGLE_CLOUD_PROJECT'))  # noqa
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def private_key_path():
     path = None
     if 'TRAVIS_BUILD_DIR' in os.environ:
@@ -36,7 +36,7 @@ def private_key_path():
     return path
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def private_key_contents(private_key_path):
     if private_key_path is None:
         return None
diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py
index 6cb792af..39b2f4ee 100644
--- a/tests/system/test_gbq.py
+++ b/tests/system/test_gbq.py
@@ -3,7 +3,6 @@
 import sys
 from datetime import datetime
 from random import randint
-from time import sleep
 
 import numpy as np
 import pandas.util.testing as tm
@@ -50,46 +49,46 @@ def gbq_connector(project, credentials):
     return gbq.GbqConnector(project, private_key=credentials)
 
 
-def clean_gbq_environment(dataset_prefix, private_key=None, project_id=None):
-    dataset = gbq._Dataset(project_id, private_key=private_key)
-    all_datasets = dataset.datasets()
-
-    retry = 3
-    while retry > 0:
-        try:
-            retry = retry - 1
-            for i in range(1, 10):
-                dataset_id = dataset_prefix + str(i)
-                if dataset_id in all_datasets:
-                    table = gbq._Table(project_id, dataset_id,
-                                       private_key=private_key)
-
-                    # Table listing is eventually consistent, so loop until
-                    # all tables no longer appear (max 30 seconds).
-                    table_retry = 30
-                    all_tables = dataset.tables(dataset_id)
-                    while all_tables and table_retry > 0:
-                        for table_id in all_tables:
-                            try:
-                                table.delete(table_id)
-                            except gbq.NotFoundException:
-                                pass
-                        sleep(1)
-                        table_retry = table_retry - 1
-                        all_tables = dataset.tables(dataset_id)
-
-                    dataset.delete(dataset_id)
-            retry = 0
-        except gbq.GenericGBQException as ex:
-            # Build in retry logic to work around the following errors :
-            # An internal error occurred and the request could not be...
-            # Dataset ... is still in use
-            error_message = str(ex).lower()
-            if ('an internal error occurred' in error_message or
-                    'still in use' in error_message) and retry > 0:
-                sleep(30)
-            else:
-                raise ex
+@pytest.fixture(scope='module')
+def bigquery_client(project_id, private_key_path):
+    from google.cloud import bigquery
+    return bigquery.Client.from_service_account_json(
+        private_key_path, project=project_id)
+
+
+@pytest.fixture(scope='module')
+def tokyo_dataset(bigquery_client):
+    from google.cloud import bigquery
+    dataset_id = 'tokyo_{}'.format(_get_dataset_prefix_random())
+    dataset_ref = bigquery_client.dataset(dataset_id)
+    dataset = bigquery.Dataset(dataset_ref)
+    dataset.location = 'asia-northeast1'
+    bigquery_client.create_dataset(dataset)
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_ref, delete_contents=True)
+
+
+@pytest.fixture(scope='module')
+def tokyo_table(bigquery_client, tokyo_dataset):
+    table_id = 'tokyo_table'
+    # Create a random table using DDL.
+    # https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
+    bigquery_client.query(
+        """CREATE TABLE {}.{}
+        AS SELECT
+          2000 + CAST(18 * RAND() as INT64) as year,
+          IF(RAND() > 0.5,"foo","bar") as token
+        FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
+        """.format(tokyo_dataset, table_id),
+        location='asia-northeast1').result()
+    return table_id
+
+
+def clean_gbq_environment(dataset_prefix, bigquery_client):
+    for dataset in bigquery_client.list_datasets():
+        if not dataset.dataset_id.startswith(dataset_prefix):
+            continue
+        bigquery_client.delete_dataset(dataset.reference, delete_contents=True)
 
 
 def make_mixed_dataframe_v2(test_size):
@@ -640,6 +639,16 @@ def test_array_of_floats(self, private_key_path, project_id):
         tm.assert_frame_equal(df, DataFrame([[[1.1, 2.2, 3.3], 4]],
                                             columns=["a", "b"]))
 
+    def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
+        df = gbq.read_gbq(
+            'SELECT MAX(year) AS max_year FROM {}.{}'.format(
+                tokyo_dataset, tokyo_table),
+            dialect='standard',
+            location='asia-northeast1',
+            private_key=private_key_path)
+        print(df)
+        assert df['max_year'][0] >= 2000
+
 
 class TestToGBQIntegration(object):
     # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
@@ -649,13 +658,13 @@ class TestToGBQIntegration(object):
     # `__
 
     @pytest.fixture(autouse=True, scope='function')
-    def setup(self, project, credentials):
+    def setup(self, project, credentials, bigquery_client):
 
         # - PER-TEST FIXTURES -
         # put here any instruction you want to be run *BEFORE* *EVERY* test is
         # executed.
 
         self.dataset_prefix = _get_dataset_prefix_random()
-        clean_gbq_environment(self.dataset_prefix, credentials, project)
+        clean_gbq_environment(self.dataset_prefix, bigquery_client)
         self.dataset = gbq._Dataset(project, private_key=credentials)
         self.table = gbq._Table(project, self.dataset_prefix + "1",
@@ -667,7 +676,7 @@ def setup(self, project, credentials):
         self.dataset.create(self.dataset_prefix + "1")
         self.credentials = credentials
         yield
-        clean_gbq_environment(self.dataset_prefix, self.credentials, project)
+        clean_gbq_environment(self.dataset_prefix, bigquery_client)
 
     def test_upload_data(self, project_id):
         test_id = "1"
@@ -1192,6 +1201,21 @@ def test_upload_data_with_different_df_and_user_schema(self, project_id):
         assert self.table.verify_schema(dataset, table,
                                         dict(fields=test_schema))
 
+    def test_upload_data_tokyo(
+            self, project_id, tokyo_dataset, bigquery_client):
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        tokyo_destination = '{}.to_gbq_test'.format(tokyo_dataset)
+
+        # Initialize table with sample data
+        gbq.to_gbq(
+            df, tokyo_destination, project_id, private_key=self.credentials,
+            location='asia-northeast1')
+
+        table = bigquery_client.get_table(
+            bigquery_client.dataset(tokyo_dataset).table('to_gbq_test'))
+        assert table.num_rows > 0
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py
index 7b85a562..d3560450 100644
--- a/tests/unit/test_gbq.py
+++ b/tests/unit/test_gbq.py
@@ -14,6 +14,12 @@
     from unittest import mock
 
 
+@pytest.fixture
+def min_bq_version():
+    import pkg_resources
+    return pkg_resources.parse_version('0.32.0')
+
+
 @pytest.fixture(autouse=True)
 def mock_bigquery_client(monkeypatch):
     from google.api_core.exceptions import NotFound
@@ -109,9 +115,8 @@ def test_to_gbq_with_no_project_id_given_should_fail(monkeypatch):
     assert 'Could not determine project ID' in str(exception)
 
 
-def test_to_gbq_with_verbose_new_pandas_warns_deprecation():
+def test_to_gbq_with_verbose_new_pandas_warns_deprecation(min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with pytest.warns(FutureWarning), \
             mock.patch(
@@ -128,9 +133,8 @@ def test_to_gbq_with_verbose_new_pandas_warns_deprecation():
             pass
 
 
-def test_to_gbq_with_not_verbose_new_pandas_warns_deprecation():
+def test_to_gbq_with_not_verbose_new_pandas_warns_deprecation(min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with pytest.warns(FutureWarning), \
             mock.patch(
@@ -147,9 +151,8 @@ def test_to_gbq_with_not_verbose_new_pandas_warns_deprecation():
             pass
 
 
-def test_to_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn):
+def test_to_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn, min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with mock.patch(
             'pkg_resources.Distribution.parsed_version',
@@ -163,9 +166,8 @@ def test_to_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn):
     assert len(recwarn) == 0
 
 
-def test_to_gbq_with_verbose_old_pandas_no_warnings(recwarn):
+def test_to_gbq_with_verbose_old_pandas_no_warnings(recwarn, min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.22.0')
     with mock.patch(
             'pkg_resources.Distribution.parsed_version',
@@ -240,9 +242,8 @@ def test_read_gbq_with_corrupted_private_key_json_should_fail():
             'SELECT 1', project_id='x', private_key='99999999999999999')
 
 
-def test_read_gbq_with_verbose_new_pandas_warns_deprecation():
+def test_read_gbq_with_verbose_new_pandas_warns_deprecation(min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with pytest.warns(FutureWarning), \
             mock.patch(
@@ -252,9 +253,9 @@ def test_read_gbq_with_verbose_new_pandas_warns_deprecation():
         gbq.read_gbq('SELECT 1', project_id='my-project', verbose=True)
 
 
-def test_read_gbq_with_not_verbose_new_pandas_warns_deprecation():
+def test_read_gbq_with_not_verbose_new_pandas_warns_deprecation(
+        min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with pytest.warns(FutureWarning), \
             mock.patch(
@@ -264,9 +265,8 @@ def test_read_gbq_with_not_verbose_new_pandas_warns_deprecation():
         gbq.read_gbq('SELECT 1', project_id='my-project', verbose=False)
 
 
-def test_read_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn):
+def test_read_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn, min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.23.0')
     with mock.patch(
             'pkg_resources.Distribution.parsed_version',
@@ -276,9 +276,8 @@ def test_read_gbq_wo_verbose_w_new_pandas_no_warnings(recwarn):
     assert len(recwarn) == 0
 
 
-def test_read_gbq_with_verbose_old_pandas_no_warnings(recwarn):
+def test_read_gbq_with_verbose_old_pandas_no_warnings(recwarn, min_bq_version):
     import pkg_resources
-    min_bq_version = pkg_resources.parse_version('0.29.0')
     pandas_version = pkg_resources.parse_version('0.22.0')
     with mock.patch(
             'pkg_resources.Distribution.parsed_version',
diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py
deleted file mode 100644
index 0a89dfb9..00000000
--- a/tests/unit/test_query.py
+++ /dev/null
@@ -1,56 +0,0 @@
-
-import pkg_resources
-
-try:
-    import mock
-except ImportError:  # pragma: NO COVER
-    from unittest import mock
-
-from pandas_gbq import query
-
-
-@mock.patch('google.cloud.bigquery.QueryJobConfig')
-def test_query_config_w_old_bq_version(mock_config):
-    old_version = pkg_resources.parse_version('0.29.0')
-    query.query_config({'query': {'useLegacySql': False}}, old_version)
-    mock_config.from_api_repr.assert_called_once_with({'useLegacySql': False})
-
-
-@mock.patch('google.cloud.bigquery.QueryJobConfig')
-def test_query_config_w_dev_bq_version(mock_config):
-    dev_version = pkg_resources.parse_version('0.32.0.dev1')
-    query.query_config(
-        {
-            'query': {
-                'useLegacySql': False,
-            },
-            'labels': {'key': 'value'},
-        },
-        dev_version)
-    mock_config.from_api_repr.assert_called_once_with(
-        {
-            'query': {
-                'useLegacySql': False,
-            },
-            'labels': {'key': 'value'},
-        })
-
-
-@mock.patch('google.cloud.bigquery.QueryJobConfig')
-def test_query_config_w_new_bq_version(mock_config):
-    dev_version = pkg_resources.parse_version('1.0.0')
-    query.query_config(
-        {
-            'query': {
-                'useLegacySql': False,
-            },
-            'labels': {'key': 'value'},
-        },
-        dev_version)
-    mock_config.from_api_repr.assert_called_once_with(
-        {
-            'query': {
-                'useLegacySql': False,
-            },
-            'labels': {'key': 'value'},
-        })
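
Usage note (illustrative, not part of the patch): with this change, both
``read_gbq`` and ``to_gbq`` accept a ``location`` argument that is forwarded
to the BigQuery query and load jobs, so datasets outside the US/EU
multi-regions (such as Tokyo, ``asia-northeast1``) can be written to and
queried. A minimal sketch, assuming application-default credentials and a
``my_dataset`` dataset that already exists in the Tokyo region; the project
ID ``my-project`` and the table name are placeholders:

    import pandas as pd
    import pandas_gbq

    df = pd.DataFrame({'year': [2001, 2018]})

    # The load job runs in the region given by ``location``, which must
    # match the location of the target dataset.
    pandas_gbq.to_gbq(
        df, 'my_dataset.example_table', 'my-project',
        location='asia-northeast1', if_exists='append')

    # The query job must run in the same region as the datasets it reads.
    # Note that ``configuration`` is now an explicit keyword argument,
    # replacing the old ``**kwargs`` pass-through.
    result = pandas_gbq.read_gbq(
        'SELECT MAX(year) AS max_year FROM my_dataset.example_table',
        project_id='my-project',
        dialect='standard',
        location='asia-northeast1',
        configuration={'query': {'useQueryCache': False}})
    print(result['max_year'][0])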