Skip to content

Commit

Permalink
feat(logging): implement thread-safe logging handler with asyncio int…
Browse files Browse the repository at this point in the history
…egration
  • Loading branch information
mert-ergun committed Feb 27, 2025
1 parent 298d864 commit 8bebffe
Showing 1 changed file with 61 additions and 2 deletions.
63 changes: 61 additions & 2 deletions crossbar_llm/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,67 @@ def get_csrf_token(csrf_protect: CsrfProtect = Depends()):

# Create a custom logging handler
class AsyncQueueHandler(logging.Handler):
def __init__(self):
super().__init__()
# Create a thread-safe queue for log messages
self.log_messages = queue.Queue()
# Start a worker thread to process log messages
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
self.worker_thread.start()
self.running = True

def emit(self, record):
if not self.running:
return
log_entry = self.format(record)
# Use asyncio.create_task to avoid blocking
asyncio.create_task(log_queue.put(log_entry))
# Add to the thread-safe queue
self.log_messages.put(log_entry)

def _worker(self):
"""Worker thread that transfers messages from the thread-safe queue to the asyncio queue."""
while self.running:
try:
# Get message from the thread-safe queue (blocking with timeout)
log_entry = self.log_messages.get(timeout=0.5)

# Try to transfer to the asyncio queue
try:
# Check if we're in the main thread where an event loop might be running
if threading.current_thread() is threading.main_thread():
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(log_queue.put(log_entry))
else:
# We're in the main thread but no loop is running
# Just store it for later retrieval
asyncio.run(log_queue.put(log_entry))
except RuntimeError:
# No event loop, just store the entry for later
pass
else:
# We're in a worker thread, can't directly interact with asyncio
# Store the entry for later retrieval by the main event loop
pass
except Exception as e:
# If any asyncio error occurs, just continue
pass

# Mark the task as done in the thread-safe queue
self.log_messages.task_done()

except queue.Empty:
# Timeout on queue.get(), just continue the loop
continue
except Exception:
# Any other error, just continue
continue

def close(self):
self.running = False
if self.worker_thread.is_alive():
self.worker_thread.join(timeout=1.0)
super().close()

# Modify the existing logging setup
def setup_logging(verbose=False):
Expand Down Expand Up @@ -424,4 +481,6 @@ def get_neo4j_statistics():
@app.on_event("startup")
async def startup_event():
setup_logging(verbose=False)
# Initialize the event loop for threading usage
asyncio.get_event_loop_policy().get_event_loop()
Logger.info("API server started")

0 comments on commit 8bebffe

Please sign in to comment.