Add a configuration option to make callback logging synchronous
Signed-off-by: B-Step62 <[email protected]>
B-Step62 committed Feb 3, 2025
1 parent 8ba60bf commit 482180c
Showing 6 changed files with 87 additions and 52 deletions.
litellm/__init__.py (2 additions, 0 deletions)

@@ -112,6 +112,8 @@
 callbacks: List[
     Union[Callable, _custom_logger_compatible_callbacks_literal, CustomLogger]
 ] = []
+# If true, callbacks will be executed synchronously (blocking).
+sync_logging: bool = False
 langfuse_default_tags: Optional[List[str]] = None
 langsmith_batch_size: Optional[int] = None
 prometheus_initialize_budget_metrics: Optional[bool] = False
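
For context, a minimal usage sketch of the new flag. It is not part of the diff: the AuditLogger class and model name are hypothetical, and the CustomLogger import path and log_success_event signature are assumptions based on litellm's custom-callback interface.

import litellm
from litellm.integrations.custom_logger import CustomLogger

class AuditLogger(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # With sync_logging enabled, this runs on the caller's thread
        # before completion() returns, instead of on a background thread.
        print(f"logged call to {kwargs.get('model')}")

litellm.callbacks = [AuditLogger()]
litellm.sync_logging = True  # block the request until callbacks finish

response = litellm.completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "hello"}],
)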
litellm/batches/batch_utils.py (7 additions, 4 deletions)

@@ -129,10 +129,13 @@ async def _log_completed_batch(
             cache_hit=None,
         )
     )
-    threading.Thread(
-        target=logging_obj.success_handler,
-        args=(None, start_time, end_time),
-    ).start()
+    if litellm.sync_logging:
+        logging_obj.success_handler(None, start_time, end_time)
+    else:
+        threading.Thread(
+            target=logging_obj.success_handler,
+            args=(None, start_time, end_time),
+        ).start()


 async def _batch_cost_calculator(
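
The same sync-versus-threaded dispatch recurs in the caching and streaming handlers below. Condensed into a hypothetical helper (not part of this diff), the pattern is:

import threading
from typing import Any, Callable

def dispatch_success_log(handler: Callable[..., Any], *args: Any, sync: bool) -> None:
    # Run the callback inline (blocking) when synchronous logging is
    # requested; otherwise fire and forget on a background thread.
    if sync:
        handler(*args)
    else:
        threading.Thread(target=handler, args=args).start()

Running inline trades a little request latency for a guarantee that the log has been written before the result is handed back, which matters in short-lived environments such as serverless workers.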
litellm/caching/caching_handler.py (14 additions, 8 deletions)

@@ -280,10 +280,13 @@ def _sync_get_cache(
                         is_async=False,
                     )

-                    threading.Thread(
-                        target=logging_obj.success_handler,
-                        args=(cached_result, start_time, end_time, cache_hit),
-                    ).start()
+                    if litellm.sync_logging:
+                        logging_obj.success_handler(cached_result, start_time, end_time, True)
+                    else:
+                        threading.Thread(
+                            target=logging_obj.success_handler,
+                            args=(cached_result, start_time, end_time, cache_hit),
+                        ).start()
                     cache_key = litellm.cache._get_preset_cache_key_from_kwargs(
                         **kwargs
                     )
@@ -449,10 +452,13 @@ def _async_log_cache_hit_on_callbacks(
                 cached_result, start_time, end_time, cache_hit
             )
         )
-        threading.Thread(
-            target=logging_obj.success_handler,
-            args=(cached_result, start_time, end_time, cache_hit),
-        ).start()
+        if litellm.sync_logging:
+            logging_obj.success_handler(cached_result, start_time, end_time, cache_hit)
+        else:
+            threading.Thread(
+                target=logging_obj.success_handler,
+                args=(cached_result, start_time, end_time, cache_hit),
+            ).start()

     async def _retrieve_from_cache(
         self, call_type: str, kwargs: Dict[str, Any], args: Tuple[Any, ...]
litellm/litellm_core_utils/streaming_handler.py (51 additions, 34 deletions)

@@ -1356,22 +1356,25 @@ def run_success_logging_and_cache_storage(self, processed_chunk, cache_hit: bool
             Disables streaming logging.
             """
             return
-        ## ASYNC LOGGING
-        # Create an event loop for the new thread
-        if self.logging_loop is not None:
-            future = asyncio.run_coroutine_threadsafe(
-                self.logging_obj.async_success_handler(
-                    processed_chunk, None, None, cache_hit
-                ),
-                loop=self.logging_loop,
-            )
-            future.result()
-        else:
-            asyncio.run(
-                self.logging_obj.async_success_handler(
-                    processed_chunk, None, None, cache_hit
-                )
-            )
+
+        if not litellm.sync_logging:
+            ## ASYNC LOGGING
+            # Create an event loop for the new thread
+            if self.logging_loop is not None:
+                future = asyncio.run_coroutine_threadsafe(
+                    self.logging_obj.async_success_handler(
+                        processed_chunk, None, None, cache_hit
+                    ),
+                    loop=self.logging_loop,
+                )
+                future.result()
+            else:
+                asyncio.run(
+                    self.logging_obj.async_success_handler(
+                        processed_chunk, None, None, cache_hit
+                    )
+                )
+
         ## SYNC LOGGING
         self.logging_obj.success_handler(processed_chunk, None, None, cache_hit)

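A standalone sketch of the two async paths kept above, assuming a long-lived logging loop runs in a helper thread (the names here are illustrative, not litellm APIs):

import asyncio
import threading

async def async_success_handler(chunk: str) -> None:
    await asyncio.sleep(0)  # stand-in for an async log export

# Path 1: a dedicated logging loop already runs in another thread;
# submit the coroutine to it and block until it finishes.
logging_loop = asyncio.new_event_loop()
threading.Thread(target=logging_loop.run_forever, daemon=True).start()
future = asyncio.run_coroutine_threadsafe(async_success_handler("chunk"), logging_loop)
future.result()

# Path 2: no loop is available; spin up a temporary one for this call.
asyncio.run(async_success_handler("chunk"))
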
@@ -1427,10 +1430,13 @@ def __next__(self):  # noqa: PLR0915
                 if response is None:
                     continue
                 ## LOGGING
-                threading.Thread(
-                    target=self.run_success_logging_and_cache_storage,
-                    args=(response, cache_hit),
-                ).start()  # log response
+                if litellm.sync_logging:
+                    self.run_success_logging_and_cache_storage(response, cache_hit)
+                else:
+                    threading.Thread(
+                        target=self.run_success_logging_and_cache_storage,
+                        args=(response, cache_hit),
+                    ).start()  # log response
                 choice = response.choices[0]
                 if isinstance(choice, StreamingChoices):
                     self.response_uptil_now += choice.delta.get("content", "") or ""
@@ -1476,10 +1482,13 @@ def __next__(self):  # noqa: PLR0915
                 )

                 ## LOGGING
-                threading.Thread(
-                    target=self.logging_obj.success_handler,
-                    args=(response, None, None, cache_hit),
-                ).start()  # log response
+                if litellm.sync_logging:
+                    self.logging_obj.success_handler(response, None, None, cache_hit)
+                else:
+                    threading.Thread(
+                        target=self.logging_obj.success_handler,
+                        args=(response, None, None, cache_hit),
+                    ).start()  # log response

                 if self.sent_stream_usage is False and self.send_stream_usage is True:
                     self.sent_stream_usage = True
@@ -1492,10 +1501,13 @@ def __next__(self):  # noqa: PLR0915
                 usage = calculate_total_usage(chunks=self.chunks)
                 processed_chunk._hidden_params["usage"] = usage
                 ## LOGGING
-                threading.Thread(
-                    target=self.run_success_logging_and_cache_storage,
-                    args=(processed_chunk, cache_hit),
-                ).start()  # log response
+                if litellm.sync_logging:
+                    self.run_success_logging_and_cache_storage(processed_chunk, cache_hit)
+                else:
+                    threading.Thread(
+                        target=self.run_success_logging_and_cache_storage,
+                        args=(processed_chunk, cache_hit),
+                    ).start()  # log response
                 return processed_chunk
         except Exception as e:
             traceback_exception = traceback.format_exc()
@@ -1654,13 +1666,18 @@ async def __anext__(self):  # noqa: PLR0915
                     )
                 )

-                executor.submit(
-                    self.logging_obj.success_handler,
-                    complete_streaming_response,
-                    cache_hit=cache_hit,
-                    start_time=None,
-                    end_time=None,
-                )
+                if litellm.sync_logging:
+                    self.logging_obj.success_handler(
+                        complete_streaming_response, None, None, cache_hit
+                    )
+                else:
+                    executor.submit(
+                        self.logging_obj.success_handler,
+                        complete_streaming_response,
+                        cache_hit=cache_hit,
+                        start_time=None,
+                        end_time=None,
+                    )

                 raise StopAsyncIteration  # Re-raise StopIteration
             else:
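
With the streaming changes above, enabling the flag means the final success callback completes before iteration ends. A usage sketch (the model name is a placeholder, assuming litellm's async streaming API):

import asyncio
import litellm

async def main() -> None:
    litellm.sync_logging = True  # success handler runs before the stream closes
    stream = await litellm.acompletion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
    )
    async for chunk in stream:
        pass  # consume chunks; logging fires inline at end of stream

asyncio.run(main())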
litellm/proxy/pass_through_endpoints/success_handler.py (1 addition, 0 deletions)

@@ -83,6 +83,7 @@ async def pass_through_async_success_handler(
         standard_logging_response_object = StandardPassThroughResponseObject(
             response=httpx_response.text
         )
+
         thread_pool_executor.submit(
             logging_obj.success_handler,
             standard_logging_response_object,  # Positional argument 1
litellm/utils.py (12 additions, 6 deletions)

@@ -1082,12 +1082,18 @@ def wrapper(*args, **kwargs):  # noqa: PLR0915

             # LOG SUCCESS - handle streaming success logging in the _next_ object, remove `handle_success` once it's deprecated
             verbose_logger.info("Wrapper: Completed Call, calling success_handler")
-            executor.submit(
-                logging_obj.success_handler,
-                result,
-                start_time,
-                end_time,
-            )
+            if litellm.sync_logging:
+                print("sync_logging")
+                logging_obj.success_handler(result, start_time, end_time)
+            else:
+                print("async_logging")
+                executor.submit(
+                    logging_obj.success_handler,
+                    result,
+                    start_time,
+                    end_time,
+                )

             # RETURN RESULT
             update_response_metadata(
                 result=result,
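
A minimal sketch of the trade-off in the wrapper above; the executor and handler are stand-ins, assuming litellm's shared executor is a concurrent.futures.ThreadPoolExecutor:

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def success_handler(result: str, start_time: float, end_time: float) -> None:
    time.sleep(0.1)  # stand-in for slow callback work (network I/O, etc.)

# Default path: submit() returns immediately, so logging may still be
# in flight when the wrapped call hands the result back to the caller.
executor.submit(success_handler, "result", 0.0, 1.0)

# sync_logging path: adds the callback's latency to the request, but
# guarantees the log was written before the result is returned.
success_handler("result", 0.0, 1.0)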
