Skip to content

Commit

Permalink
Log Streaming (#1663)
Browse files Browse the repository at this point in the history
* Fix agent bug, remove alias

* Allow ws on logs
  • Loading branch information
NolanTrem authored Dec 6, 2024
1 parent ab1d205 commit e9e156d
Show file tree
Hide file tree
Showing 8 changed files with 574 additions and 210 deletions.
89 changes: 89 additions & 0 deletions py/core/main/api/templates/log_viewer.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<!DOCTYPE html>
<html>
<head>
<title>R2R Log Viewer</title>
<style>
body {
margin: 20px;
font-family: monospace;
background: #f8f9fa;
}
#logs {
white-space: pre-wrap;
background: white;
padding: 20px;
border-radius: 4px;
height: 80vh;
overflow-y: auto;
border: 1px solid #e9ecef;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
.log-entry {
margin: 2px 0;
border-bottom: 1px solid #f0f0f0;
}
.status {
color: #666;
font-style: italic;
}
</style>
</head>
<body>
<h2>R2R Log Viewer</h2>
<div id="logs"><span class="status">Connecting to log stream...</span></div>

<!-- Include ansi_up via a CDN -->
<script src="https://cdn.jsdelivr.net/npm/[email protected]/ansi_up.min.js"></script>
<script>
let ws = null;
let ansi_up = new AnsiUp();

function connect() {
if (ws) {
ws.close();
}

ws = new WebSocket(`ws://${window.location.host}/v3/logs/stream`);

ws.onmessage = function(event) {
const logsDiv = document.getElementById("logs");
const newEntry = document.createElement('div');
newEntry.className = 'log-entry';

// Convert ANSI to HTML
const htmlContent = ansi_up.ansi_to_html(event.data);
newEntry.innerHTML = htmlContent;
logsDiv.appendChild(newEntry);

// Keep only the last 1000 entries
while (logsDiv.children.length > 1000) {
logsDiv.removeChild(logsDiv.firstChild);
}

logsDiv.scrollTop = logsDiv.scrollHeight;
};

ws.onclose = function() {
const logsDiv = document.getElementById("logs");
const msg = document.createElement('div');
msg.className = 'status';
msg.textContent = 'Connection lost. Reconnecting...';
logsDiv.appendChild(msg);
setTimeout(connect, 1000);
};

ws.onerror = function(err) {
console.error('WebSocket error:', err);
};
}

connect();

window.onbeforeunload = function() {
if (ws) {
ws.close();
}
};
</script>
</body>
</html>
99 changes: 99 additions & 0 deletions py/core/main/api/v3/logs_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from fastapi import WebSocket
import contextlib
from fastapi.requests import Request
from fastapi.templating import Jinja2Templates
from pathlib import Path
import asyncio
import logging
import aiofiles

from core.providers import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
)
from core.base.logger.base import RunType
from .base_router import BaseRouterV3


class LogsRouter(BaseRouterV3):
def __init__(
self,
providers,
services,
orchestration_provider: (
HatchetOrchestrationProvider | SimpleOrchestrationProvider
),
run_type: RunType = RunType.UNSPECIFIED,
):
super().__init__(providers, services, orchestration_provider, run_type)
CURRENT_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = CURRENT_DIR.parent / "templates"
self.templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
self.services = services
self.log_file = Path.cwd() / "logs" / "app.log"
self.log_file.parent.mkdir(exist_ok=True)
if not self.log_file.exists():
self.log_file.touch(mode=0o666)

# Start from the beginning of the file
self.last_position = 0

async def read_full_file(self) -> str:
"""Read the entire log file from the start."""
if not self.log_file.exists():
return "Initializing logging system..."

try:
async with aiofiles.open(self.log_file, mode="r") as f:
# Start at beginning
await f.seek(0)
full_content = await f.read()
# Move last_position to end of file after reading full content
self.last_position = await f.tell()
return full_content
except Exception as e:
logging.error(f"Error reading full logs: {str(e)}")
return f"Error accessing full log file: {str(e)}"

async def read_new_logs(self) -> str:
"""Read new logs appended after last_position."""
if not self.log_file.exists():
return "Initializing logging system..."

try:
async with aiofiles.open(self.log_file, mode="r") as f:
await f.seek(self.last_position)
new_content = await f.read()
self.last_position = await f.tell()
return new_content or ""
except Exception as e:
logging.error(f"Error reading logs: {str(e)}")
return f"Error accessing log file: {str(e)}"

def _setup_routes(self):
@self.router.websocket("/logs/stream")
async def stream_logs(websocket: WebSocket):
await websocket.accept()
try:
# Send the entire file content upon initial connection
full_logs = await self.read_full_file()
if full_logs:
await websocket.send_text(full_logs)

# Now send incremental updates only
while True:
new_logs = await self.read_new_logs()
if new_logs:
await websocket.send_text(new_logs)
await asyncio.sleep(0.5)
except Exception as e:
logging.error(f"WebSocket error: {str(e)}")
finally:
with contextlib.suppress(Exception):
await websocket.close()

