Skip to content

Add a configuration option to make callback logging synchronous #8202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions litellm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
callbacks: List[
Union[Callable, _custom_logger_compatible_callbacks_literal, CustomLogger]
] = []
# If true, callbacks will be executed synchronously (blocking).
sync_logging: bool = False
langfuse_default_tags: Optional[List[str]] = None
langsmith_batch_size: Optional[int] = None
prometheus_initialize_budget_metrics: Optional[bool] = False
Expand Down
10 changes: 2 additions & 8 deletions litellm/caching/caching_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,7 @@ def _sync_get_cache(
is_async=False,
)

threading.Thread(
target=logging_obj.success_handler,
args=(cached_result, start_time, end_time, cache_hit),
).start()
logging_obj.success_handler(cached_result, start_time, end_time, True)
cache_key = litellm.cache._get_preset_cache_key_from_kwargs(
**kwargs
)
Expand Down Expand Up @@ -448,10 +445,7 @@ def _async_log_cache_hit_on_callbacks(
cached_result, start_time, end_time, cache_hit
)
)
threading.Thread(
target=logging_obj.success_handler,
args=(cached_result, start_time, end_time, cache_hit),
).start()
logging_obj.success_handler(cached_result, start_time, end_time, cache_hit)

async def _retrieve_from_cache(
self, call_type: str, kwargs: Dict[str, Any], args: Tuple[Any, ...]
Expand Down
6 changes: 5 additions & 1 deletion litellm/integrations/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ def _handle_stream_event(self, kwargs, response_obj, start_time, end_time):

# If this is the final chunk, end the span. The final chunk
# has complete_streaming_response that gathers the full response.
if final_response := kwargs.get("complete_streaming_response"):
final_response = (
kwargs.get("complete_streaming_response")
or kwargs.get("async_complete_streaming_response")
)
if final_response:
end_time_ns = int(end_time.timestamp() * 1e9)

self._extract_and_set_chat_attributes(span, kwargs, final_response)
Expand Down
41 changes: 33 additions & 8 deletions litellm/litellm_core_utils/litellm_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import re
import subprocess
import sys
import threading
import time
import traceback
import uuid
from datetime import datetime as dt_object
from functools import lru_cache
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union, cast

from pydantic import BaseModel

Expand Down Expand Up @@ -1083,7 +1084,36 @@ def _success_handler_helper_fn(
except Exception as e:
raise Exception(f"[Non-Blocking] LiteLLM.Success_Call Error: {str(e)}")

def success_handler( # noqa: PLR0915
def success_handler(
    self,
    result=None,
    start_time=None,
    end_time=None,
    cache_hit=None,
    synchronous=None,
    **kwargs
):
    """
    Run the success logging callbacks either blocking or in the background.

    If ``synchronous`` is not given (``None``), the global
    ``litellm.sync_logging`` setting decides the mode. In synchronous mode
    the internal ``_success_handler`` runs on the calling thread; otherwise
    it is scheduled on the shared thread-pool ``executor``.
    """
    # Fall back to the global configuration when no explicit mode is given.
    run_blocking = litellm.sync_logging if synchronous is None else synchronous

    if run_blocking:
        # Blocking: execute the callbacks before returning to the caller.
        self._success_handler(result, start_time, end_time, cache_hit, **kwargs)
        return

    # Non-blocking: hand off to the worker pool and return immediately.
    executor.submit(
        self._success_handler, result, start_time, end_time, cache_hit, **kwargs
    )


def _success_handler( # noqa: PLR0915
self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs
):
verbose_logger.debug(
Expand Down Expand Up @@ -2234,12 +2264,7 @@ def handle_sync_success_callbacks_for_async_calls(
if self._should_run_sync_callbacks_for_async_calls() is False:
return

executor.submit(
self.success_handler,
result,
start_time,
end_time,
)
self.success_handler(result, start_time, end_time)

def _should_run_sync_callbacks_for_async_calls(self) -> bool:
"""
Expand Down
114 changes: 55 additions & 59 deletions litellm/litellm_core_utils/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1494,32 +1494,47 @@ def run_success_logging_and_cache_storage(self, processed_chunk, cache_hit: bool
"""
Runs success logging in a thread and adds the response to the cache
"""
if litellm.disable_streaming_logging is True:
"""
[NOT RECOMMENDED]
Set this via `litellm.disable_streaming_logging = True`.
def _run():
if litellm.disable_streaming_logging is True:
"""
[NOT RECOMMENDED]
Set this via `litellm.disable_streaming_logging = True`.

Disables streaming logging.
"""
return

Disables streaming logging.
"""
return
## ASYNC LOGGING
# Create an event loop for the new thread
if self.logging_loop is not None:
future = asyncio.run_coroutine_threadsafe(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
),
loop=self.logging_loop,
)
future.result()
else:
asyncio.run(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
if not litellm.sync_logging:
## ASYNC LOGGING
# Create an event loop for the new thread
if self.logging_loop is not None:
future = asyncio.run_coroutine_threadsafe(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
),
loop=self.logging_loop,
)
future.result()
else:
asyncio.run(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
)
)

## SYNC LOGGING
self.logging_obj.success_handler(processed_chunk, None, None, cache_hit)

## Sync store in cache
if self.logging_obj._llm_caching_handler is not None:
self.logging_obj._llm_caching_handler._sync_add_streaming_response_to_cache(
processed_chunk
)
)
## SYNC LOGGING
self.logging_obj.success_handler(processed_chunk, None, None, cache_hit)

if litellm.sync_logging:
_run()
else:
executor.submit(_run)

def finish_reason_handler(self):
model_response = self.model_response_creator()
Expand Down Expand Up @@ -1567,11 +1582,8 @@ def __next__(self): # noqa: PLR0915
if response is None:
continue
## LOGGING
executor.submit(
self.run_success_logging_and_cache_storage,
response,
cache_hit,
) # log response
self.run_success_logging_and_cache_storage(response, cache_hit)

choice = response.choices[0]
if isinstance(choice, StreamingChoices):
self.response_uptil_now += choice.delta.get("content", "") or ""
Expand Down Expand Up @@ -1621,21 +1633,12 @@ def __next__(self): # noqa: PLR0915
),
cache_hit=cache_hit,
)
executor.submit(
self.logging_obj.success_handler,
complete_streaming_response.model_copy(deep=True),
None,
None,
cache_hit,
)
logging_result = complete_streaming_response.model_copy(deep=True)
else:
executor.submit(
self.logging_obj.success_handler,
response,
None,
None,
cache_hit,
)
logging_result = response

self.logging_obj.success_handler(logging_result, None, None, cache_hit)

if self.sent_stream_usage is False and self.send_stream_usage is True:
self.sent_stream_usage = True
return response
Expand All @@ -1647,11 +1650,7 @@ def __next__(self): # noqa: PLR0915
usage = calculate_total_usage(chunks=self.chunks)
processed_chunk._hidden_params["usage"] = usage
## LOGGING
executor.submit(
self.run_success_logging_and_cache_storage,
processed_chunk,
cache_hit,
) # log response
self.run_success_logging_and_cache_storage(processed_chunk, cache_hit)
return processed_chunk
except Exception as e:
traceback_exception = traceback.format_exc()
Expand Down Expand Up @@ -1802,22 +1801,19 @@ async def __anext__(self): # noqa: PLR0915
self.sent_stream_usage = True
return response

asyncio.create_task(
self.logging_obj.async_success_handler(
complete_streaming_response,
cache_hit=cache_hit,
start_time=None,
end_time=None,
)
)

executor.submit(
self.logging_obj.success_handler,
complete_streaming_response,
logging_params = dict(
result=complete_streaming_response,
cache_hit=cache_hit,
start_time=None,
end_time=None,
)
if litellm.sync_logging:
await self.logging_obj.async_success_handler(**logging_params)
else:
asyncio.create_task(self.logging_obj.async_success_handler(**logging_params))

self.logging_obj.success_handler(**logging_params)

raise StopAsyncIteration # Re-raise StopIteration
else:
Expand Down
1 change: 0 additions & 1 deletion litellm/proxy/_experimental/out/onboarding.html

This file was deleted.

12 changes: 3 additions & 9 deletions litellm/proxy/pass_through_endpoints/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,9 @@ async def _route_streaming_logging_to_handler(
standard_logging_response_object = StandardPassThroughResponseObject(
response=f"cannot parse chunks to standard response object. Chunks={all_chunks}"
)
threading.Thread(
target=litellm_logging_obj.success_handler,
args=(
standard_logging_response_object,
start_time,
end_time,
False,
),
).start()
litellm_logging_obj.success_handler(
standard_logging_response_object, start_time, end_time, False
)
await litellm_logging_obj.async_success_handler(
result=standard_logging_response_object,
start_time=start_time,
Expand Down
3 changes: 1 addition & 2 deletions litellm/responses/streaming_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ def _handle_logging_completed_response(self):
)
)

executor.submit(
self.logging_obj.success_handler,
self.logging_obj.success_handler(
result=self.completed_response,
cache_hit=None,
start_time=self.start_time,
Expand Down
31 changes: 14 additions & 17 deletions litellm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,7 @@ async def _client_async_logging_helper(
f"Async Wrapper: Completed Call, calling async_success_handler: {logging_obj.async_success_handler}"
)
# check if user does not want this to be logged
asyncio.create_task(
logging_obj.async_success_handler(result, start_time, end_time)
)
await logging_obj.async_success_handler(result, start_time, end_time)
logging_obj.handle_sync_success_callbacks_for_async_calls(
result=result,
start_time=start_time,
Expand Down Expand Up @@ -1163,12 +1161,8 @@ def wrapper(*args, **kwargs): # noqa: PLR0915

# LOG SUCCESS - handle streaming success logging in the _next_ object, remove `handle_success` once it's deprecated
verbose_logger.info("Wrapper: Completed Call, calling success_handler")
executor.submit(
logging_obj.success_handler,
result,
start_time,
end_time,
)
logging_obj.success_handler(result, start_time, end_time)
Copy link
Contributor

@krrishdholakia krrishdholakia Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if moving to success_handler, make sure that it uses an executor, so as not to cause high CPU consumption @B-Step62

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the catch, updated.


# RETURN RESULT
update_response_metadata(
result=result,
Expand Down Expand Up @@ -1336,15 +1330,18 @@ async def wrapper_async(*args, **kwargs): # noqa: PLR0915
)

# LOG SUCCESS - handle streaming success logging in the _next_ object
asyncio.create_task(
_client_async_logging_helper(
logging_obj=logging_obj,
result=result,
start_time=start_time,
end_time=end_time,
is_completion_with_fallbacks=is_completion_with_fallbacks,
)
async_logging_params = dict(
logging_obj=logging_obj,
result=result,
start_time=start_time,
end_time=end_time,
is_completion_with_fallbacks=is_completion_with_fallbacks,
)
if litellm.sync_logging:
await _client_async_logging_helper(**async_logging_params)
else:
asyncio.create_task(_client_async_logging_helper(**async_logging_params))

logging_obj.handle_sync_success_callbacks_for_async_calls(
result=result,
start_time=start_time,
Expand Down
Loading
Loading