diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 6b2f4ebbfdd88..25c9f736ecd7c 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -8,7 +8,7 @@ import base64 import hashlib -from datetime import datetime +from datetime import datetime, timedelta from logging import Logger from dateutil.relativedelta import relativedelta import requests @@ -19,6 +19,13 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.core import StreamData +pytz.IST = pytz.timezone("Asia/Kolkata") + + +def convert_to_date(x: int) -> datetime: + """convert a timestamp to a date""" + return datetime.fromtimestamp(x / 1000, pytz.UTC).astimezone(pytz.IST) + # Basic full refresh stream class MgramsevaStream(HttpStream, ABC): @@ -99,6 +106,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """ :return an iterable containing each record in the response """ + # self.logger.info(response.json()) return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) @@ -106,31 +114,54 @@ class MgramsevaDemands(MgramsevaStream): """object for consumer demands""" def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, start_date: datetime, end_date: datetime, **kwargs - ): - """specify endpoint for demands and call super""" - params = { - "tenantId": tenantid, - "businessService": "WS", - "periodFrom": int(1000 * start_date.timestamp()), - "periodTo": int(1000 * end_date.timestamp()), - } - super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands", **kwargs) + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called + """ctor""" + self.tenantid_list = tenantid_list + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.response_key = "Demands" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + for tenantid in self.tenantid_list: + params = { + "tenantId": tenantid, + "businessService": "WS", + } + demandstream = MgramsevaStream( + "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, self.response_key + ) + yield from demandstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """include the bill date""" + demands = response.json()[self.response_key] + for demand in demands: + demand["demandFromDate"] = convert_to_date(demand["taxPeriodFrom"]).strftime("%Y-%m-%d") + demand["demandToDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") + return map(lambda x: {"data": x, "id": x["id"]}, demands) class MgramsevaBills(MgramsevaStream): """object for consumer bills""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, consumer_codes: list, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, consumer_codes: dict, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for bills and call super""" self.headers = headers self.request_info = request_info self.user_request = user_request self.consumer_codes = consumer_codes - self.params = { - "tenantId": tenantid, - "businessService": "WS", - } + self.tenantid_list = tenantid_list def read_records( self, @@ -140,13 +171,13 @@ def read_records( stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[StreamData]: """override""" - for consumer_code in self.consumer_codes: - params = self.params.copy() - params["consumerCode"] = consumer_code - consumer_code_stream = MgramsevaStream( - "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" - ) - yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + for tenantid in self.tenantid_list: + for consumer_code in self.consumer_codes[tenantid]: + params = {"tenantId": tenantid, "businessService": "WS", "consumerCode": consumer_code} + consumer_code_stream = MgramsevaStream( + "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" + ) + yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) class MgramsevaTenantExpense(MgramsevaStream): @@ -160,18 +191,18 @@ def __init__( user_request: dict, tenantid: str, month_start: datetime, - next_month_start: datetime, + month_end: datetime, response_key: str, **kwargs, ): """call super""" self.tenantid = tenantid self.month_start = month_start - self.next_month_start = next_month_start + self.month_end = month_end params = { "tenantId": self.tenantid, "fromDate": int(month_start.timestamp() * 1000), - "toDate": int(next_month_start.timestamp() * 1000), + "toDate": int(month_end.timestamp() * 1000), } super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs) @@ -182,7 +213,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp expenses = response.json()[self.response_key] expenses["tenantId"] = self.tenantid expenses["fromDate"] = self.month_start.strftime("%Y-%m-%d") - expenses["toDate"] = self.next_month_start.strftime("%Y-%m-%d") + expenses["toDate"] = self.month_end.strftime("%Y-%m-%d") combined_string = f"{self.tenantid}{expenses['fromDate']}{expenses['toDate']}" id_hash = hashlib.sha256(combined_string.encode()) return [{"data": expenses, "id": id_hash.hexdigest()}] @@ -192,8 +223,8 @@ class MgramsevaTenantExpenses(MgramsevaStream): """object for tenant payments""" def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs - ): + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs + ): # pylint: disable=super-init-not-called """ specify endpoint for demands and call super 1672531200000 = 2023-01-01 00:00 @@ -202,7 +233,7 @@ def __init__( self.headers = headers self.request_info = request_info self.user_request = user_request - self.tenantid = tenantid + self.tenantid_list = tenantid_list self.fromdate = fromdate self.todate = todate @@ -215,34 +246,56 @@ def read_records( ) -> Iterable[StreamData]: """override""" - month_start = self.fromdate.replace(day=1) + for tenantid in self.tenantid_list: - while month_start < self.todate: + month_start = self.fromdate.replace(day=1) - next_month_start = month_start + relativedelta(months=1) + while month_start < self.todate: - stream = MgramsevaTenantExpense( - "echallan-services/eChallan/v1/_expenseDashboard", - self.headers, - self.request_info, - self.user_request, - self.tenantid, - month_start, - next_month_start, - "ExpenseDashboard", - ) - yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) - month_start = next_month_start + stream = MgramsevaTenantExpense( + "echallan-services/eChallan/v1/_expenseDashboard", + self.headers, + self.request_info, + self.user_request, + tenantid, + month_start, + next_month_start, + "ExpenseDashboard", + ) + yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + + month_start = next_month_start class MgramsevaPayments(MgramsevaStream): """object for consumer payments""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for payments and call super""" - params = {"tenantId": tenantid, "businessService": "WS"} - super().__init__("collection-services/payments/WS/_search", headers, request_info, user_request, params, "Payments", **kwargs) + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.tenantid_list = tenantid_list + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + + for tenantid in self.tenantid_list: + params = {"tenantId": tenantid, "businessService": "WS"} + paymentstream = MgramsevaStream( + "collection-services/payments/WS/_search", self.headers, self.request_info, self.user_request, params, "Payments" + ) + yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) # Source @@ -332,24 +385,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # tenant_expenses_from = datetime.strptime(config.get("tenant_expenses_from", "2022-01-01"), "%Y-%m-%d") # tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d") - start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d").replace(tzinfo=pytz.UTC) - end_date = datetime.today().replace(tzinfo=pytz.UTC) - - for tenantid in self.config["tenantids"]: - # Generate streams for each object type - streams = [ - MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), - MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - ] + start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d") + start_date = pytz.IST.localize(start_date).astimezone(pytz.utc) + end_date = datetime.today() + end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) - demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date) - streams.append(demand_stream) + # Generate streams for each object type + streams = [ + MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + ] - # and now we need bills for each consumer - consumer_codes = set() - for demand in demand_stream.read_records(SyncMode.full_refresh): - consumer_codes.add(demand["data"]["consumerCode"]) + # and now we need bills for each consumer + tenantid_to_consumer_codes = {} + for tenantid in self.config["tenantids"]: + tenantid_to_consumer_codes[tenantid] = set() + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, [tenantid]) + for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): + tenantid_to_consumer_codes[tenantid].add(demand["data"]["consumerCode"]) - streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes))) + streams.append( + MgramsevaBills(self.headers, self.request_info, self.user_request, self.config["tenantids"], tenantid_to_consumer_codes) + ) - return streams + return streams