From ab616dc0a83e0bc6407317016bee33ca1fd1d5fc Mon Sep 17 00:00:00 2001 From: B-Step62 Date: Mon, 3 Feb 2025 10:06:12 +0900 Subject: [PATCH] Add a configuration option to make callback logging synchronous Signed-off-by: B-Step62 --- litellm/__init__.py | 2 + litellm/batches/batch_utils.py | 11 ++-- litellm/caching/caching_handler.py | 22 +++++--- .../litellm_core_utils/streaming_handler.py | 52 ++++++++++++------- .../pass_through_endpoints/success_handler.py | 1 + litellm/utils.py | 18 ++++--- 6 files changed, 69 insertions(+), 37 deletions(-) diff --git a/litellm/__init__.py b/litellm/__init__.py index 506ecb258e01..301c882c4774 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -112,6 +112,8 @@ callbacks: List[ Union[Callable, _custom_logger_compatible_callbacks_literal, CustomLogger] ] = [] +# If true, callbacks will be executed synchronously (blocking). +sync_logging: bool = False langfuse_default_tags: Optional[List[str]] = None langsmith_batch_size: Optional[int] = None prometheus_initialize_budget_metrics: Optional[bool] = False diff --git a/litellm/batches/batch_utils.py b/litellm/batches/batch_utils.py index f24eda043225..acdfc5252fe8 100644 --- a/litellm/batches/batch_utils.py +++ b/litellm/batches/batch_utils.py @@ -129,10 +129,13 @@ async def _log_completed_batch( cache_hit=None, ) ) - threading.Thread( - target=logging_obj.success_handler, - args=(None, start_time, end_time), - ).start() + if litellm.sync_logging: + logging_obj.success_handler(None, start_time, end_time) + else: + threading.Thread( + target=logging_obj.success_handler, + args=(None, start_time, end_time), + ).start() async def _batch_cost_calculator( diff --git a/litellm/caching/caching_handler.py b/litellm/caching/caching_handler.py index 40c100173203..0cd88911b179 100644 --- a/litellm/caching/caching_handler.py +++ b/litellm/caching/caching_handler.py @@ -280,10 +280,13 @@ def _sync_get_cache( is_async=False, ) - threading.Thread( - target=logging_obj.success_handler, - 
args=(cached_result, start_time, end_time, cache_hit), - ).start() + if litellm.sync_logging: + logging_obj.success_handler(cached_result, start_time, end_time, cache_hit) + else: + threading.Thread( + target=logging_obj.success_handler, + args=(cached_result, start_time, end_time, cache_hit), + ).start() cache_key = litellm.cache._get_preset_cache_key_from_kwargs( **kwargs ) @@ -449,10 +452,13 @@ def _async_log_cache_hit_on_callbacks( cached_result, start_time, end_time, cache_hit ) ) - threading.Thread( - target=logging_obj.success_handler, - args=(cached_result, start_time, end_time, cache_hit), - ).start() + if litellm.sync_logging: + logging_obj.success_handler(cached_result, start_time, end_time, cache_hit) + else: + threading.Thread( + target=logging_obj.success_handler, + args=(cached_result, start_time, end_time, cache_hit), + ).start() async def _retrieve_from_cache( self, call_type: str, kwargs: Dict[str, Any], args: Tuple[Any, ...] diff --git a/litellm/litellm_core_utils/streaming_handler.py b/litellm/litellm_core_utils/streaming_handler.py index 08356fea73aa..a0a148d4d373 100644 --- a/litellm/litellm_core_utils/streaming_handler.py +++ b/litellm/litellm_core_utils/streaming_handler.py @@ -1427,10 +1427,13 @@ def __next__(self): # noqa: PLR0915 if response is None: continue ## LOGGING - threading.Thread( - target=self.run_success_logging_and_cache_storage, - args=(response, cache_hit), - ).start() # log response + if litellm.sync_logging: + self.run_success_logging_and_cache_storage(response, cache_hit) + else: + threading.Thread( + target=self.run_success_logging_and_cache_storage, + args=(response, cache_hit), + ).start() # log response choice = response.choices[0] if isinstance(choice, StreamingChoices): self.response_uptil_now += choice.delta.get("content", "") or "" @@ -1476,10 +1479,13 @@ def __next__(self): # noqa: PLR0915 ) ## LOGGING - threading.Thread( - target=self.logging_obj.success_handler, - args=(response, None, None, cache_hit), - ).start() 
# log response + if litellm.sync_logging: + self.logging_obj.success_handler(response, None, None, cache_hit) + else: + threading.Thread( + target=self.logging_obj.success_handler, + args=(response, None, None, cache_hit), + ).start() # log response if self.sent_stream_usage is False and self.send_stream_usage is True: self.sent_stream_usage = True @@ -1492,10 +1498,13 @@ def __next__(self): # noqa: PLR0915 usage = calculate_total_usage(chunks=self.chunks) processed_chunk._hidden_params["usage"] = usage ## LOGGING - threading.Thread( - target=self.run_success_logging_and_cache_storage, - args=(processed_chunk, cache_hit), - ).start() # log response + if litellm.sync_logging: + self.run_success_logging_and_cache_storage(processed_chunk, cache_hit) + else: + threading.Thread( + target=self.run_success_logging_and_cache_storage, + args=(processed_chunk, cache_hit), + ).start() # log response return processed_chunk except Exception as e: traceback_exception = traceback.format_exc() @@ -1654,13 +1663,18 @@ async def __anext__(self): # noqa: PLR0915 ) ) - executor.submit( - self.logging_obj.success_handler, - complete_streaming_response, - cache_hit=cache_hit, - start_time=None, - end_time=None, - ) + if litellm.sync_logging: + self.logging_obj.success_handler( + complete_streaming_response, None, None, cache_hit + ) + else: + executor.submit( + self.logging_obj.success_handler, + complete_streaming_response, + cache_hit=cache_hit, + start_time=None, + end_time=None, + ) raise StopAsyncIteration # Re-raise StopIteration else: diff --git a/litellm/proxy/pass_through_endpoints/success_handler.py b/litellm/proxy/pass_through_endpoints/success_handler.py index 6f112aed1fd4..527121ee6b34 100644 --- a/litellm/proxy/pass_through_endpoints/success_handler.py +++ b/litellm/proxy/pass_through_endpoints/success_handler.py @@ -83,6 +83,7 @@ async def pass_through_async_success_handler( standard_logging_response_object = StandardPassThroughResponseObject( response=httpx_response.text 
) + thread_pool_executor.submit( logging_obj.success_handler, standard_logging_response_object, # Positional argument 1 diff --git a/litellm/utils.py b/litellm/utils.py index 7197862e3a9a..eff973dadf38 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -1082,12 +1082,16 @@ def wrapper(*args, **kwargs): # noqa: PLR0915 # LOG SUCCESS - handle streaming success logging in the _next_ object, remove `handle_success` once it's deprecated verbose_logger.info("Wrapper: Completed Call, calling success_handler") - executor.submit( - logging_obj.success_handler, - result, - start_time, - end_time, - ) + if litellm.sync_logging: + logging_obj.success_handler(result, start_time, end_time) + else: + executor.submit( + logging_obj.success_handler, + result, + start_time, + end_time, + ) + # RETURN RESULT update_response_metadata( result=result,