@@ -993,29 +993,33 @@ async def _aload_pending_sends(
993
993
"""
994
994
# Query checkpoint_writes for parent checkpoint's TASKS channel
995
995
parent_writes_query = FilterQuery (
996
- filter_expression = (Tag ("thread_id" ) == to_storage_safe_id (thread_id ))
997
- & (Tag ("checkpoint_ns" ) == to_storage_safe_str (checkpoint_ns ))
998
- & (Tag ("checkpoint_id" ) == to_storage_safe_id (parent_checkpoint_id ))
999
- & (Tag ("channel" ) == TASKS ),
1000
- return_fields = ["type" , "blob" , "task_path" , "task_id" , "idx" ],
1001
- num_results = 100 , # Adjust as needed
1002
- )
1003
- parent_writes_results = await self .checkpoint_writes_index .search (
1004
- parent_writes_query
996
+ filter_expression = (
997
+ (Tag ("thread_id" ) == to_storage_safe_id (thread_id ))
998
+ & (Tag ("checkpoint_ns" ) == checkpoint_ns )
999
+ & (Tag ("checkpoint_id" ) == to_storage_safe_id (parent_checkpoint_id ))
1000
+ & (Tag ("channel" ) == TASKS )
1001
+ ),
1002
+ return_fields = ["type" , "$.blob" , "task_path" , "task_id" , "idx" ],
1003
+ num_results = 100 ,
1005
1004
)
1006
-
1007
- # Sort results by task_path, task_id, idx (matching Postgres implementation)
1008
- sorted_writes = sorted (
1009
- parent_writes_results .docs ,
1010
- key = lambda x : (
1011
- getattr (x , "task_path" , "" ),
1012
- getattr (x , "task_id" , "" ),
1013
- getattr (x , "idx" , 0 ),
1005
+ res = await self .checkpoint_writes_index .search (parent_writes_query )
1006
+
1007
+ # Sort results for deterministic order
1008
+ docs = sorted (
1009
+ res .docs ,
1010
+ key = lambda d : (
1011
+ getattr (d , "task_path" , "" ),
1012
+ getattr (d , "task_id" , "" ),
1013
+ getattr (d , "idx" , 0 ),
1014
1014
),
1015
1015
)
1016
-
1017
- # Extract type and blob pairs
1018
- return [(doc .type , doc .blob ) for doc in sorted_writes ]
1016
+
1017
+ # Convert to expected format
1018
+ return [
1019
+ (d .type .encode (), blob )
1020
+ for d in docs
1021
+ if (blob := getattr (d , "$.blob" , getattr (d , "blob" , None ))) is not None
1022
+ ]
1019
1023
1020
1024
async def _aload_pending_writes (
1021
1025
self ,
0 commit comments