Skip to content

Commit 4d5c56e

Browse files
committed
feat : mgramseva expense stream incremental
1 parent 9e36802 commit 4d5c56e

File tree

4 files changed

+81
-58
lines changed

4 files changed

+81
-58
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"todo-stream-name": {
3-
"todo-field-name": "value"
2+
"mgramseva_tenant_expenses": {
3+
"toDate": "2024-08-09"
44
}
55
}

airbyte-integrations/connectors/source-mgramseva/poetry.lock

Lines changed: 45 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-mgramseva/source_mgramseva/schemas/mgramseva_tenant_expenses.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
},
88
"data": {
99
"type": "object"
10+
},
11+
"toDate":{
12+
"type": "string"
1013
}
1114
}
1215
}

airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import pytz
1616
from airbyte_cdk.models import SyncMode
1717
from airbyte_cdk.sources import AbstractSource
18-
from airbyte_cdk.sources.streams import Stream
18+
from airbyte_cdk.sources.streams import Stream, CheckpointMixin
1919
from airbyte_cdk.sources.streams.http import HttpStream
2020
from airbyte_cdk.sources.streams.core import StreamData
2121

@@ -216,12 +216,15 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
216216
expenses["toDate"] = self.month_end.strftime("%Y-%m-%d")
217217
combined_string = f"{self.tenantid}{expenses['fromDate']}{expenses['toDate']}"
218218
id_hash = hashlib.sha256(combined_string.encode())
219-
return [{"data": expenses, "id": id_hash.hexdigest()}]
219+
return [{"data": expenses, "id": id_hash.hexdigest(),"toDate":expenses["toDate"]}]
220220

221221

222-
class MgramsevaTenantExpenses(MgramsevaStream):
222+
class MgramsevaTenantExpenses(MgramsevaStream, CheckpointMixin):
223223
"""object for tenant payments"""
224224

225+
cursor_field = "toDate"
226+
_cursor_value = None
227+
225228
def __init__(
226229
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs
227230
): # pylint: disable=super-init-not-called
@@ -234,8 +237,20 @@ def __init__(
234237
self.request_info = request_info
235238
self.user_request = user_request
236239
self.tenantid_list = tenantid_list
237-
self.fromdate = fromdate
238-
self.todate = todate
240+
self.fromdate = fromdate.replace(hour=0, minute=0, second=0, microsecond=0)
241+
self.todate = todate.replace(hour=0, minute=0, second=0, microsecond=0)
242+
243+
@property
244+
def state(self) -> Mapping[str, Any]:
245+
if self._cursor_value:
246+
return {self.cursor_field: str(self._cursor_value)}
247+
else:
248+
return {self.cursor_field: str(self.fromdate)}
249+
250+
@state.setter
251+
def state(self, value: Mapping[str, Any]):
252+
if self.cursor_field in value:
253+
self._cursor_value = value[self.cursor_field]
239254

240255
def read_records(
241256
self,
@@ -248,25 +263,30 @@ def read_records(
248263

249264
for tenantid in self.tenantid_list:
250265

251-
month_start = self.fromdate.replace(day=1)
266+
state_value = self.state[self.cursor_field]
252267

253-
while month_start < self.todate:
268+
month_start = datetime.fromisoformat(state_value).replace(day=1)
254269

255-
next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1)
270+
to_date = self.todate.replace(day=1)
256271

272+
while month_start < to_date:
273+
274+
next_month_start = month_start + relativedelta(months=1)
257275
stream = MgramsevaTenantExpense(
258276
"echallan-services/eChallan/v1/_expenseDashboard",
259277
self.headers,
260278
self.request_info,
261279
self.user_request,
262280
tenantid,
263281
month_start,
264-
next_month_start,
282+
next_month_start - timedelta(milliseconds=1),
265283
"ExpenseDashboard",
266284
)
267-
yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)
268-
285+
for record in stream.read_records(sync_mode, cursor_field, stream_slice, stream_state):
286+
yield record
269287
month_start = next_month_start
288+
289+
self._cursor_value = str(self.todate.replace(day=1))
270290

271291

272292
class MgramsevaPayments(MgramsevaStream):

0 commit comments

Comments
 (0)