Commit 525a505

Authored by Marc Amick and danny-avila
🚰 feat: Stream Document Embeddings to Database in Batches (#214)
* Process documents using an async pipeline with concurrent embedding generation and DB insertion. This provides better memory efficiency by inserting embeddings immediately as they are generated.
* Made the max size of the embedding queue configurable with the `EMBEDDING_MAX_QUEUE_SIZE` env var
* Implemented fixes for Copilot code review
* Implemented fixes for Copilot code review - whitespace changes and removed an unneeded import
* Implemented fixes for Copilot code review - added documentation in the function header for the executor argument
* feat: improve streaming document embeddings with bug fixes and tests

  This PR implements several critical fixes and improvements for the batch embedding feature that streams document embeddings to reduce memory consumption.

  ## Critical Fixes

  - Fix blocking async in `_process_documents_batched_sync` by wrapping sync calls in `run_in_executor()`
  - Unify rollback behavior: add an `if all_ids:` guard to the async path so rollback only runs when documents were actually inserted
  - Fix producer error handling: move `put(None)` to a `finally` block to always signal completion
  - Add a `task_done()` call for the `None` signal in the consumer
  - Fix consumer error handling: put the exception object in `results_queue` instead of an empty list, enabling proper error propagation

  ## Code Quality Improvements

  - Remove dead `# temp_vector_store` comment
  - Remove ineffective `del` statements (they don't help with memory in Python)
  - Fix all f-string logging to use %-style formatting
  - Update docstrings to accurately describe function behavior
  - Add type hints (`AsyncPgVector`, `PgVector`, `ThreadPoolExecutor`)
  - Rename functions for clarity: `embedding_producer` → `batch_producer`, `database_consumer` → `embedding_consumer`
  - Add early return for empty document lists

  ## Tests (31 new tests)

  - `tests/test_batch_processing.py`: 23 unit tests covering the async pipeline, sync batched processing, edge cases, and the producer-consumer pattern
  - `tests/test_batch_processing_integration.py`: 8 integration tests, including memory optimization verification with `tracemalloc`

  ## Documentation

  - Add `EMBEDDING_BATCH_SIZE` and `EMBEDDING_MAX_QUEUE_SIZE` to README env vars
  - Add "Embedding Batch Processing" section with a configuration guide
  - Add "Running Tests" section with commands and test categories
  - Improve inline comments in `config.py` explaining batch processing trade-offs
  - Add `pytest.ini` for test configuration

  Closes #213

* feat: add utility function for batch calculation and update tests

  This commit introduces a new utility function, `calculate_num_batches`, to streamline batch size calculations across the document processing routes. The function handles various edge cases, including zero items and batch sizes of zero. Existing tests have been updated to use this new function, ensuring accurate batch count calculations and improving test coverage for edge cases.

* refactor: update vector store mocking in tests to use AsyncPgVector

  This commit modifies the test setup in `test_main.py` to replace the mocking of vector store methods with the `AsyncPgVector` class. The changes include updating the signatures of dummy functions to accept `self` and ensuring that all relevant methods are patched correctly. This aligns the tests with the asynchronous implementation of the vector store.

* test: clear cache for embedding function in vector store tests

  This commit updates the test setup in `test_main.py` to clear the cache of the `get_cached_query_embedding` function before running tests, ensuring the mock embedding function is used consistently. The comment for overriding the embedding function has been clarified to indicate that it uses a dummy implementation that does not call OpenAI.

* test: enhance vector store tests with dummy embedding function

  This commit updates the test setup in `test_main.py` to clear the cache of the `get_cached_query_embedding` function and replace it with a dummy implementation that returns predefined embeddings, ensuring consistent behavior during test execution.

---------

Co-authored-by: Marc Amick <[email protected]>
Co-authored-by: Danny Avila <[email protected]>
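The commit message above describes `calculate_num_batches` handling zero items and a batch size of zero, but does not show its code. A minimal sketch of that behavior might look like the following; the exact return values for the edge cases (and treating `batch_size=0` as "one batch with everything") are assumptions here, not the PR's verified implementation:

```python
def calculate_num_batches(num_items: int, batch_size: int) -> int:
    """Return how many batches are needed to cover num_items.

    Assumed edge-case behavior: zero items means zero batches, and a
    batch_size of 0 means batching is disabled, so all items go into
    a single batch.
    """
    if num_items <= 0:
        return 0
    if batch_size <= 0:
        return 1  # batching disabled: process everything at once
    # Ceiling division without floating point
    return -(-num_items // batch_size)
```

For example, 1501 chunks with a batch size of 750 need three batches, since the final chunk spills into a batch of its own.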
1 parent 65c64ed commit 525a505

File tree

7 files changed

+1213
-26
lines changed


README.md

Lines changed: 112 additions & 0 deletions
@@ -59,6 +59,8 @@ The following environment variables are required to run the application:
 - `COLLECTION_NAME`: (Optional) The name of the collection in the vector store. Default value is "testcollection".
 - `CHUNK_SIZE`: (Optional) The size of the chunks for text processing. Default value is "1500".
 - `CHUNK_OVERLAP`: (Optional) The overlap between chunks during text processing. Default value is "100".
+- `EMBEDDING_BATCH_SIZE`: (Optional) Number of document chunks to process per batch. Set to `0` (default) to disable batching. Recommended value is `750` for `text-embedding-3-small`.
+- `EMBEDDING_MAX_QUEUE_SIZE`: (Optional) Maximum number of batches to buffer in memory during async processing. Default value is "3".
 - `RAG_UPLOAD_DIR`: (Optional) The directory where uploaded files are stored. Default value is "./uploads/".
 - `PDF_EXTRACT_IMAGES`: (Optional) A boolean value indicating whether to extract images from PDF files. Default value is "False".
 - `DEBUG_RAG_API`: (Optional) Set to "True" to show more verbose logging output in the server console, and to enable postgresql database routes
@@ -95,6 +97,41 @@ The following environment variables are required to run the application:
 
 Make sure to set these environment variables before running the application. You can set them in a `.env` file or as system environment variables.
 
+### Embedding Batch Processing
+
+For large files, you can enable batched embedding processing to reduce memory consumption. This is particularly useful in memory-constrained environments like Kubernetes pods with memory limits.
+
+#### Configuration
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `EMBEDDING_BATCH_SIZE` | `0` | Number of document chunks to process per batch. `0` disables batching (original behavior). |
+| `EMBEDDING_MAX_QUEUE_SIZE` | `3` | Maximum number of batches to buffer in memory during async processing. |
+
+#### Recommended Settings
+
+For the `text-embedding-3-small` model:
+- `EMBEDDING_BATCH_SIZE=750` - Good balance of throughput and memory
+
+For memory-constrained environments (< 2GB RAM):
+- `EMBEDDING_BATCH_SIZE=100-250`
+
+For high-throughput environments:
+- `EMBEDDING_BATCH_SIZE=1000-2000`
+- `EMBEDDING_MAX_QUEUE_SIZE=5`
+
+#### Behavior
+
+When `EMBEDDING_BATCH_SIZE > 0`:
+- Documents are processed in batches of the specified size
+- Each batch is embedded and inserted before the next batch starts
+- On failure, successfully inserted documents are rolled back
+- Memory usage is bounded by `EMBEDDING_BATCH_SIZE * EMBEDDING_MAX_QUEUE_SIZE`
+
+When `EMBEDDING_BATCH_SIZE = 0` (default):
+- All documents are processed at once (original behavior)
+- Better for small files or memory-rich environments
 
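The bounded-buffer behavior described above can be sketched with a bounded `asyncio.Queue`: the producer's `put()` blocks once the queue is full, which is what caps memory at roughly batch size times queue size. This is a simplified illustration, not the service's actual code; the embed-and-insert step is replaced by a stand-in, and the function names merely echo those mentioned in the commit message:

```python
import asyncio

BATCH_SIZE = 750       # EMBEDDING_BATCH_SIZE
MAX_QUEUE_SIZE = 3     # EMBEDDING_MAX_QUEUE_SIZE

async def batch_producer(docs, queue):
    """Slice docs into batches; put() blocks while the queue is full."""
    try:
        for i in range(0, len(docs), BATCH_SIZE):
            await queue.put(docs[i : i + BATCH_SIZE])
    finally:
        await queue.put(None)  # always signal completion, even on error

async def embedding_consumer(queue, inserted_ids):
    """Drain batches as they arrive; a stand-in for embed + DB insert."""
    while True:
        batch = await queue.get()
        if batch is None:
            queue.task_done()  # acknowledge the sentinel too
            break
        inserted_ids.extend(f"id-{doc}" for doc in batch)
        queue.task_done()

async def process(docs):
    queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
    ids = []
    await asyncio.gather(
        batch_producer(docs, queue),
        embedding_consumer(queue, ids),
    )
    return ids
```

At most `MAX_QUEUE_SIZE` batches sit in the queue at any moment, regardless of document count, so peak memory no longer grows with file size.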
 ### Use Atlas MongoDB as Vector Database
 
 Instead of using the default pgvector, we could use [Atlas MongoDB](https://www.mongodb.com/products/platform/atlas-vector-search) as the vector database. To do so, set the following environment variables
@@ -169,6 +206,81 @@ Notes:
 
 ### Dev notes:
 
+#### Running Tests
+
+##### Prerequisites
+
+Install test dependencies:
+
+```bash
+pip install -r test_requirements.txt
+```
+
+##### Running All Tests
+
+```bash
+# Run all tests
+pytest
+
+# Run with verbose output
+pytest -v
+
+# Run with coverage (if pytest-cov is installed)
+pytest --cov=app
+```
+
+##### Running Specific Test Files
+
+```bash
+# Run batch processing unit tests
+pytest tests/test_batch_processing.py -v
+
+# Run batch processing integration tests (memory optimization tests)
+pytest tests/test_batch_processing_integration.py -v
+
+# Run main API tests
+pytest tests/test_main.py -v
+```
+
+##### Running Tests by Category
+
+```bash
+# Run only integration tests (marked with @pytest.mark.integration)
+pytest -m integration -v
+
+# Skip integration tests
+pytest -m "not integration" -v
+
+# Run only async tests
+pytest -k "async" -v
+```
+
+##### Test Categories
+
+| Test File | Description |
+|-----------|-------------|
+| `test_batch_processing.py` | Unit tests for batch processing functions |
+| `test_batch_processing_integration.py` | Memory optimization and integration tests |
+| `test_main.py` | API endpoint tests |
+| `test_config.py` | Configuration tests |
+| `test_middleware.py` | Middleware tests |
+| `test_models.py` | Model tests |
+
+##### Memory Optimization Tests
+
+The `test_batch_processing_integration.py` file includes tests that verify the memory optimization behavior:
+
+- **`test_memory_bounded_by_batch_size`**: Verifies that the number of documents in memory at any time is bounded by `EMBEDDING_BATCH_SIZE`
+- **`test_memory_tracking_with_tracemalloc`**: Uses Python's `tracemalloc` to monitor memory usage during batch processing
+- **`test_sync_memory_bounded_by_batch_size`**: Same verification for the synchronous code path
+
+Run memory tests specifically:
+
+```bash
+pytest tests/test_batch_processing_integration.py::TestMemoryOptimization -v
+pytest tests/test_batch_processing_integration.py::TestSyncBatchedMemory -v
+```
 
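The `tracemalloc`-based verification mentioned above follows a simple pattern: trace allocations around the code under test and compare peak usage between the all-at-once and batched paths. The sketch below is a generic illustration of that pattern, not the repository's actual test code; the helper and workload functions are hypothetical:

```python
import tracemalloc

def peak_memory_of(fn, *args):
    """Run fn and return the peak traced allocation size in bytes."""
    tracemalloc.start()
    try:
        fn(*args)
        _, peak = tracemalloc.get_traced_memory()
        return peak
    finally:
        tracemalloc.stop()

def process_all(chunks):
    # Materializes every "embedding" at once (the unbatched path)
    return [bytes(1024) for _ in chunks]

def process_batched(chunks, batch_size=100):
    # Each batch becomes garbage before the next one is built
    for i in range(0, len(chunks), batch_size):
        _ = [bytes(1024) for _ in chunks[i : i + batch_size]]
```

With, say, 10,000 chunks, `process_batched` should peak at roughly `batch_size` allocations while `process_all` peaks at all 10,000, which is the property the integration tests assert.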
 #### Installing pre-commit formatter
 
 Run the following commands to install pre-commit formatter, which uses [black](https://github.com/psf/black) code formatter:

app/config.py

Lines changed: 16 additions & 0 deletions
@@ -70,6 +70,22 @@ def get_env_variable(
 CHUNK_SIZE = int(get_env_variable("CHUNK_SIZE", "1500"))
 CHUNK_OVERLAP = int(get_env_variable("CHUNK_OVERLAP", "100"))
 
+# Batch processing configuration for memory-constrained environments.
+# When EMBEDDING_BATCH_SIZE > 0, documents are processed in batches to reduce
+# peak memory usage. This is useful for Kubernetes pods with memory limits.
+#
+# Trade-offs:
+# - Smaller batch size = lower memory, more DB round trips
+# - Larger batch size = higher memory, fewer DB round trips
+# - 0 = disable batching, process all at once (original behavior)
+#
+# Recommended: 750 for text-embedding-3-small (good balance of speed and memory)
+EMBEDDING_BATCH_SIZE = int(get_env_variable("EMBEDDING_BATCH_SIZE", "0"))
+
+# Maximum number of batches to buffer in memory during async processing.
+# Higher values allow more parallelism but use more memory.
+EMBEDDING_MAX_QUEUE_SIZE = int(get_env_variable("EMBEDDING_MAX_QUEUE_SIZE", "3"))
+
 env_value = get_env_variable("PDF_EXTRACT_IMAGES", "False").lower()
 PDF_EXTRACT_IMAGES = True if env_value == "true" else False
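As a worked example of the bound these two settings imply, using the recommended defaults from the diff above (the byte estimate is illustrative only; real chunk sizes depend on `CHUNK_SIZE` and document content):

```python
# Illustrative arithmetic for the memory bound, using values from config.py
EMBEDDING_BATCH_SIZE = 750
EMBEDDING_MAX_QUEUE_SIZE = 3
CHUNK_SIZE = 1500  # characters per chunk (default)

# At most this many chunks can sit in the async queue at once
max_buffered_chunks = EMBEDDING_BATCH_SIZE * EMBEDDING_MAX_QUEUE_SIZE  # 2250

# Rough text volume, assuming ~1 byte per character
approx_text_bytes = max_buffered_chunks * CHUNK_SIZE  # 3375000, about 3.4 MB
```

So even for a very large upload, the queued raw text stays in the low-megabyte range; embeddings and driver overhead add to this, but no longer scale with file size.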

0 commit comments