Skip to content

Commit ab616dc

Browse files
committed
Add a configuration option to make callback logging synchronous
Signed-off-by: B-Step62 <[email protected]>
1 parent 8ba60bf commit ab616dc

File tree

6 files changed

+69
-37
lines changed

6 files changed

+69
-37
lines changed

Diff for: litellm/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@
112112
callbacks: List[
113113
Union[Callable, _custom_logger_compatible_callbacks_literal, CustomLogger]
114114
] = []
115+
# If true, callbacks will be executed synchronously (blocking).
116+
sync_logging: bool = False
115117
langfuse_default_tags: Optional[List[str]] = None
116118
langsmith_batch_size: Optional[int] = None
117119
prometheus_initialize_budget_metrics: Optional[bool] = False

Diff for: litellm/batches/batch_utils.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,13 @@ async def _log_completed_batch(
129129
cache_hit=None,
130130
)
131131
)
132-
threading.Thread(
133-
target=logging_obj.success_handler,
134-
args=(None, start_time, end_time),
135-
).start()
132+
if litellm.sync_logging:
133+
logging_obj.success_handler(None, start_time, end_time)
134+
else:
135+
threading.Thread(
136+
target=logging_obj.success_handler,
137+
args=(None, start_time, end_time),
138+
).start()
136139

137140

138141
async def _batch_cost_calculator(

Diff for: litellm/caching/caching_handler.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,13 @@ def _sync_get_cache(
280280
is_async=False,
281281
)
282282

283-
threading.Thread(
284-
target=logging_obj.success_handler,
285-
args=(cached_result, start_time, end_time, cache_hit),
286-
).start()
283+
if litellm.sync_logging:
284+
logging_obj.success_handler(cached_result, start_time, end_time, True)
285+
else:
286+
threading.Thread(
287+
target=logging_obj.success_handler,
288+
args=(cached_result, start_time, end_time, cache_hit),
289+
).start()
287290
cache_key = litellm.cache._get_preset_cache_key_from_kwargs(
288291
**kwargs
289292
)
@@ -449,10 +452,13 @@ def _async_log_cache_hit_on_callbacks(
449452
cached_result, start_time, end_time, cache_hit
450453
)
451454
)
452-
threading.Thread(
453-
target=logging_obj.success_handler,
454-
args=(cached_result, start_time, end_time, cache_hit),
455-
).start()
455+
if litellm.sync_logging:
456+
logging_obj.success_handler(cached_result, start_time, end_time, cache_hit)
457+
else:
458+
threading.Thread(
459+
target=logging_obj.success_handler,
460+
args=(cached_result, start_time, end_time, cache_hit),
461+
).start()
456462

457463
async def _retrieve_from_cache(
458464
self, call_type: str, kwargs: Dict[str, Any], args: Tuple[Any, ...]

Diff for: litellm/litellm_core_utils/streaming_handler.py

+33-19
Original file line numberDiff line numberDiff line change
@@ -1427,10 +1427,13 @@ def __next__(self): # noqa: PLR0915
14271427
if response is None:
14281428
continue
14291429
## LOGGING
1430-
threading.Thread(
1431-
target=self.run_success_logging_and_cache_storage,
1432-
args=(response, cache_hit),
1433-
).start() # log response
1430+
if litellm.sync_logging:
1431+
self.run_success_logging_and_cache_storage(response, cache_hit)
1432+
else:
1433+
threading.Thread(
1434+
target=self.run_success_logging_and_cache_storage,
1435+
args=(response, cache_hit),
1436+
).start() # log response
14341437
choice = response.choices[0]
14351438
if isinstance(choice, StreamingChoices):
14361439
self.response_uptil_now += choice.delta.get("content", "") or ""
@@ -1476,10 +1479,13 @@ def __next__(self): # noqa: PLR0915
14761479
)
14771480

14781481
## LOGGING
1479-
threading.Thread(
1480-
target=self.logging_obj.success_handler,
1481-
args=(response, None, None, cache_hit),
1482-
).start() # log response
1482+
if litellm.sync_logging:
1483+
self.logging_obj.success_handler(response, None, None, cache_hit)
1484+
else:
1485+
threading.Thread(
1486+
target=self.logging_obj.success_handler,
1487+
args=(response, None, None, cache_hit),
1488+
).start() # log response
14831489

14841490
if self.sent_stream_usage is False and self.send_stream_usage is True:
14851491
self.sent_stream_usage = True
@@ -1492,10 +1498,13 @@ def __next__(self): # noqa: PLR0915
14921498
usage = calculate_total_usage(chunks=self.chunks)
14931499
processed_chunk._hidden_params["usage"] = usage
14941500
## LOGGING
1495-
threading.Thread(
1496-
target=self.run_success_logging_and_cache_storage,
1497-
args=(processed_chunk, cache_hit),
1498-
).start() # log response
1501+
if litellm.sync_logging:
1502+
self.run_success_logging_and_cache_storage(processed_chunk, cache_hit)
1503+
else:
1504+
threading.Thread(
1505+
target=self.run_success_logging_and_cache_storage,
1506+
args=(processed_chunk, cache_hit),
1507+
).start() # log response
14991508
return processed_chunk
15001509
except Exception as e:
15011510
traceback_exception = traceback.format_exc()
@@ -1654,13 +1663,18 @@ async def __anext__(self): # noqa: PLR0915
16541663
)
16551664
)
16561665

1657-
executor.submit(
1658-
self.logging_obj.success_handler,
1659-
complete_streaming_response,
1660-
cache_hit=cache_hit,
1661-
start_time=None,
1662-
end_time=None,
1663-
)
1666+
if litellm.sync_logging:
1667+
self.logging_obj.success_handler(
1668+
complete_streaming_response, None, None, cache_hit
1669+
)
1670+
else:
1671+
executor.submit(
1672+
self.logging_obj.success_handler,
1673+
complete_streaming_response,
1674+
cache_hit=cache_hit,
1675+
start_time=None,
1676+
end_time=None,
1677+
)
16641678

16651679
raise StopAsyncIteration # Re-raise StopIteration
16661680
else:

Diff for: litellm/proxy/pass_through_endpoints/success_handler.py

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ async def pass_through_async_success_handler(
8383
standard_logging_response_object = StandardPassThroughResponseObject(
8484
response=httpx_response.text
8585
)
86+
8687
thread_pool_executor.submit(
8788
logging_obj.success_handler,
8889
standard_logging_response_object, # Positional argument 1

Diff for: litellm/utils.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -1082,12 +1082,18 @@ def wrapper(*args, **kwargs): # noqa: PLR0915
10821082

10831083
# LOG SUCCESS - handle streaming success logging in the _next_ object, remove `handle_success` once it's deprecated
10841084
verbose_logger.info("Wrapper: Completed Call, calling success_handler")
1085-
executor.submit(
1086-
logging_obj.success_handler,
1087-
result,
1088-
start_time,
1089-
end_time,
1090-
)
1085+
if litellm.sync_logging:
1086+
print("sync_logging")
1087+
logging_obj.success_handler(result, start_time, end_time)
1088+
else:
1089+
print("async_logging")
1090+
executor.submit(
1091+
logging_obj.success_handler,
1092+
result,
1093+
start_time,
1094+
end_time,
1095+
)
1096+
10911097
# RETURN RESULT
10921098
update_response_metadata(
10931099
result=result,

0 commit comments

Comments
 (0)