Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a configuration option to make callback logging synchronous #8202

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions litellm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
callbacks: List[
Union[Callable, _custom_logger_compatible_callbacks_literal, CustomLogger]
] = []
# If true, callbacks will be executed synchronously (blocking).
sync_logging: bool = False
langfuse_default_tags: Optional[List[str]] = None
langsmith_batch_size: Optional[int] = None
prometheus_initialize_budget_metrics: Optional[bool] = False
Expand Down
5 changes: 1 addition & 4 deletions litellm/batches/batch_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ async def _log_completed_batch(
cache_hit=None,
)
)
threading.Thread(
target=logging_obj.success_handler,
args=(None, start_time, end_time),
).start()
logging_obj.success_handler(None, start_time, end_time)


async def _batch_cost_calculator(
Expand Down
10 changes: 2 additions & 8 deletions litellm/caching/caching_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ def _sync_get_cache(
is_async=False,
)

threading.Thread(
target=logging_obj.success_handler,
args=(cached_result, start_time, end_time, cache_hit),
).start()
logging_obj.success_handler(cached_result, start_time, end_time, True)
cache_key = litellm.cache._get_preset_cache_key_from_kwargs(
**kwargs
)
Expand Down Expand Up @@ -449,10 +446,7 @@ def _async_log_cache_hit_on_callbacks(
cached_result, start_time, end_time, cache_hit
)
)
threading.Thread(
target=logging_obj.success_handler,
args=(cached_result, start_time, end_time, cache_hit),
).start()
logging_obj.success_handler(cached_result, start_time, end_time, cache_hit)

async def _retrieve_from_cache(
self, call_type: str, kwargs: Dict[str, Any], args: Tuple[Any, ...]
Expand Down
23 changes: 21 additions & 2 deletions litellm/litellm_core_utils/litellm_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import re
import subprocess
import sys
import threading
import time
import traceback
import uuid
from datetime import datetime as dt_object
from functools import lru_cache
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union, cast

from pydantic import BaseModel

Expand Down Expand Up @@ -1008,7 +1009,23 @@ def _success_handler_helper_fn(
except Exception as e:
raise Exception(f"[Non-Blocking] LiteLLM.Success_Call Error: {str(e)}")

def success_handler( # noqa: PLR0915
def success_handler(self, *args, synchronous: Optional[bool] = None, **kwargs):
    """
    Run the success handler either synchronously (blocking) or in a
    background thread.

    Args:
        *args: Positional arguments forwarded to ``_success_handler``
            (result, start_time, end_time, cache_hit).
        synchronous: If True, run the handler inline; if False, run it in a
            background ``threading.Thread``. If None, fall back to the
            global ``litellm.sync_logging`` setting.
        **kwargs: Extra keyword arguments forwarded to ``_success_handler``.
            Callers (e.g. the pass-through success handler) pass these, and
            ``_success_handler`` accepts ``**kwargs`` — dropping them here
            would raise ``TypeError`` at the call site.
    """
    if synchronous is None:
        synchronous = litellm.sync_logging

    if synchronous:
        self._success_handler(*args, **kwargs)
    else:
        threading.Thread(
            target=self._success_handler,
            args=args,
            kwargs=kwargs,
        ).start()

def _success_handler( # noqa: PLR0915
self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs
):
verbose_logger.debug(
Expand Down Expand Up @@ -2151,6 +2168,8 @@ def handle_sync_success_callbacks_for_async_calls(
result,
start_time,
end_time,
# NB: Since we already run this in a TPE, the handler itself can run sync
synchronous=True,
)

def _should_run_sync_callbacks_for_async_calls(self) -> bool:
Expand Down
103 changes: 55 additions & 48 deletions litellm/litellm_core_utils/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,38 +1348,47 @@ def run_success_logging_and_cache_storage(self, processed_chunk, cache_hit: bool
"""
Runs success logging in a thread and adds the response to the cache
"""
if litellm.disable_streaming_logging is True:
"""
[NOT RECOMMENDED]
Set this via `litellm.disable_streaming_logging = True`.
def _run():
if litellm.disable_streaming_logging is True:
"""
[NOT RECOMMENDED]
Set this via `litellm.disable_streaming_logging = True`.

Disables streaming logging.
"""
return

Disables streaming logging.
"""
return
## ASYNC LOGGING
# Create an event loop for the new thread
if self.logging_loop is not None:
future = asyncio.run_coroutine_threadsafe(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
),
loop=self.logging_loop,
)
future.result()
else:
asyncio.run(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
if not litellm.sync_logging:
## ASYNC LOGGING
# Create an event loop for the new thread
if self.logging_loop is not None:
future = asyncio.run_coroutine_threadsafe(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
),
loop=self.logging_loop,
)
future.result()
else:
asyncio.run(
self.logging_obj.async_success_handler(
processed_chunk, None, None, cache_hit
)
)

## SYNC LOGGING
self.logging_obj.success_handler(processed_chunk, None, None, cache_hit)

## Sync store in cache
if self.logging_obj._llm_caching_handler is not None:
self.logging_obj._llm_caching_handler._sync_add_streaming_response_to_cache(
processed_chunk
)
)
## SYNC LOGGING
self.logging_obj.success_handler(processed_chunk, None, None, cache_hit)

## Sync store in cache
if self.logging_obj._llm_caching_handler is not None:
self.logging_obj._llm_caching_handler._sync_add_streaming_response_to_cache(
processed_chunk
)
if litellm.sync_logging:
_run()
else:
threading.Thread(target=_run).start()

def finish_reason_handler(self):
model_response = self.model_response_creator()
Expand Down Expand Up @@ -1427,10 +1436,7 @@ def __next__(self): # noqa: PLR0915
if response is None:
continue
## LOGGING
threading.Thread(
target=self.run_success_logging_and_cache_storage,
args=(response, cache_hit),
).start() # log response
self.run_success_logging_and_cache_storage(response, cache_hit)
choice = response.choices[0]
if isinstance(choice, StreamingChoices):
self.response_uptil_now += choice.delta.get("content", "") or ""
Expand Down Expand Up @@ -1476,10 +1482,7 @@ def __next__(self): # noqa: PLR0915
)

## LOGGING
threading.Thread(
target=self.logging_obj.success_handler,
args=(response, None, None, cache_hit),
).start() # log response
self.logging_obj.success_handler(response, None, None, cache_hit)

if self.sent_stream_usage is False and self.send_stream_usage is True:
self.sent_stream_usage = True
Expand All @@ -1492,10 +1495,7 @@ def __next__(self): # noqa: PLR0915
usage = calculate_total_usage(chunks=self.chunks)
processed_chunk._hidden_params["usage"] = usage
## LOGGING
threading.Thread(
target=self.run_success_logging_and_cache_storage,
args=(processed_chunk, cache_hit),
).start() # log response
self.run_success_logging_and_cache_storage(processed_chunk, cache_hit)
return processed_chunk
except Exception as e:
traceback_exception = traceback.format_exc()
Expand Down Expand Up @@ -1654,13 +1654,20 @@ async def __anext__(self): # noqa: PLR0915
)
)

