Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-18879: Clear offset when max_skip error is encountered and return 0 if skip is greater than 250k in the current state. #29

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tap_closeio/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ def set_bookmark(self, path, val):

def get_offset(self, path):
    """Return the saved offset stored under ``path[1]`` for stream ``path[0]``.

    ``path`` is indexed as ``(stream_id, offset_key)``. The lookup is
    delegated to ``bks_.get_offset`` on ``self.state``; a missing offset
    mapping yields ``None``.

    For the "activities" stream, an offset greater than 250000 found in an
    existing state is replaced by 0, because the API raises a max_skip
    error for such a skip value.
    """
    off = bks_.get_offset(self.state, path[0])
    value = (off or {}).get(path[1])
    # for activities stream, if existing state contains offset greater
    # than 250K, then return 0, as API will raise max_skip error
    if value and path[0] == "activities" and value > 250000:
        return 0
    return value

def set_offset(self, path, val):
    """Save ``val`` as the offset ``path[1]`` of stream ``path[0]``.

    The write is delegated to ``bks_.set_offset`` against ``self.state``.
    """
    stream_id, offset_key = path[0], path[1]
    bks_.set_offset(self.state, stream_id, offset_key, val)
Expand Down
42 changes: 27 additions & 15 deletions tap_closeio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

LOGGER = singer.get_logger()

# default date window size in days
DATE_WINDOW_SIZE = 15

