ENH: Add location parameter to read_gbq and to_gbq #185

Merged: 3 commits, Jun 11, 2018

2 changes: 1 addition & 1 deletion ci/requirements-3.5-0.18.1.pip
@@ -1,3 +1,3 @@
google-auth==1.4.1
google-auth-oauthlib==0.0.1
google-cloud-bigquery==0.29.0
google-cloud-bigquery==0.32.0
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
@@ -7,6 +7,8 @@ Changelog
- Project ID parameter is optional in ``read_gbq`` and ``to_gbq`` when it can
  be inferred from the environment. Note: you must still pass in a project ID when
using user-based authentication. (:issue:`103`)
- Add location parameter to ``read_gbq`` and ``to_gbq`` so that pandas-gbq
can work with datasets in the Tokyo region. (:issue:`177`)
- Progress bar added for ``to_gbq``, through the optional ``tqdm``
  dependency. (:issue:`162`)

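As a quick illustration of the new ``location`` argument described above, a hypothetical ``read_gbq`` call against a dataset in the Tokyo region could look like this (the project, dataset, and table names are placeholders, not code from this PR):

    from pandas_gbq import gbq

    # Hypothetical usage: 'my_tokyo_dataset' is assumed to live in the
    # asia-northeast1 (Tokyo) region, and 'location' must match that region.
    df = gbq.read_gbq(
        'SELECT name, value FROM my_tokyo_dataset.my_table',
        project_id='my-project',
        dialect='standard',
        location='asia-northeast1')
    print(df.head())
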
52 changes: 34 additions & 18 deletions pandas_gbq/gbq.py
@@ -34,7 +34,7 @@ def _check_google_client_version():
raise ImportError('Could not import pkg_resources (setuptools).')

# https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
bigquery_minimum_version = pkg_resources.parse_version('0.29.0')
bigquery_minimum_version = pkg_resources.parse_version('0.32.0')
BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
'google-cloud-bigquery').parsed_version

@@ -152,12 +152,13 @@ class GbqConnector(object):

def __init__(self, project_id, reauth=False,
private_key=None, auth_local_webserver=False,
dialect='legacy'):
dialect='legacy', location=None):
from google.api_core.exceptions import GoogleAPIError
from google.api_core.exceptions import ClientError
from pandas_gbq import auth
self.http_error = (ClientError, GoogleAPIError)
self.project_id = project_id
self.location = location
self.reauth = reauth
self.private_key = private_key
self.auth_local_webserver = auth_local_webserver
@@ -215,9 +216,9 @@ def process_http_error(ex):
raise GenericGBQException("Reason: {0}".format(ex))

def run_query(self, query, **kwargs):
from google.auth.exceptions import RefreshError
from concurrent.futures import TimeoutError
import pandas_gbq.query
from google.auth.exceptions import RefreshError
from google.cloud import bigquery

job_config = {
'query': {
@@ -243,8 +244,8 @@ def run_query(self, query, **kwargs):
logger.info('Requesting query... ')
query_reply = self.client.query(
query,
job_config=pandas_gbq.query.query_config(
job_config, BIGQUERY_INSTALLED_VERSION))
job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
location=self.location)
logger.info('ok.\nQuery running...')
except (RefreshError, ValueError):
if self.private_key:
@@ -319,7 +320,7 @@ def load_data(
try:
chunks = load.load_chunks(self.client, dataframe, dataset_id,
table_id, chunksize=chunksize,
schema=schema)
schema=schema, location=self.location)
if progress_bar and tqdm:
chunks = tqdm.tqdm(chunks)
for remaining_rows in chunks:
@@ -470,7 +471,8 @@ def _parse_data(schema, rows):

def read_gbq(query, project_id=None, index_col=None, col_order=None,
reauth=False, verbose=None, private_key=None,
auth_local_webserver=False, dialect='legacy', **kwargs):
auth_local_webserver=False, dialect='legacy', location=None,
configuration=None):
r"""Load data from Google BigQuery using google-cloud-python

The main method a user calls to execute a Query in Google BigQuery
@@ -520,17 +522,23 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
compliant with the SQL 2011 standard. For more information
see `BigQuery SQL Reference
<https://cloud.google.com/bigquery/sql-reference/>`__
verbose : None, deprecated

**kwargs : Arbitrary keyword arguments
configuration (dict): query config parameters for job processing.
    location : str (optional)
        Location where the query job should run. See the `BigQuery locations
        documentation <https://cloud.google.com/bigquery/docs/dataset-locations>`__
        for a list of available locations. The location must match that of
        any datasets used in the query.
.. versionadded:: 0.5.0
configuration : dict (optional)
Query config parameters for job processing.
For example:

configuration = {'query': {'useQueryCache': False}}

For more information see `BigQuery SQL Reference
<https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__

verbose : None, deprecated

Returns
-------
df: DataFrame
@@ -550,9 +558,9 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
raise ValueError("'{0}' is not valid for dialect".format(dialect))

connector = GbqConnector(
project_id, reauth=reauth, private_key=private_key,
dialect=dialect, auth_local_webserver=auth_local_webserver)
schema, rows = connector.run_query(query, **kwargs)
project_id, reauth=reauth, private_key=private_key, dialect=dialect,
auth_local_webserver=auth_local_webserver, location=location)
schema, rows = connector.run_query(query, configuration=configuration)
final_df = _parse_data(schema, rows)

# Reindex the DataFrame on the provided column
@@ -595,7 +603,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,

def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
verbose=None, reauth=False, if_exists='fail', private_key=None,
auth_local_webserver=False, table_schema=None, progress_bar=True):
auth_local_webserver=False, table_schema=None, location=None,
progress_bar=True):
"""Write a DataFrame to a Google BigQuery table.

The main method a user calls to export pandas DataFrame contents to
@@ -648,9 +657,16 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
of DataFrame columns. See BigQuery API documentation on available
names of a field.
.. versionadded:: 0.3.1
verbose : None, deprecated
    location : str (optional)
        Location where the load job should run. See the `BigQuery locations
        documentation <https://cloud.google.com/bigquery/docs/dataset-locations>`__
        for a list of available locations. The location must match that of
        the target dataset.
.. versionadded:: 0.5.0
    progress_bar : boolean, True by default. Uses the ``tqdm`` library to show
        a progress bar for the upload, chunk by chunk.
        .. versionadded:: 0.5.0
verbose : None, deprecated
"""

_test_google_api_imports()
@@ -670,7 +686,7 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,

connector = GbqConnector(
project_id, reauth=reauth, private_key=private_key,
auth_local_webserver=auth_local_webserver)
auth_local_webserver=auth_local_webserver, location=location)
dataset_id, table_id = destination_table.rsplit('.', 1)

table = _Table(project_id, dataset_id, reauth=reauth,
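For reference, a rough sketch of what the updated ``run_query`` path does with ``self.location``, written as a direct call against the google-cloud-bigquery 0.32.0 client (the project name, query text, and config values are illustrative assumptions, not code from this PR):

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # placeholder project

    # Build the job config from a REST-style dict, as run_query does above,
    # and forward the connector's location to the query job.
    job_config = bigquery.QueryJobConfig.from_api_repr(
        {'query': {'useLegacySql': False}})
    query_job = client.query(
        'SELECT 1 AS x',
        job_config=job_config,
        location='asia-northeast1')  # corresponds to self.location
    rows = list(query_job.result())
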
6 changes: 4 additions & 2 deletions pandas_gbq/load.py
@@ -44,7 +44,8 @@ def encode_chunks(dataframe, chunksize=None):


def load_chunks(
client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
client, dataframe, dataset_id, table_id, chunksize=None, schema=None,
location=None):
destination_table = client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_APPEND'
@@ -71,4 +72,5 @@ def load_chunks(
client.load_table_from_file(
chunk_buffer,
destination_table,
job_config=job_config).result()
job_config=job_config,
location=location).result()
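The load path mirrors the query path: ``load_chunks`` now forwards ``location`` to ``load_table_from_file``. A minimal sketch of the equivalent direct client call, with placeholder names and an in-memory CSV chunk standing in for an encoded DataFrame chunk:

    import io

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # placeholder project
    destination = client.dataset('my_tokyo_dataset').table('my_table')

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_APPEND'
    job_config.source_format = 'CSV'
    job_config.schema = [
        bigquery.SchemaField('id', 'INTEGER'),
        bigquery.SchemaField('token', 'STRING'),
    ]

    chunk_buffer = io.BytesIO(b'1,foo\n2,bar\n')
    client.load_table_from_file(
        chunk_buffer,
        destination,
        job_config=job_config,
        location='asia-northeast1').result()  # must match the dataset's region
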
25 changes: 0 additions & 25 deletions pandas_gbq/query.py

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
@@ -21,7 +21,7 @@ def readme():
'pandas',
'google-auth',
'google-auth-oauthlib',
'google-cloud-bigquery>=0.29.0',
'google-cloud-bigquery>=0.32.0',
]

extras = {
6 changes: 3 additions & 3 deletions tests/system/conftest.py
@@ -6,13 +6,13 @@
import pytest


@pytest.fixture
@pytest.fixture(scope='session')
def project_id():
return (os.environ.get('GBQ_PROJECT_ID')
or os.environ.get('GOOGLE_CLOUD_PROJECT')) # noqa


@pytest.fixture
@pytest.fixture(scope='session')
def private_key_path():
path = None
if 'TRAVIS_BUILD_DIR' in os.environ:
@@ -36,7 +36,7 @@ def private_key_path():
return path


@pytest.fixture
@pytest.fixture(scope='session')
def private_key_contents(private_key_path):
if private_key_path is None:
return None
112 changes: 68 additions & 44 deletions tests/system/test_gbq.py
@@ -3,7 +3,6 @@
import sys
from datetime import datetime
from random import randint
from time import sleep

import numpy as np
import pandas.util.testing as tm
@@ -50,46 +49,46 @@ def gbq_connector(project, credentials):
return gbq.GbqConnector(project, private_key=credentials)


def clean_gbq_environment(dataset_prefix, private_key=None, project_id=None):
dataset = gbq._Dataset(project_id, private_key=private_key)
all_datasets = dataset.datasets()

retry = 3
while retry > 0:
try:
retry = retry - 1
for i in range(1, 10):
dataset_id = dataset_prefix + str(i)
if dataset_id in all_datasets:
table = gbq._Table(project_id, dataset_id,
private_key=private_key)

# Table listing is eventually consistent, so loop until
# all tables no longer appear (max 30 seconds).
table_retry = 30
all_tables = dataset.tables(dataset_id)
while all_tables and table_retry > 0:
for table_id in all_tables:
try:
table.delete(table_id)
except gbq.NotFoundException:
pass
sleep(1)
table_retry = table_retry - 1
all_tables = dataset.tables(dataset_id)

dataset.delete(dataset_id)
retry = 0
except gbq.GenericGBQException as ex:
# Build in retry logic to work around the following errors :
# An internal error occurred and the request could not be...
# Dataset ... is still in use
error_message = str(ex).lower()
if ('an internal error occurred' in error_message or
'still in use' in error_message) and retry > 0:
sleep(30)
else:
raise ex
@pytest.fixture(scope='module')
def bigquery_client(project_id, private_key_path):
from google.cloud import bigquery
return bigquery.Client.from_service_account_json(
private_key_path, project=project_id)


@pytest.fixture(scope='module')
def tokyo_dataset(bigquery_client):
from google.cloud import bigquery
dataset_id = 'tokyo_{}'.format(_get_dataset_prefix_random())
dataset_ref = bigquery_client.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = 'asia-northeast1'
bigquery_client.create_dataset(dataset)
yield dataset_id
bigquery_client.delete_dataset(dataset_ref, delete_contents=True)


@pytest.fixture(scope='module')
def tokyo_table(bigquery_client, tokyo_dataset):
table_id = 'tokyo_table'
# Create a random table using DDL.
# https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
bigquery_client.query(
"""CREATE TABLE {}.{}
AS SELECT
2000 + CAST(18 * RAND() as INT64) as year,
IF(RAND() > 0.5,"foo","bar") as token
FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
""".format(tokyo_dataset, table_id),
location='asia-northeast1').result()
return table_id
Contributor

Awesome!



def clean_gbq_environment(dataset_prefix, bigquery_client):
for dataset in bigquery_client.list_datasets():
if not dataset.dataset_id.startswith(dataset_prefix):
continue
bigquery_client.delete_dataset(dataset.reference, delete_contents=True)


def make_mixed_dataframe_v2(test_size):
@@ -640,6 +639,16 @@ def test_array_of_floats(self, private_key_path, project_id):
tm.assert_frame_equal(df, DataFrame([[[1.1, 2.2, 3.3], 4]],
columns=["a", "b"]))

def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
df = gbq.read_gbq(
'SELECT MAX(year) AS max_year FROM {}.{}'.format(
tokyo_dataset, tokyo_table),
dialect='standard',
location='asia-northeast1',
private_key=private_key_path)
print(df)
assert df['max_year'][0] >= 2000


class TestToGBQIntegration(object):
# Changes to BigQuery table schema may take up to 2 minutes as of May 2015
@@ -649,13 +658,13 @@ class TestToGBQIntegration(object):
# <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__

@pytest.fixture(autouse=True, scope='function')
def setup(self, project, credentials):
def setup(self, project, credentials, bigquery_client):
# - PER-TEST FIXTURES -
# put here any instruction you want to be run *BEFORE* *EVERY* test is
# executed.

self.dataset_prefix = _get_dataset_prefix_random()
clean_gbq_environment(self.dataset_prefix, credentials, project)
clean_gbq_environment(self.dataset_prefix, bigquery_client)
self.dataset = gbq._Dataset(project,
private_key=credentials)
self.table = gbq._Table(project, self.dataset_prefix + "1",
@@ -667,7 +676,7 @@ def setup(self, project, credentials):
self.dataset.create(self.dataset_prefix + "1")
self.credentials = credentials
yield
clean_gbq_environment(self.dataset_prefix, self.credentials, project)
clean_gbq_environment(self.dataset_prefix, bigquery_client)

def test_upload_data(self, project_id):
test_id = "1"
@@ -1192,6 +1201,21 @@ def test_upload_data_with_different_df_and_user_schema(self, project_id):
assert self.table.verify_schema(dataset, table,
dict(fields=test_schema))

def test_upload_data_tokyo(
self, project_id, tokyo_dataset, bigquery_client):
test_size = 10
df = make_mixed_dataframe_v2(test_size)
tokyo_destination = '{}.to_gbq_test'.format(tokyo_dataset)

# Initialize table with sample data
gbq.to_gbq(
df, tokyo_destination, project_id, private_key=self.credentials,
location='asia-northeast1')

table = bigquery_client.get_table(
bigquery_client.dataset(tokyo_dataset).table('to_gbq_test'))
assert table.num_rows > 0

def test_list_dataset(self):
dataset_id = self.dataset_prefix + "1"
assert dataset_id in self.dataset.datasets()