@self.router.get("/logs/viewer")
async def get_log_viewer(request: Request):
return self.templates.TemplateResponse(
"log_viewer.html", {"request": request}
)
42 changes: 26 additions & 16 deletions py/core/main/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .api.v3.documents_router import DocumentsRouter
from .api.v3.graph_router import GraphRouter
from .api.v3.indices_router import IndicesRouter
from .api.v3.logs_router import LogsRouter
from .api.v3.prompts_router import PromptsRouter
from .api.v3.retrieval_router import RetrievalRouterV3
from .api.v3.system_router import SystemRouter
Expand All @@ -31,30 +32,32 @@ def __init__(
HatchetOrchestrationProvider | SimpleOrchestrationProvider
),
auth_router: AuthRouter,
documents_router: DocumentsRouter,
chunks_router: ChunksRouter,
indices_router: IndicesRouter,
users_router: UsersRouter,
collections_router: CollectionsRouter,
conversations_router: ConversationsRouter,
documents_router: DocumentsRouter,
graph_router: GraphRouter,
indices_router: IndicesRouter,
logs_router: LogsRouter,
prompts_router: PromptsRouter,
retrieval_router_v3: RetrievalRouterV3,
system_router: SystemRouter,
graph_router: GraphRouter,
users_router: UsersRouter,
):
self.config = config
self.auth_router = auth_router
self.orchestration_provider = orchestration_provider
self.documents_router = documents_router
self.chunks_router = chunks_router
self.indices_router = indices_router
self.users_router = users_router
self.collections_router = collections_router
self.conversations_router = conversations_router
self.documents_router = documents_router
self.graph_router = graph_router
self.indices_router = indices_router
self.logs_router = logs_router
self.orchestration_provider = orchestration_provider
self.prompts_router = prompts_router
self.retrieval_router_v3 = retrieval_router_v3
self.system_router = system_router
self.graph_router = graph_router
self.users_router = users_router

self.app = FastAPI()

Expand All @@ -73,16 +76,17 @@ async def r2r_exception_handler(request: Request, exc: R2RException):

def _setup_routes(self):

self.app.include_router(self.documents_router, prefix="/v3")
self.app.include_router(self.chunks_router, prefix="/v3")
self.app.include_router(self.indices_router, prefix="/v3")
self.app.include_router(self.users_router, prefix="/v3")
self.app.include_router(self.collections_router, prefix="/v3")
self.app.include_router(self.conversations_router, prefix="/v3")
self.app.include_router(self.documents_router, prefix="/v3")
self.app.include_router(self.graph_router, prefix="/v3")
self.app.include_router(self.indices_router, prefix="/v3")
self.app.include_router(self.logs_router, prefix="/v3")
self.app.include_router(self.prompts_router, prefix="/v3")
self.app.include_router(self.retrieval_router_v3, prefix="/v3")
self.app.include_router(self.graph_router, prefix="/v3")
self.app.include_router(self.system_router, prefix="/v3")
self.app.include_router(self.users_router, prefix="/v3")

@self.app.get("/openapi_spec", include_in_schema=False)
async def openapi_spec():
Expand All @@ -103,10 +107,16 @@ def _apply_cors(self):
)

async def serve(self, host: str = "0.0.0.0", port: int = 7272):
# Start the Hatchet worker in a separate thread
import uvicorn
from core.utils.logging_config import configure_logging

configure_logging()

# Run the FastAPI app
config = uvicorn.Config(self.app, host=host, port=port)
config = uvicorn.Config(
self.app,
host=host,
port=port,
log_config=None,
)
server = uvicorn.Server(config)
await server.serve()
13 changes: 8 additions & 5 deletions py/core/main/app_entry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from core.utils.logging_config import configure_logging

import os
from contextlib import asynccontextmanager
from typing import Optional
Expand All @@ -12,7 +13,8 @@

from .assembly import R2RBuilder, R2RConfig

logger = logging.getLogger()
logger, log_file = configure_logging()


# Global scheduler
scheduler = AsyncIOScheduler()
Expand Down Expand Up @@ -64,8 +66,6 @@ async def create_r2r_app(
return await builder.build()


logging.basicConfig(level=logging.INFO)

config_name = os.getenv("R2R_CONFIG_NAME", None)
config_path = os.getenv("R2R_CONFIG_PATH", None)

Expand Down Expand Up @@ -95,7 +95,10 @@ async def create_r2r_app(
)

# Create the FastAPI app
app = FastAPI(lifespan=lifespan)
app = FastAPI(
lifespan=lifespan,
log_config=None,
)


@app.exception_handler(R2RException)
Expand Down
20 changes: 13 additions & 7 deletions py/core/main/assembly/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ..api.v3.documents_router import DocumentsRouter
from ..api.v3.graph_router import GraphRouter
from ..api.v3.indices_router import IndicesRouter
from ..api.v3.logs_router import LogsRouter
from ..api.v3.prompts_router import PromptsRouter
from ..api.v3.retrieval_router import RetrievalRouterV3
from ..api.v3.system_router import SystemRouter
Expand Down Expand Up @@ -241,32 +242,37 @@ async def build(self, *args, **kwargs) -> R2RApp:
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"documents_router": DocumentsRouter(
"chunks_router": ChunksRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"chunks_router": ChunksRouter(
"collections_router": CollectionsRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"indices_router": IndicesRouter(
"conversations_router": ConversationsRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"users_router": UsersRouter(
"documents_router": DocumentsRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"collections_router": CollectionsRouter(
"graph_router": GraphRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"conversations_router": ConversationsRouter(
"indices_router": IndicesRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"logs_router": LogsRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
Expand All @@ -286,7 +292,7 @@ async def build(self, *args, **kwargs) -> R2RApp:
services=services,
orchestration_provider=orchestration_provider,
).get_router(),
"graph_router": GraphRouter(
"users_router": UsersRouter(
providers=providers,
services=services,
orchestration_provider=orchestration_provider,
Expand Down
Loading

0 comments on commit e9e156d

Please sign in to comment.