N/A values in numeric fields #85

Open. Wants to merge 2 commits into base: master.
123 changes: 86 additions & 37 deletions tap_hubspot/__init__.py
@@ -15,7 +15,8 @@
import singer.metrics as metrics
from singer import metadata
from singer import utils
-from singer import (transform,
+from singer import (
+                    NO_INTEGER_DATETIME_PARSING,
                    UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
                    Transformer, _transform_datetime)

@@ -38,6 +39,54 @@ class StateFields:
    offset = 'offset'
    this_stream = 'this_stream'

class RowTransformer(Transformer):
    def __init__(self, integer_datetime_fmt=NO_INTEGER_DATETIME_PARSING, pre_hook=None):
        super().__init__(integer_datetime_fmt, pre_hook)

    def replace_na_with_none(self, obj):
        '''Recursively replace any 'N/A' value (case-insensitive) in obj with None.
        E.g. given:
            obj = {
                "key1": [{"subkey1": "value1"}, {"subkey2": "N/A"}],
                "key2": "n/a",
                "key3": {
                    "subkey3": "n/a",
                    "subkey4": "value2"
                }
            }
        self.replace_na_with_none(obj) returns:
            {
                "key1": [{"subkey1": "value1"}, {"subkey2": None}],
                "key2": None,
                "key3": {
                    "subkey3": None,
                    "subkey4": "value2"
                }
            }
        '''
        if isinstance(obj, dict):
            new_dict = {}
            for key, value in obj.items():
                new_dict[key] = self.replace_na_with_none(value)
            return new_dict

        if isinstance(obj, list):
            new_list = []
            for value in obj:
                new_list.append(self.replace_na_with_none(value))
            return new_list

        if isinstance(obj, str) and obj.lower() == 'n/a':
            return None

        # Any other scalar (int, bool, None, ...) passes through unchanged.
        return obj

    # metadata defaults to None so call sites that have no field metadata
    # (e.g. contacts_by_company below) can omit it, matching Transformer.transform.
    def process_row(self, row, schema, metadata=None):
        '''Replace N/A values in a record, then apply the standard Singer transform,
        which returns only the selected fields for the given schema and metadata.
        For further information: https://github.com/singer-io/singer-python/blob/master/singer/transform.py#L122'''
        row = self.replace_na_with_none(row)
        row = self.transform(row, schema, metadata)
        return row

CHUNK_SIZES = {
    "email_events": 1000 * 60 * 60 * 24,
    "subscription_changes": 1000 * 60 * 60 * 24,
@@ -115,9 +164,7 @@ def get_field_type_schema(field_type):
"format": "date-time"}

elif field_type == "number":
# A value like 'N/A' can be returned for this type,
# so we have to let this be a string sometimes
return {"type": ["null", "number", "string"]}
return {"type": ["null", "number"]}

else:
return {"type": ["null", "string"]}
@@ -268,6 +315,7 @@ def request(url, params=None):
# }

#pylint: disable=line-too-long
+
def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys, offset_targets):
    if len(offset_keys) != len(offset_targets):
        raise ValueError("Number of offset_keys must match number of offset_targets")
@@ -278,9 +326,8 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
    with metrics.record_counter(tap_stream_id) as counter:
        while True:
            data = request(url, params).json()
-
            for row in data[path]:
                counter.increment()
                yield row

            if not data.get(more_key, False):
@@ -298,7 +345,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
    singer.write_state(STATE)


-def _sync_contact_vids(catalog, vids, schema, bumble_bee):
+def _sync_contact_vids(catalog, vids, schema, transformer):
    if len(vids) == 0:
        return

@@ -307,7 +354,7 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
    mdata = metadata.to_map(catalog.get('metadata'))

    for record in data.values():
-        record = bumble_bee.transform(record, schema, mdata)
+        record = transformer.process_row(record, schema, mdata)
        singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

default_contact_params = {
@@ -330,7 +377,7 @@ def sync_contacts(STATE, ctx):
    url = get_url("contacts_all")

    vids = []
-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
            modified_time = None
            if bookmark_key in row:
@@ -346,10 +393,10 @@ def sync_contacts(STATE, ctx):
                max_bk_value = modified_time

            if len(vids) == 100:
-                _sync_contact_vids(catalog, vids, schema, bumble_bee)
+                _sync_contact_vids(catalog, vids, schema, transformer)
                vids = []

-        _sync_contact_vids(catalog, vids, schema, bumble_bee)
+        _sync_contact_vids(catalog, vids, schema, transformer)

    STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
    singer.write_state(STATE)
@@ -371,14 +418,16 @@ def _sync_contacts_by_company(STATE, company_id):

    url = get_url("contacts_by_company", company_id=company_id)
    path = 'vids'
-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        with metrics.record_counter(CONTACTS_BY_COMPANY) as counter:
            data = request(url, default_contacts_by_company_params).json()
            for row in data[path]:
                counter.increment()
-                record = {'company-id' : company_id,
-                          'contact-id' : row}
-                record = bumble_bee.transform(record, schema)
+                record = transformer.process_row({
+                    'company-id' : company_id,
+                    'contact-id' : row
+                    },
+                    schema)
                singer.write_record("contacts_by_company", record, time_extracted=utils.now())

    return STATE
@@ -390,7 +439,7 @@ def _sync_contacts_by_company(STATE, company_id):
def sync_companies(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
-    bumble_bee = Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)
+    transformer = RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)
    bookmark_key = 'hs_lastmodifieddate'
    start = utils.strptime_with_tz(get_start(STATE, "companies", bookmark_key))
    LOGGER.info("sync_companies from %s", start)
@@ -403,7 +452,7 @@ def sync_companies(STATE, ctx):
    contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY)
    singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"])

-    with bumble_bee:
+    with transformer:
        for row in gen_request(STATE, 'companies', url, default_company_params, 'companies', 'has-more', ['offset'], ['offset']):
            row_properties = row['properties']
            modified_time = None
@@ -421,7 +470,7 @@

            if not modified_time or modified_time >= start:
                record = request(get_url("companies_detail", company_id=row['companyId'])).json()
-                record = bumble_bee.transform(record, schema, mdata)
+                record = transformer.process_row(record, schema, mdata)
                singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
                if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
                    STATE = _sync_contacts_by_company(STATE, record['companyId'])
@@ -458,7 +507,7 @@ def sync_deals(STATE, ctx):
        params['properties'].append(key)

    url = get_url('deals_all')
-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in gen_request(STATE, 'deals', url, params, 'deals', "hasMore", ["offset"], ["offset"]):
            row_properties = row['properties']
            modified_time = None
@@ -474,7 +523,7 @@
                max_bk_value = modified_time

            if not modified_time or modified_time >= start:
-                record = bumble_bee.transform(row, schema, mdata)
+                record = transformer.process_row(row, schema, mdata)
                singer.write_record("deals", record, catalog.get('stream_alias'), time_extracted=utils.now())

    STATE = singer.write_bookmark(STATE, 'deals', bookmark_key, utils.strftime(max_bk_value))
@@ -491,10 +540,10 @@ def sync_campaigns(STATE, ctx):
    url = get_url("campaigns_all")
    params = {'limit': 500}

-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in gen_request(STATE, 'campaigns', url, params, "campaigns", "hasMore", ["offset"], ["offset"]):
            record = request(get_url("campaigns_detail", campaign_id=row['id'])).json()
-            record = bumble_bee.transform(record, schema, mdata)
+            record = transformer.process_row(record, schema, mdata)
            singer.write_record("campaigns", record, catalog.get('stream_alias'), time_extracted=utils.now())

    return STATE
@@ -524,7 +573,7 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
            'endTimestamp': end_ts,
            'limit': 1000,
        }
-        with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
            while True:
                our_offset = singer.get_offset(STATE, entity_name)
                if bool(our_offset) and our_offset.get('offset') != None:
@@ -535,7 +584,7 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):

                for row in data[path]:
                    counter.increment()
-                    record = bumble_bee.transform(row, schema, mdata)
+                    record = transformer.process_row(row, schema, mdata)
                    singer.write_record(entity_name,
                                        record,
                                        catalog.get('stream_alias'),
@@ -580,9 +629,9 @@ def sync_contact_lists(STATE, ctx):

    url = get_url("contact_lists")
    params = {'count': 250}
-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in gen_request(STATE, 'contact_lists', url, params, "lists", "has-more", ["offset"], ["offset"]):
-            record = bumble_bee.transform(row, schema, mdata)
+            record = transformer.process_row(row, schema, mdata)

            if record[bookmark_key] >= start:
                singer.write_record("contact_lists", record, catalog.get('stream_alias'), time_extracted=utils.now())
@@ -609,9 +658,9 @@ def sync_forms(STATE, ctx):
    data = request(get_url("forms")).json()
    time_extracted = utils.now()

-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in data:
-            record = bumble_bee.transform(row, schema, mdata)
+            record = transformer.process_row(row, schema, mdata)

            if record[bookmark_key] >= start:
                singer.write_record("forms", record, catalog.get('stream_alias'), time_extracted=time_extracted)
@@ -640,9 +689,9 @@ def sync_workflows(STATE, ctx):
    data = request(get_url("workflows")).json()
    time_extracted = utils.now()

-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in data['workflows']:
-            record = bumble_bee.transform(row, schema, mdata)
+            record = transformer.process_row(row, schema, mdata)
            if record[bookmark_key] >= start:
                singer.write_record("workflows", record, catalog.get('stream_alias'), time_extracted=time_extracted)
            if record[bookmark_key] >= max_bk_value:
@@ -666,9 +715,9 @@ def sync_owners(STATE, ctx):
    data = request(get_url("owners")).json()
    time_extracted = utils.now()

-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in data:
-            record = bumble_bee.transform(row, schema, mdata)
+            record = transformer.process_row(row, schema, mdata)
            if record[bookmark_key] >= max_bk_value:
                max_bk_value = record[bookmark_key]

@@ -699,9 +748,9 @@ def sync_engagements(STATE, ctx):

    time_extracted = utils.now()

-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for engagement in engagements:
-            record = bumble_bee.transform(engagement, schema, mdata)
+            record = transformer.process_row(engagement, schema, mdata)
            if record['engagement'][bookmark_key] >= start:
                # hoist PK and bookmark field to top-level record
                record['engagement_id'] = record['engagement']['id']
@@ -721,9 +770,9 @@ def sync_deal_pipelines(STATE, ctx):
    singer.write_schema('deal_pipelines', schema, ['pipelineId'], catalog.get('stream_alias'))
    LOGGER.info('sync_deal_pipelines')
    data = request(get_url('deal_pipelines')).json()
-    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+    with RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
        for row in data:
-            record = bumble_bee.transform(row, schema, mdata)
+            record = transformer.process_row(row, schema, mdata)
            singer.write_record("deal_pipelines", record, catalog.get('stream_alias'), time_extracted=utils.now())
    singer.write_state(STATE)
    return STATE
59 changes: 59 additions & 0 deletions tap_hubspot/tests/test_sql_types.py
@@ -0,0 +1,59 @@
import unittest

from tap_hubspot import RowTransformer
from singer import UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING

class TestRelationalDBFieldType(unittest.TestCase):
    def test_process_na(self):
        row = {'addedAt': 1549520648318,
               'vid': 151,
               'canonical-vid': 151,
               'merged-vids': [],
               'portal-id': 'N/A',
               'is-contact': True,
               'profile-token': 'test_token',
               'profile-url': 'https://app.hubspot.com/contacts/5389540/contact/151',
               'properties': {'firstname': {'value': 'Firstname'},
                              'lastmodifieddate': {'value': '1549521003396'},
                              'lastname': {'value': 'Lastname'}},
               'form-submissions': [],
               'list-memberships': [],
               'identity-profiles': [{'vid': 151,
                                      'saved-at-timestamp': 1549520648194,
                                      'deleted-changed-timestamp': 0,
                                      'identities': [{'type': 'EMAIL',
                                                      'value': '[email protected]',
                                                      'timestamp': 1549520648175,
                                                      'is-primary': True},
                                                     {'type': 'LEAD_GUID',
                                                      'value': '76f4s5sq-5308-4796-96c4-4b7381e17cg3',
                                                      'timestamp': 1549520648191}]}],
               'merge-audits': [],
               'version': 7,
               'versionTimestamp': 1549521003697}
        transformer = RowTransformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)
        processed_row = transformer.replace_na_with_none(row)

        # Only the 'N/A' string should change; every other value is preserved.
        self.assertEqual(processed_row,
                         {'addedAt': 1549520648318,
                          'vid': 151,
                          'canonical-vid': 151,
                          'merged-vids': [],
                          'portal-id': None,
                          'is-contact': True,
                          'profile-token': 'test_token',
                          'profile-url': 'https://app.hubspot.com/contacts/5389540/contact/151',
                          'properties': {'firstname': {'value': 'Firstname'},
                                         'lastmodifieddate': {'value': '1549521003396'},
                                         'lastname': {'value': 'Lastname'}},
                          'form-submissions': [],
                          'list-memberships': [],
                          'identity-profiles': [{'vid': 151,
                                                 'saved-at-timestamp': 1549520648194,
                                                 'deleted-changed-timestamp': 0,
                                                 'identities': [{'type': 'EMAIL',
                                                                 'value': '[email protected]',
                                                                 'timestamp': 1549520648175,
                                                                 'is-primary': True},
                                                                {'type': 'LEAD_GUID',
                                                                 'value': '76f4s5sq-5308-4796-96c4-4b7381e17cg3',
                                                                 'timestamp': 1549520648191}]}],
                          'merge-audits': [],
                          'version': 7,
                          'versionTimestamp': 1549521003697})
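As written, the file depends on an external test runner; a standard main guard (not part of this diff) would also let it run standalone via python tap_hubspot/tests/test_sql_types.py:

if __name__ == '__main__':
    unittest.main()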