Commit

feat(logging): implement enhanced logging functionality and stream logs from the API

mert-ergun committed Feb 27, 2025
1 parent 7217da2 commit 298d864
Showing 7 changed files with 689 additions and 138 deletions.
125 changes: 94 additions & 31 deletions crossbar_llm/backend/main.py
@@ -6,7 +6,8 @@
from dotenv import load_dotenv
from pydantic import BaseModel
from typing import Optional, List
from tools.langchain_llm_qa_trial import RunPipeline
from tools.langchain_llm_qa_trial import RunPipeline, configure_logging
from tools.utils import Logger
import numpy as np
import pandas as pd
import neo4j
@@ -17,8 +18,8 @@
import queue
import threading
import json

import logging
import time

# Load environment variables
load_dotenv()
@@ -50,9 +51,11 @@ def get_csrf_config():

@app.get("/csrf-token/")
def get_csrf_token(csrf_protect: CsrfProtect = Depends()):
Logger.debug("Getting CSRF token")
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
response = JSONResponse({"detail": "CSRF cookie set" ,"csrf_token": csrf_token})
csrf_protect.set_csrf_cookie(signed_token, response)
Logger.info("CSRF token generated and set in cookie")
return response

# Neo4j connection details
@@ -76,28 +79,39 @@ def emit(self, record):

# Modify the existing logging setup
def setup_logging(verbose=False):
current_time = time.strftime("%Y-%m-%d-%H:%M:%S")
log_filename = f"query_log_{current_time}.log"
configure_logging(verbose=verbose, log_filename=log_filename)

# Add async queue handler for streaming logs to frontend
logger = logging.getLogger()
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
logger.handlers.clear()

# Add async queue handler
queue_handler = AsyncQueueHandler()
queue_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(queue_handler)
# Check if AsyncQueueHandler already exists in handlers
has_queue_handler = any(isinstance(h, AsyncQueueHandler) for h in logger.handlers)

if not has_queue_handler:
queue_handler = AsyncQueueHandler()
queue_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(queue_handler)

# Log setup complete
Logger.info(f"Backend logging initialized with verbose={verbose}")

@app.get("/stream-logs")
async def stream_logs():
Logger.debug("Stream logs endpoint accessed")
async def event_generator():
while True:
try:
# Wait for new log entries
log_entry = await log_queue.get()
Logger.debug(f"Sending log entry: {log_entry[:50]}...")
yield {
"event": "log",
"data": json.dumps({"log": log_entry})
}
except Exception as e:
logging.error(f"Error in event generator: {e}")
Logger.error(f"Error in event generator: {e}")
continue

return EventSourceResponse(event_generator())
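
The AsyncQueueHandler referenced above is collapsed in this view; conceptually it is a logging.Handler that pushes every formatted record onto the asyncio queue drained by event_generator, which is what lets the frontend tail backend logs over SSE. A minimal sketch of that pattern follows — the names log_queue and AsyncQueueHandler match the code above, but the body is an illustration, not the committed implementation:

import asyncio
import logging
from typing import Optional

log_queue: "asyncio.Queue[str]" = asyncio.Queue()

class AsyncQueueHandler(logging.Handler):
    """Sketch: push each formatted log record onto the queue drained by the SSE generator."""

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
        super().__init__()
        self.loop = loop  # event loop that owns log_queue; can be injected at startup

    def emit(self, record: logging.LogRecord) -> None:
        try:
            entry = self.format(record)
            if self.loop is not None:
                # Thread-safe hand-off so logs emitted from worker threads reach the queue.
                self.loop.call_soon_threadsafe(log_queue.put_nowait, entry)
            else:
                log_queue.put_nowait(entry)
        except Exception:
            self.handleError(record)
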
@@ -129,12 +143,17 @@ async def generate_query(
try:
await csrf_token.validate_csrf(request, cookie_key="fastapi-csrf-token")
except ValueError as e:
Logger.error(f"CSRF validation failed: {e}")
raise HTTPException(status_code=400, detail=str(e))

Logger.info(f"Generating query for question: '{generate_query_request.question}'")
Logger.debug(f"Using LLM: {generate_query_request.llm_type}, top_k: {generate_query_request.top_k}")

key = f"{generate_query_request.llm_type}_{generate_query_request.api_key}"

# Initialize or reuse RunPipeline instance
if key not in pipeline_instances:
Logger.info(f"Creating new pipeline instance for {generate_query_request.llm_type}")
pipeline_instances[key] = RunPipeline(
model_name=generate_query_request.llm_type,
verbose=generate_query_request.verbose,
@@ -150,28 +169,30 @@

try:
if generate_query_request.embedding is not None:
Logger.info("Processing vector search with embedding")
Logger.debug(f"Vector index: {generate_query_request.vector_index}")


print("Embedding:", generate_query_request.vector_index)
generate_query_request.embedding = generate_query_request.embedding.replace('{"vector_data":', '').replace('}', '').replace('[', '').replace(']', '')
embedding = [float(x) for x in generate_query_request.embedding.split(",")]
embedding = np.array(embedding)

Logger.debug(f"Embedding shape: {embedding.shape}")

vector_index = f"{generate_query_request.vector_index}Embeddings"

rp.search_type = "vector_search"
rp.top_k = generate_query_request.top_k

Logger.info("Generating query with vector embedding")
query = rp.run_for_query(
question=generate_query_request.question,
model_name=generate_query_request.llm_type,
api_key=generate_query_request.api_key,
vector_index=vector_index,
embedding = embedding,
embedding=embedding,
reset_llm_type=True
)

else:
Logger.info("Processing standard database search")
rp.search_type = "db_search"
rp.top_k = generate_query_request.top_k

@@ -182,8 +203,11 @@ async def generate_query(
reset_llm_type=True
)

Logger.info("Query generation successful")
Logger.debug(f"Generated query: {query}")

except Exception as e:
logging.error(f"Error generating query: {str(e)}")
Logger.error(f"Error generating query: {str(e)}")
logs = log_stream.getvalue()
logger.removeHandler(handler)
raise HTTPException(
@@ -200,7 +224,6 @@ async def generate_query(
logs = log_stream.getvalue()

response = JSONResponse({"query": query, "logs": logs})

return response

@app.post("/run_query/")
@@ -214,11 +237,17 @@ async def run_query(
await csrf_token.validate_csrf(request, cookie_key="fastapi-csrf-token")

except ValueError as e:
Logger.error(f"CSRF validation failed: {e}")
raise HTTPException(status_code=400, detail=str(e))

Logger.info(f"Running query for question: '{run_query_request.question}'")
Logger.debug(f"Query to execute: {run_query_request.query}")
Logger.debug(f"Using LLM: {run_query_request.llm_type}, top_k: {run_query_request.top_k}")

key = f"{run_query_request.llm_type}_{run_query_request.api_key}"

if key not in pipeline_instances:
Logger.info(f"Creating new pipeline instance for {run_query_request.llm_type}")
pipeline_instances[key] = RunPipeline(
model_name=run_query_request.llm_type,
verbose=run_query_request.verbose,
@@ -233,6 +262,7 @@ async def run_query(
logger.addHandler(handler)

try:
Logger.info("Executing query against database")
response, result = rp.execute_query(
query=run_query_request.query,
question=run_query_request.question,
@@ -241,8 +271,12 @@ async def run_query(
reset_llm_type=True
)

Logger.info("Query executed successfully")
Logger.debug(f"Result count: {len(result) if isinstance(result, list) else 'N/A'}")
Logger.debug(f"Natural language response generated")

except Exception as e:
logging.error(f"Error running query: {str(e)}")
Logger.error(f"Error running query: {str(e)}")
logs = log_stream.getvalue()
logger.removeHandler(handler)
raise HTTPException(
@@ -269,59 +303,79 @@ async def upload_vector(
embedding_type: Optional[str] = Form(None),
file: UploadFile = File(...)
):
Logger.info(f"Vector upload requested for category: {vector_category}")
Logger.debug(f"Embedding type: {embedding_type}")
Logger.debug(f"Filename: {file.filename}")

try:
await csrf_token.validate_csrf(request, cookie_key="fastapi-csrf-token")

except ValueError as e:
Logger.error(f"CSRF validation failed: {e}")
raise HTTPException(status_code=400, detail=str(e))

try:
contents = await file.read()
filename = file.filename
file_extension = filename.split('.')[-1]

Logger.debug(f"File extension: {file_extension}")

# Convert the file content to vector
if file_extension == 'csv':
Logger.debug("Processing CSV file")
df = pd.read_csv(BytesIO(contents))
if df.shape[1] > 1:
Logger.warning("CSV file contains multiple columns")
raise ValueError(
"The CSV file should contain only one column (one array). Multiple columns detected."
)
vector_data = df.to_numpy().flatten()
elif file_extension == 'npy':
Logger.debug("Processing NPY file")
arr = np.load(BytesIO(contents), allow_pickle=True)
if arr.ndim > 1:
Logger.warning("NPY file contains multi-dimensional array")
raise ValueError(
"The NPY file should contain only one array. Multiple arrays or a multi-dimensional array detected."
)
vector_data = arr.flatten()
else:
Logger.error(f"Unsupported file format: {file_extension}")
raise ValueError("Unsupported file format. Please upload a CSV or NPY file.")

print(f"Vector data for {filename}:", vector_data)

Logger.debug(f"Vector data shape: {vector_data.shape}")
Logger.info("Vector data processed successfully")

response = JSONResponse({"vector_data": vector_data.tolist()})

return response
except ValueError as e:
Logger.error(f"Error processing vector data: {e}")
raise HTTPException(status_code=400, detail=str(e))


@app.get("/database_stats/")
async def get_database_stats(request: Request, csrf_token: CsrfProtect = Depends()):
Logger.info("Database stats requested")
try:
await csrf_token.validate_csrf(request, cookie_key="fastapi-csrf-token")
except ValueError as e:
Logger.error(f"CSRF validation failed: {e}")
raise HTTPException(status_code=400, detail=str(e))
statistics = get_neo4j_statistics()
return statistics

try:
statistics = get_neo4j_statistics()
Logger.info("Database stats retrieved successfully")
Logger.debug(f"Database stats: {statistics}")
return statistics
except Exception as e:
Logger.error(f"Error getting database stats: {e}")
raise HTTPException(status_code=500, detail=str(e))

# Utility functions

def get_neo4j_statistics():
Logger.debug(f"Connecting to Neo4j at {neo4j_uri}")
driver = neo4j.GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
with driver.session() as session:
# Get top 5 node labels and their counts
Logger.debug("Executing query for top 5 node labels")
top_labels_query = """
MATCH (n)
UNWIND labels(n) AS label
@@ -331,8 +385,9 @@ def get_neo4j_statistics():
"""
top_labels_result = session.run(top_labels_query)
top_5_labels = {record["label"]: record["count"] for record in top_labels_result}

# Get counts of node label combinations
Logger.debug("Executing query for node label combinations")
node_counts_query = """
MATCH (n)
WITH labels(n) AS labels_list, COUNT(*) AS count
@@ -342,8 +397,9 @@ def get_neo4j_statistics():
node_counts_result = session.run(node_counts_query)
# Convert labels list to a string to make it JSON-serializable
node_counts = {", ".join(record["labels_list"]): record["count"] for record in node_counts_result}

# Get top 5 relationship types and their counts
Logger.debug("Executing query for relationship types")
relationship_counts_query = """
MATCH ()-[r]->()
RETURN TYPE(r) AS type, COUNT(*) AS count
@@ -352,13 +408,20 @@
"""
relationship_counts_result = session.run(relationship_counts_query)
relationship_counts = {record["type"]: record["count"] for record in relationship_counts_result}

driver.close()

Logger.debug("Neo4j connection closed")

statistics = {
"top_5_labels": top_5_labels,
"node_counts": node_counts,
"relationship_counts": relationship_counts
}

return statistics

return statistics
# Initialize logging on startup
@app.on_event("startup")
async def startup_event():
setup_logging(verbose=False)
Logger.info("API server started")
38 changes: 33 additions & 5 deletions crossbar_llm/backend/tools/langchain_llm_qa_trial.py
@@ -36,15 +36,43 @@


def configure_logging(verbose=False, log_filename="query_log.log"):
log_handlers = [logging.FileHandler(log_filename)]
"""
Configure logging for the application based on verbosity level.
Args:
verbose (bool): Whether to show detailed debug logs
log_filename (str): Name of the log file to write logs to
"""
# Create logs directory if it doesn't exist
log_dir = os.path.join(parent_dir, "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, log_filename)

# Set up file handler for all logs
file_handler = logging.FileHandler(log_path)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))

# Set up handlers based on verbosity
handlers = [file_handler]
if verbose:
log_handlers.append(logging.StreamHandler())

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
handlers.append(console_handler)

# Configure root logger
logging.basicConfig(
handlers=log_handlers,
level=logging.INFO,
handlers=handlers,
level=logging.DEBUG if verbose else logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Log configuration completed
log_level = "DEBUG" if verbose else "INFO"
logging.info(f"Logging initialized with level: {log_level}, output to: {log_path}")


class Config(BaseModel):

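For context on how the reworked configure_logging is driven: setup_logging in main.py builds a timestamped filename and delegates here, so calling the helper directly is mostly useful from scripts or tests. A sketch under the assumption that it is run from crossbar_llm/backend, where the tools package and the logs directory (resolved from parent_dir) live:

import logging
import time

from tools.langchain_llm_qa_trial import configure_logging

# Mirror what setup_logging does in main.py: timestamped file, DEBUG when verbose.
log_filename = f"query_log_{time.strftime('%Y-%m-%d-%H:%M:%S')}.log"
configure_logging(verbose=True, log_filename=log_filename)

logging.getLogger(__name__).debug("configure_logging smoke test")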