Commit 1d3a505

🔧 refactor: Document Processing and Health Check Functions (#221)
* 🔧 refactor: Document Processing and Health Check Functions

  * Extracted document preparation into a new synchronous helper, `_prepare_documents_sync`, which handles text splitting, cleaning, and metadata preparation, improving performance and clarity.
  * Updated `store_data_in_vector_db` to run the new synchronous preparation helper in an executor, ensuring non-blocking behavior in the async context.
  * Changed `is_health_ok` to be asynchronous, allowing health checks for the different database types to be awaited properly.

  These changes streamline document processing and improve the responsiveness of health checks in the application.

* 🔧 refactor: Update Event Loop Retrieval in Document Processing

  * Replaced `asyncio.get_event_loop()` with `asyncio.get_running_loop()` in `_process_documents_batched_sync` and `store_data_in_vector_db` to ensure compatibility with the current asyncio context.
  * This change improves the reliability of asynchronous operations by using the correct method for obtaining the running event loop.
Parent: 95db2e1 · Commit: 1d3a505

File tree: 2 files changed (+37, −15 lines)
app/routes/document_routes.py

Lines changed: 33 additions & 11 deletions
```diff
@@ -548,7 +548,7 @@ async def _process_documents_batched_sync(
         EMBEDDING_BATCH_SIZE,
     )
 
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
 
     for batch_idx in range(num_batches):
         start_idx = batch_idx * EMBEDDING_BATCH_SIZE
```
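For background on this change: `asyncio.get_event_loop()` can silently create a brand-new loop when none is running (behavior that is deprecated in recent Python versions), while `asyncio.get_running_loop()` (Python 3.7+) returns the loop actually driving the current coroutine and raises `RuntimeError` otherwise. A minimal sketch of the difference:

```python
import asyncio

async def inside_coroutine() -> None:
    # Inside a coroutine a loop is guaranteed to be running,
    # so get_running_loop() is the correct, unambiguous call.
    loop = asyncio.get_running_loop()
    print("running loop:", loop.is_running())  # True

def outside_coroutine() -> None:
    # With no running loop, get_running_loop() fails fast instead of
    # silently creating a new loop the way get_event_loop() could.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        print("no running loop:", exc)

outside_coroutine()
asyncio.run(inside_coroutine())
```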
```diff
@@ -608,13 +608,16 @@ def generate_digest(page_content: str):
     return hash_obj.hexdigest()
 
 
-async def store_data_in_vector_db(
+def _prepare_documents_sync(
     data: Iterable[Document],
     file_id: str,
-    user_id: str = "",
-    clean_content: bool = False,
-    executor=None,
-) -> bool:
+    user_id: str,
+    clean_content: bool,
+) -> List[Document]:
+    """
+    Synchronous document preparation - runs in executor to avoid blocking event loop.
+    Handles text splitting, cleaning, and metadata preparation.
+    """
     text_splitter = RecursiveCharacterTextSplitter(
         chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
     )
```
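For context, `RecursiveCharacterTextSplitter` is LangChain's text splitter, and `CHUNK_SIZE`/`CHUNK_OVERLAP` are the app's configured limits. A standalone sketch of what the splitter does (the import path is `langchain.text_splitter` in older LangChain releases; the sizes below are illustrative):

```python
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)
chunks = splitter.split_documents(
    [Document(page_content="some long text " * 50, metadata={"source": "demo"})]
)
# Each chunk inherits the source document's metadata, which is what lets
# _prepare_documents_sync stamp file_id/user_id onto every chunk later.
print(len(chunks), chunks[0].metadata)
```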
```diff
@@ -626,7 +629,7 @@ async def store_data_in_vector_db(
             doc.page_content = clean_text(doc.page_content)
 
     # Preparing documents with page content and metadata for insertion.
-    docs = [
+    return [
         Document(
             page_content=doc.page_content,
             metadata={
```
```diff
@@ -639,15 +642,34 @@
         for doc in documents
     ]
 
+
+async def store_data_in_vector_db(
+    data: Iterable[Document],
+    file_id: str,
+    user_id: str = "",
+    clean_content: bool = False,
+    executor=None,
+) -> bool:
+    # Run document preparation in executor to avoid blocking the event loop
+    loop = asyncio.get_running_loop()
+    docs = await loop.run_in_executor(
+        executor,
+        _prepare_documents_sync,
+        data,
+        file_id,
+        user_id,
+        clean_content,
+    )
+
     try:
         if EMBEDDING_BATCH_SIZE <= 0:
             # synchronously embed the file and insert into vector store in one go
             if isinstance(vector_store, AsyncPgVector):
                 ids = await vector_store.aadd_documents(
-                    docs, ids=[file_id] * len(documents), executor=executor
+                    docs, ids=[file_id] * len(docs), executor=executor
                 )
             else:
-                ids = vector_store.add_documents(docs, ids=[file_id] * len(documents))
+                ids = vector_store.add_documents(docs, ids=[file_id] * len(docs))
         else:
             # asynchronously embed the file and insert into vector store as it is embedding
             # to lessen memory impact and speed up slightly as the majority of the document
```
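Two things happen in this hunk: `store_data_in_vector_db` now offloads the CPU-bound preparation to an executor via `loop.run_in_executor`, and the `ids` lists are sized with `len(docs)` instead of the stale `len(documents)` name, which no longer exists in this scope after the extraction. A self-contained sketch of the executor pattern (all names other than the `asyncio` APIs are illustrative):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

def cpu_bound_prepare(raw: str) -> List[str]:
    # Stand-in for _prepare_documents_sync: splitting and cleaning are
    # pure CPU work and would stall the event loop if run inline.
    return [raw[i : i + 10] for i in range(0, len(raw), 10)]

async def store(raw: str, executor: Optional[ThreadPoolExecutor] = None) -> List[str]:
    loop = asyncio.get_running_loop()
    # None falls back to the loop's default executor, mirroring the
    # optional `executor=None` parameter in the route code.
    return await loop.run_in_executor(executor, cpu_bound_prepare, raw)

async def main() -> None:
    chunks = await store("x" * 95)
    print(len(chunks))  # 10

asyncio.run(main())
```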
```diff
@@ -884,7 +906,7 @@ async def embed_file_upload(
     user_id = get_user_id(request, entity_id)
     temp_file_path = os.path.join(RAG_UPLOAD_DIR, uploaded_file.filename)
 
-    save_upload_file_sync(uploaded_file, temp_file_path)
+    await save_upload_file_async(uploaded_file, temp_file_path)
 
     try:
         data, known_type, file_ext = await load_file_content(
```
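The diff replaces a blocking `save_upload_file_sync` call with an awaited `save_upload_file_async`, but the helper's body is not part of this commit. One plausible implementation for a FastAPI `UploadFile`, assuming the `aiofiles` dependency, is:

```python
import aiofiles
from fastapi import UploadFile

# Hypothetical sketch -- the repo's actual helper may differ.
async def save_upload_file_async(uploaded_file: UploadFile, dest_path: str) -> None:
    async with aiofiles.open(dest_path, "wb") as out:
        # Stream in chunks so large uploads never sit fully in memory.
        while chunk := await uploaded_file.read(64 * 1024):
            await out.write(chunk)
```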
```diff
@@ -926,7 +948,7 @@
                 detail=f"Error during file processing: {str(e)}",
             )
         finally:
-            os.remove(temp_file_path)
+            await cleanup_temp_file_async(temp_file_path)
 
     return {
         "status": True,
```

app/utils/health.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -4,10 +4,10 @@
 from app.services.mongo_client import mongo_health_check
 
 
-def is_health_ok():
+async def is_health_ok():
     if VECTOR_DB_TYPE == VectorDBType.PGVECTOR:
-        return pg_health_check()
+        return await pg_health_check()
     if VECTOR_DB_TYPE == VectorDBType.ATLAS_MONGO:
-        return mongo_health_check()
+        return await mongo_health_check()
     else:
-        return True
+        return True
```
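Since `is_health_ok` is now a coroutine, every caller must await it; omitting `await` would return a coroutine object, which is always truthy and would make the service look healthy no matter what. An illustrative FastAPI health endpoint (not part of this diff):

```python
from fastapi import FastAPI, Response

from app.utils.health import is_health_ok

app = FastAPI()

# Illustrative route -- the project's actual endpoint may differ.
@app.get("/health")
async def health(response: Response):
    ok = await is_health_ok()
    if not ok:
        response.status_code = 503
    return {"status": "UP" if ok else "DOWN"}
```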
