1+ import functools
12import demistomock as demisto # noqa: F401
23from CommonServerPython import * # noqa: F401
34""" IMPORTS """
@@ -100,30 +101,39 @@ def get_events(self, config_ids: str, offset: str | None = '', limit: str | int
100101 new_offset = str (from_epoch )
101102 return events , new_offset
102103
103- def get_events_with_offset (
104- self ,
105- config_ids : str ,
106- offset : str | None = '' ,
107- limit : int = 20 ,
108- from_epoch : str = ''
109- ) -> tuple [list [str ], str | None ]:
104+ def execute_get_events_request (self , params : dict [str , int | str ], config_ids : str , prefix_msg : str = "" ):
105+ demisto .info (f"{ prefix_msg } Init session and sending request." )
106+ raw_response : str = self ._http_request (
107+ method = 'GET' ,
108+ url_suffix = f'/{ config_ids } ' ,
109+ params = params ,
110+ resp_type = 'text' ,
111+ )
112+ demisto .info (f"{ prefix_msg } Finished executing request to Akamai, processing" )
113+ return raw_response
114+
115+ def prepare_params (self , limit , offset , from_epoch , prefix_msg : str = "" ) -> dict [str , int | str ]:
110116 params : dict [str , int | str ] = {
111117 'limit' : limit
112118 }
113119 if offset :
114- demisto .info (f"received { offset = } will run an offset based request." )
120+ demisto .info (f"{ prefix_msg } received { offset = } will run an offset based request." )
115121 params ["offset" ] = offset
116122 else :
117123 from_param = int (from_epoch )
118124 params ["from" ] = from_param
119- demisto .info (f"did not receive an offset. will run a time based request with { from_param = } " )
120- raw_response : str = self ._http_request (
121- method = 'GET' ,
122- url_suffix = f'/{ config_ids } ' ,
123- params = params ,
124- resp_type = 'text' ,
125- )
126- demisto .info ("Finished executing request to Akamai, processing" )
125+ demisto .info (f"{ prefix_msg } didn't receive offset. will run a time based request with { from_param = } ." )
126+ return params
127+
128+ def get_events_with_offset (
129+ self ,
130+ config_ids : str ,
131+ offset : str | None = '' ,
132+ limit : int = 20 ,
133+ from_epoch : str = '' ,
134+ ) -> tuple [list [str ], str | None ]:
135+ params = self .prepare_params (offset = offset , limit = limit , from_epoch = from_epoch )
136+ raw_response = self .execute_get_events_request (params , config_ids )
127137 events : list [str ] = raw_response .split ('\n ' )
128138 offset = None
129139 try :
@@ -136,6 +146,45 @@ def get_events_with_offset(
136146 demisto .error (f"couldn't decode offset with { offset_context = } , reason { e } " )
137147 return events , offset
138148
149+ async def get_events_concurrently (
150+ self ,
151+ config_ids : str ,
152+ offset : str | None = '' ,
153+ limit : int = 200000 ,
154+ from_epoch : str = '' ,
155+ counter : int = 0
156+ ) -> tuple [list [str ], str | None ]:
157+ """Send request to get events from Akamai.
158+
159+ Args:
160+ config_ids (str): security configuration ids to fetch, e.g. `51000;56080`.
161+ offset (str | None): The offset (hash) to use for offset based mechanism.
162+ limit (int, optional): The number of events to limit for every request.
163+ from_epoch (str): From when to fetch if first time.
164+ counter (int, optional): The execution number.
165+
166+ Returns:
167+ tuple[list[str], str | None]: The events and offset obtained from last request.
168+ """
169+ params = self .prepare_params (offset = offset , limit = limit , from_epoch = from_epoch ,
170+ prefix_msg = f"Running in interval = { counter } . " )
171+ loop = asyncio .get_event_loop ()
172+ raw_response = await loop .run_in_executor (None , functools .partial (self .execute_get_events_request ,
173+ config_ids = config_ids ,
174+ params = params ,
175+ prefix_msg = f"Running in interval = { counter } . "
176+ ))
177+ events : list [str ] = raw_response .split ('\n ' )
178+ new_offset = None
179+ try :
180+ offset_context = events .pop ()
181+ loaded_offset_context = json .loads (offset_context )
182+ new_offset = loaded_offset_context .get ("offset" )
183+ except Exception as e :
184+ demisto .error (f"Running in interval = { counter } . Couldn't decode offset with { offset_context = } , reason { e } " )
185+ new_offset = offset
186+ return events , new_offset
187+
139188
140189'''HELPER FUNCIONS'''
141190
@@ -501,6 +550,7 @@ def fetch_events_command(
501550 "please run 'akamai-siem-reset-offset' on the specific instance.\n " \
502551 'For more information, please refer to the Troubleshooting section in the integration documentation.\n ' \
503552 f'original error: [{ e } ]'
553+ reset_offset_command (client )
504554 raise DemistoException (err_msg )
505555 else :
506556 raise DemistoException (e )
@@ -575,64 +625,6 @@ def post_latest_event_time(latest_event, base_msg):
575625MAX_ALLOWED_CONCURRENT_TASKS = 10000
576626
577627
578- async def get_events_with_offset_aiohttp (
579- client : Client ,
580- config_ids : str ,
581- offset : str | None = '' ,
582- limit : int = 200000 ,
583- from_epoch : str = '' ,
584- counter : int = 0
585- ) -> tuple [list [str ], str | None ]:
586- """Send request to get events from Akamai.
587-
588- Args:
589- client: Client object with request.
590- config_ids (str): security configuration ids to fetch, e.g. `51000;56080`.
591- offset (str | None): The offset (hash) to use for offset based mechanism.
592- limit (int, optional): The number of events to limit for every request.
593- from_epoch (str): From when to fetch if first time.
594- counter (int, optional): The execution number.
595-
596- Returns:
597- tuple[list[str], str | None]: The events and offset obtained from last request.
598- """
599- params : dict [str , int | str ] = {
600- 'limit' : limit
601- }
602- if offset :
603- demisto .info (f"Running in interval = { counter } . received { offset = } will run an offset based request." )
604- params ["offset" ] = offset
605- else :
606- from_param = int (from_epoch )
607- params ["from" ] = from_param
608- demisto .info (f"Running in interval = { counter } . didn't receive offset. will run a time based request with { from_param = } ." )
609-
610- url = f"{ client ._base_url } /"
611- demisto .info (f"Running in interval = { counter } . Init session and sending request." )
612-
613- async with aiohttp .ClientSession (base_url = url ,
614- trust_env = True ) as session , session .get (url = config_ids ,
615- params = params ,
616- auth = client ._auth ,
617- ssl = client ._verify ) as response :
618- try :
619- response .raise_for_status () # Check for any HTTP errors
620- raw_response = await response .text ()
621- except aiohttp .ClientResponseError as e :
622- raise DemistoException (f"Running in interval = { counter } . Error occurred when fetching from Akamai: { e .message } " )
623- demisto .info (f"Running in interval = { counter } . Finished executing request to Akamai, processing" )
624- events : list [str ] = raw_response .split ('\n ' )
625- new_offset = None
626- try :
627- offset_context = events .pop ()
628- loaded_offset_context = json .loads (offset_context )
629- new_offset = loaded_offset_context .get ("offset" )
630- except Exception as e :
631- demisto .error (f"Running in interval = { counter } . Couldn't decode offset with { offset_context = } , reason { e } " )
632- new_offset = offset
633- return events , new_offset
634-
635-
636628''' COMMANDS '''
637629
638630
@@ -763,18 +755,19 @@ async def get_events_from_akamai(client: Client,
763755 demisto .info (f"Running in interval = { counter } . Testing for possible tasks qt overflow." )
764756 await wait_until_tasks_load_decrease (counter , max_concurrent_tasks )
765757 demisto .info (f"Running in interval = { counter } . Finished testing for possible tasks qt overflow." )
766- get_events_task = get_events_with_offset_aiohttp (client , config_ids , offset , page_size , from_epoch , counter = counter )
758+ get_events_task = client .get_events_concurrently (
759+ config_ids , offset , page_size , from_epoch , counter = counter )
767760 events , offset = None , None
768761 events , offset = await get_events_task
769762 demisto .info (f"Running in interval = { counter } . got { len (events )} events and { offset = } ." )
770763 except DemistoException as e :
771764 demisto .error (f"{ e .message } " )
772765 err = str (e )
773766 if "Requested Range Not Satisfiable" in err :
774- err = f"Running in interval = { counter } . Got offset out of range error when attempting to fetch events from" \
775- "Akamai. \n This occurred due to offset pointing to events older than 12 hours which isn't supported by " \
767+ err = f"Running in interval = { counter } . Got offset out of range error when attempting to fetch events from " \
768+ "Akamai.\n This occurred due to offset pointing to events older than 12 hours which isn't supported by " \
776769 f"akamai.\n Restarting fetching events to start from { from_time } ago." \
777- 'For more information, please refer to the Troubleshooting section in the integration documentation.\n ' \
770+ ' For more information, please refer to the Troubleshooting section in the integration documentation.\n ' \
778771 f'original error: [{ err } ]'
779772 offset = None
780773 demisto .updateModuleHealth (err , is_error = True )
0 commit comments