Skip to content

Commit 73715ac

Browse files
[xy] Improve google sheets source. (mage-ai#4561)
1 parent 3d8f3c8 commit 73715ac

File tree

3 files changed

+145
-82
lines changed

3 files changed

+145
-82
lines changed

mage_integrations/mage_integrations/connections/google_sheets/__init__.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
from typing import Dict
2+
3+
import backoff
4+
import requests
15
from google.oauth2 import service_account
26
from googleapiclient.discovery import build
7+
38
from mage_integrations.connections.base import Connection
49
from mage_integrations.connections.utils.google import CredentialsInfoType
510
from mage_integrations.utils.dictionary import merge_dict
6-
from typing import Dict
711

812

913
class GoogleSheets(Connection):
@@ -22,11 +26,11 @@ def __init__(
2226

2327
def connect(self):
2428
"""Create a connection to the Google Search Console API and return service object.
25-
29+
2630
Returns:
2731
service (object): Google Search Console service object.
2832
"""
29-
33+
3034
scope = ['https://www.googleapis.com/auth/spreadsheets.readonly']
3135
if self.credentials_info is None:
3236
credentials = service_account.Credentials.from_service_account_file(
@@ -41,12 +45,20 @@ def connect(self):
4145
credentials=credentials
4246
)
4347

48+
@backoff.on_exception(
49+
backoff.expo,
50+
requests.exceptions.HTTPError,
51+
max_tries=5,
52+
factor=2,
53+
)
4454
def load(
4555
self,
4656
spreadsheet_id: str,
4757
value_range: str = None,
48-
opts: Dict = dict(),
58+
opts: Dict = None,
4959
):
60+
if opts is None:
61+
opts = dict()
5062
service = self.connect()
5163
kwargs = dict(
5264
spreadsheetId=spreadsheet_id,
@@ -60,6 +72,12 @@ def load(
6072

6173
return response.get('values', [])
6274

75+
@backoff.on_exception(
76+
backoff.expo,
77+
requests.exceptions.HTTPError,
78+
max_tries=5,
79+
factor=2,
80+
)
6381
def get_spreadsheet_metadata(
6482
self,
6583
spreadsheet_id: str,

mage_integrations/mage_integrations/sources/base.py

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
1+
import inspect
2+
import json
3+
import os
4+
import sys
5+
import traceback
16
from datetime import datetime
7+
from os.path import isfile
8+
from typing import Dict, Generator, List
9+
10+
import dateutil.parser
11+
import pandas as pd
12+
import simplejson
13+
import singer
14+
from singer import utils
15+
from singer.schema import Schema
16+
217
from mage_integrations.sources.catalog import Catalog, CatalogEntry
318
from mage_integrations.sources.constants import (
419
REPLICATION_METHOD_FULL_TABLE,
520
REPLICATION_METHOD_INCREMENTAL,
621
REPLICATION_METHOD_LOG_BASED,
722
)
823
from mage_integrations.sources.messages import write_records, write_schema, write_state
9-
from mage_integrations.sources.utils import (
10-
get_standard_metadata,
11-
parse_args,
12-
)
24+
from mage_integrations.sources.utils import get_standard_metadata, parse_args
1325
from mage_integrations.utils.array import find_index
1426
from mage_integrations.utils.dictionary import extract, group_by, merge_dict
1527
from mage_integrations.utils.files import get_abs_path
1628
from mage_integrations.utils.logger import Logger
1729
from mage_integrations.utils.logger.constants import TYPE_SAMPLE_DATA
1830
from mage_integrations.utils.schema_helpers import extract_selected_columns
1931

20-
from os.path import isfile
21-
from singer import utils
22-
from singer.schema import Schema
23-
from typing import Dict, Generator, List
24-
import dateutil.parser
25-
import inspect
26-
import json
27-
import os
28-
import pandas as pd
29-
import simplejson
30-
import singer
31-
import sys
32-
import traceback
33-
3432
LOGGER = singer.get_logger()
3533

3634

@@ -47,7 +45,7 @@ def __init__(
4745
load_sample_data: bool = False,
4846
log_to_stdout: bool = False,
4947
logger=LOGGER,
50-
query: Dict = {},
48+
query: Dict = None,
5149
schemas_folder: str = 'schemas',
5250
selected_streams: List[str] = None,
5351
settings: Dict = None,
@@ -56,6 +54,8 @@ def __init__(
5654
test_connection: bool = False,
5755
verbose: int = 1,
5856
):
57+
if query is None:
58+
query = {}
5959
args = parse_args([])
6060
if args:
6161
if args.catalog:
@@ -231,7 +231,8 @@ def process(self) -> None:
231231
elif self.count_records_mode:
232232
arr = []
233233
selected_streams_arr = self.catalog.get_selected_streams(self.state or {}) or []
234-
streams = [stream for stream in selected_streams_arr if stream.tap_stream_id in self.selected_streams]
234+
streams = [stream for stream in selected_streams_arr
235+
if stream.tap_stream_id in self.selected_streams]
235236
for stream in streams:
236237
tap_stream_id = stream.tap_stream_id
237238
count = self.count_records(
@@ -443,9 +444,10 @@ def sync_stream(self, stream, properties: Dict = None) -> int:
443444

444445
write_state(state)
445446

446-
self.logger.info(f'Load data for stream {tap_stream_id} completed.', tags=merge_dict(tags, dict(
447-
records=record_count,
448-
)))
447+
self.logger.info(
448+
f'Load data for stream {tap_stream_id} completed.',
449+
tags=merge_dict(tags, dict(records=record_count)),
450+
)
449451

450452
return record_count
451453

@@ -491,7 +493,7 @@ def write_records(self, stream, rows: List[Dict], properties: Dict = None) -> Li
491493
if self.is_sorted:
492494
state = {}
493495

494-
for idx, col in enumerate(bookmark_properties):
496+
for _, col in enumerate(bookmark_properties):
495497
singer.write_bookmark(
496498
state,
497499
tap_stream_id,
@@ -534,9 +536,10 @@ def sync(self, catalog: Catalog, properties: Dict = None) -> None:
534536
self.process_stream(stream, properties)
535537
record_count = self.sync_stream(stream, properties)
536538

537-
self.logger.info(f'Sync for stream {tap_stream_id} completed.', tags=merge_dict(tags, dict(
538-
records=record_count,
539-
)))
539+
self.logger.info(
540+
f'Sync for stream {tap_stream_id} completed.',
541+
tags=merge_dict(tags, dict(records=record_count)),
542+
)
540543

541544
self.logger.info('Sync completed.')
542545

@@ -566,7 +569,8 @@ def build_catalog_entry(
566569
replication_method=replication_method or self.get_forced_replication_method(stream_id),
567570
schema=schema.to_dict(),
568571
stream_id=stream_id,
569-
valid_replication_keys=bookmark_properties or self.get_valid_replication_keys(stream_id),
572+
valid_replication_keys=bookmark_properties or
573+
self.get_valid_replication_keys(stream_id),
570574
)
571575

572576
idx = find_index(lambda x: len(x['breadcrumb']) == 0, metadata)
@@ -582,11 +586,13 @@ def build_catalog_entry(
582586
database=None,
583587
disable_column_type_check=None,
584588
is_view=None,
585-
key_properties=key_properties or [], # User customizes this after creating catalog from discover.
589+
# User customizes this after creating catalog from discover.
590+
key_properties=key_properties or [],
586591
metadata=metadata,
587592
partition_keys=None,
588593
replication_key=replication_key or '',
589-
replication_method=replication_method or self.get_forced_replication_method(stream_id),
594+
replication_method=replication_method or
595+
self.get_forced_replication_method(stream_id),
590596
row_count=None,
591597
schema=schema,
592598
stream=stream_id,
@@ -611,7 +617,7 @@ def count_records(
611617
self,
612618
stream,
613619
bookmarks: Dict = None,
614-
query: Dict = {},
620+
query: Dict = None,
615621
**kwargs,
616622
) -> int:
617623
return 0
@@ -620,7 +626,7 @@ def load_data(
620626
self,
621627
stream,
622628
bookmarks: Dict = None,
623-
query: Dict = {},
629+
query: Dict = None,
624630
sample_data: bool = False,
625631
start_date: datetime = None,
626632
**kwargs,

0 commit comments

Comments
 (0)