-
Notifications
You must be signed in to change notification settings - Fork 10
feat(checkpoint-redis): implement adelete_thread and delete_thread … #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ethods (#51) - Add adelete_thread method to AsyncRedisSaver to delete all checkpoints, blobs, and writes for a thread - Add delete_thread method to RedisSaver for sync operations - Use Redis search indices instead of keys() command for better performance - Batch deletions using Redis pipeline for efficiency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements asynchronous and synchronous thread deletion methods for the checkpoint Redis saving mechanisms, improving performance by using Redis search indices and pipelining.
- Added adelete_thread method in AsyncRedisSaver for async checkpoint, blob, and writes deletion.
- Added delete_thread method in RedisSaver for synchronous deletion following similar logic.
- Introduced comprehensive tests to validate the deletion behavior for both implementations.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
tests/test_issue_51_adelete_thread.py | Added tests for async and sync thread deletion functionality. |
langgraph/checkpoint/redis/aio.py | Implemented async adelete_thread using Redis search indices and pipelines. |
langgraph/checkpoint/redis/init.py | Implemented sync delete_thread mirroring the async deletion logic. |
checkpoint_query = FilterQuery( | ||
filter_expression=Tag("thread_id") == storage_safe_thread_id, | ||
return_fields=["checkpoint_ns", "checkpoint_id"], | ||
num_results=10000, # Get all checkpoints for this thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider defining a named constant for the magic number 10000 to improve maintainability and readability.
num_results=10000, # Get all checkpoints for this thread | |
num_results=DEFAULT_NUM_RESULTS, # Get all checkpoints for this thread |
Copilot uses AI. Check for mistakes.
checkpoint_query = FilterQuery( | ||
filter_expression=Tag("thread_id") == storage_safe_thread_id, | ||
return_fields=["checkpoint_ns", "checkpoint_id"], | ||
num_results=10000, # Get all checkpoints for this thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a named constant instead of hardcoding 10000 to enhance clarity and ease future adjustments.
num_results=10000, # Get all checkpoints for this thread | |
num_results=DEFAULT_NUM_RESULTS, # Get all checkpoints for this thread |
Copilot uses AI. Check for mistakes.
@@ -575,6 +575,78 @@ def _load_pending_sends( | |||
# Extract type and blob pairs | |||
return [(doc.type, doc.blob) for doc in sorted_writes] | |||
|
|||
def delete_thread(self, thread_id: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Both async and sync deletion methods share very similar logic; consider refactoring the common deletion routines into a shared utility to reduce duplication.
Copilot uses AI. Check for mistakes.
…methods (#51)