executor.submit(
self.logging_obj.success_handler,
complete_streaming_response,
cache_hit=cache_hit,
start_time=None,
end_time=None,
)
if litellm.sync_logging:
self.logging_obj.success_handler(
complete_streaming_response, None, None, cache_hit
)
else:
executor.submit(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I don't have clear context for why a thread-pool executor (TPE) is used only in some places, so I did not try to remove this if-else and instead kept the existing behavior the same.

self.logging_obj.success_handler,
complete_streaming_response,
None,
None,
cache_hit,
# NB: We already run this in a TPE so the handler itself should run sync
synchronous=True,
)

raise StopAsyncIteration # Re-raise StopIteration
else:
Expand Down
12 changes: 3 additions & 9 deletions litellm/proxy/pass_through_endpoints/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,9 @@ async def _route_streaming_logging_to_handler(
standard_logging_response_object = StandardPassThroughResponseObject(
response=f"cannot parse chunks to standard response object. Chunks={all_chunks}"
)
threading.Thread(
target=litellm_logging_obj.success_handler,
args=(
standard_logging_response_object,
start_time,
end_time,
False,
),
).start()
litellm_logging_obj.success_handler(
standard_logging_response_object, start_time, end_time, False
)
await litellm_logging_obj.async_success_handler(
result=standard_logging_response_object,
start_time=start_time,
Expand Down
3 changes: 3 additions & 0 deletions litellm/proxy/pass_through_endpoints/success_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ async def pass_through_async_success_handler(
standard_logging_response_object = StandardPassThroughResponseObject(
response=httpx_response.text
)

thread_pool_executor.submit(
logging_obj.success_handler,
standard_logging_response_object, # Positional argument 1
start_time, # Positional argument 2
end_time, # Positional argument 3
cache_hit, # Positional argument 4
**kwargs, # Unpacked keyword arguments
# NB: Since we already run this in a TPE, the handler itself can run sync
synchronous=True,
)

await logging_obj.async_success_handler(
Expand Down
8 changes: 2 additions & 6 deletions litellm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,12 +1082,8 @@ def wrapper(*args, **kwargs): # noqa: PLR0915

# LOG SUCCESS - handle streaming success logging in the _next_ object, remove `handle_success` once it's deprecated
verbose_logger.info("Wrapper: Completed Call, calling success_handler")
executor.submit(
logging_obj.success_handler,
result,
start_time,
end_time,
)
logging_obj.success_handler(result, start_time, end_time)

# RETURN RESULT
update_response_metadata(
result=result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def test_dynamic_logging_global_callback():

try:
litellm_logging.success_handler(
result=ModelResponse(
ModelResponse(
id="chatcmpl-5418737b-ab14-420b-b9c5-b278b6681b70",
created=1732306261,
model="claude-3-opus-20240229",
Expand All @@ -288,9 +288,9 @@ def test_dynamic_logging_global_callback():
prompt_tokens_details=None,
),
),
start_time=datetime.now(),
end_time=datetime.now(),
cache_hit=False,
datetime.now(),
datetime.now(),
False,
)
except Exception as e:
print(f"Error: {e}")
Expand Down