PATHS = {
IDS.CUSTOM_FIELDS: "/custom_fields/lead/",
IDS.LEADS: "/lead/",
Expand Down Expand Up @@ -115,15 +118,24 @@ def paginated_sync(tap_stream_id, ctx, request, start_date):
# There may be streams other than `leads` that will run into
# `max_skip` errors but YAGNI. We can make the tap more
# complicated once we have an extant need for it.
if 'max_skip = ' in str(e) and tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
if 'max_skip = ' in str(e):
if tap_stream_id == IDS.ACTIVITIES:
LOGGER.warning("Hit max_skip error so clearing skip offset, please reduce the date window size and try again.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be an error and must be raised with this message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

# clear offset
ctx.clear_offsets(tap_stream_id)
ctx.write_state()
raise Exception(str(e) + " So, clearing skip offset, please reduce the date window size and try again.") from None
elif tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
else:
raise
else:
raise
ctx.clear_offsets(tap_stream_id)
Expand Down Expand Up @@ -168,15 +180,15 @@ def sync_activities(ctx):

try:
# get date window from config
date_window = int(ctx.config.get("date_window", 15))
# if date_window is 0, '0' or None, then set default window size of 15 days
date_window = int(ctx.config.get("date_window", DATE_WINDOW_SIZE))
# if date_window is 0, '0' or None, then set the default window size to DATE_WINDOW_SIZE (15 days)
if not date_window:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
date_window = 15
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
date_window = DATE_WINDOW_SIZE
except ValueError:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
# In case of empty string(''), use default window
date_window = 15
date_window = DATE_WINDOW_SIZE

LOGGER.info("Using offset seconds {}".format(offset_secs))
start_date -= timedelta(seconds=offset_secs)
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/test_activity_stream_date_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_activity_stream_default_date_window(self, mocked_paginated_sync):
# now date
now_date = datetime.now()
config = {
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d/"), # set date 40 days later than now
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d"), # set date 40 days later than now
"api_key": "test_API_key"
}
state = {}
Expand Down
119 changes: 119 additions & 0 deletions tests/unittests/test_activity_stream_offset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from unittest import mock
from tap_closeio.schemas import IDS
from tap_closeio.context import Context
from tap_closeio.streams import paginated_sync, create_request
import unittest

# mock Page class and set next_skip and records in the page
class MockPage:
    """Minimal stand-in for a result page: holds its records and the next skip value."""

    def __init__(self, records, next_skip):
        # plain attribute storage only; no validation is performed
        self.next_skip = next_skip
        self.records = records

# mock paginate function and yield MockPage
def mock_paginate(*args, **kwargs):
    """Generator replacement for ``paginate`` that yields up to two MockPage objects.

    The first page (next_skip 2) is always yielded. If the third positional
    argument's ``params`` mapping has a truthy "error" entry, a max_skip
    Exception is raised instead of yielding the second page (next_skip 4).
    """
    first_page = MockPage([{"id": 1, "date_created": "2022-01-02"}, {"id": 1, "date_created": "2022-01-03"}], 2)
    second_page = MockPage([{"id": 1, "date_created": "2022-01-04"}, {"id": 1, "date_created": "2022-01-04"}], 4)

    # after the 1st page is consumed, the skip will be 2
    yield first_page

    # the request is the 3rd positional argument; its params decide whether to fail
    request = args[2]
    if request.params.get("error"):
        raise Exception("The skip you set is larger than the maximum skip for this resource (max_skip = 250000).")

    yield second_page

# mock format_dts function and return 3rd argument, ie. record
def mock_format_dts(*args, **kwargs):
    """Replacement for ``format_dts`` that hands back its 3rd positional argument (the record) untouched."""
    record = args[2]
    return record

class TestExistingStateOffset(unittest.TestCase):
    """
    Checks the skip read back from an existing state file: 0 when the saved skip is above 250K, otherwise the saved skip itself
    """

    def _read_activities_skip(self, saved_skip):
        """
        Build a Context whose state holds `saved_skip` for the activities stream and return what get_offset reports
        """
        # mock config
        tap_config = {
            "start_date": "2022-01-01",
            "api_key": "test_API_key",
        }
        # mock state carrying the requested skip
        existing_state = {
            "currently_syncing": "activities",
            "bookmarks": {
                "activities": {
                    "date_created": "2022-04-01T00:00:00",
                    "offset": {"skip": saved_skip},
                },
            },
        }
        # create Context with config and state, then read the offset back
        ctx = Context(tap_config, existing_state)
        return ctx.get_offset(["activities", "skip"])

    def test_existing_state_offset_greater_than_250K(self):
        """
        A saved skip above 250K must come back as 0
        """
        returned_skip = self._read_activities_skip(259000)
        self.assertEqual(returned_skip, 0)

    def test_existing_state_offset_lesser_than_250K(self):
        """
        A saved skip below 250K must come back unchanged
        """
        returned_skip = self._read_activities_skip(1000)
        self.assertEqual(returned_skip, 1000)

@mock.patch("tap_closeio.streams.write_records")
@mock.patch("tap_closeio.streams.LOGGER.warning")
@mock.patch("tap_closeio.streams.paginate", side_effect = mock_paginate)
@mock.patch("tap_closeio.streams.format_dts", side_effect = mock_format_dts)
class TestOffsetClear(unittest.TestCase):
    """
    Checks that hitting the 'max_skip' error on the activities stream clears the saved skip, logs a warning and raises
    """

    def test_clear_state_for_max_skip_error(self, mocked_format_dts, mocked_paginate, mocked_logger_warning, mocked_write_records):
        """
        The 'skip' offset must be absent from the state after a 'max_skip' error on the 'activity' stream
        """
        # messages the sync is expected to emit
        expected_error = "The skip you set is larger than the maximum skip for this resource (max_skip = 250000). So, clearing skip offset, please reduce the date window size and try again."
        expected_warning = "Hit max_skip error so clearing skip offset, please reduce the date window size and try again."

        # empty starting state, kept in a local so it can be inspected afterwards
        sync_state = {}
        ctx = Context({"start_date": "2022-01-01", "api_key": "test_API_key"}, sync_state)
        # the "error" param makes the mocked paginate raise the 'max_skip' error
        failing_request = create_request(IDS.ACTIVITIES, {"error": "true"})

        # the sync must surface an Exception
        with self.assertRaises(Exception) as raised:
            paginated_sync(IDS.ACTIVITIES, ctx, failing_request, "2022-01-01")

        # error message carries the original API text plus the guidance suffix
        self.assertEqual(str(raised.exception), expected_error)
        # 'skip' is no longer present in the activities offset
        activities_offset = sync_state.get("bookmarks").get("activities").get("offset")
        self.assertIsNone(activities_offset.get("skip"))
        # the 'reduce date window' warning was logged
        mocked_logger_warning.assert_called_with(expected_warning)