@@ -687,24 +687,38 @@ absl::Status SchemaManager::LoadIndicesFromAux(RedisModuleCtx *ctx,
687
687
return absl::OkStatus ();
688
688
}
689
689
690
+ absl::StatusOr<vmsdk::UniqueRedisDetachedThreadSafeContext>
691
+ CreateRDBDetachedContext (RedisModuleCtx *ctx, RedisModuleIO *rdb) {
692
+ int db_num = RedisModule_GetDbIdFromIO (rdb);
693
+ auto rdb_load_ctx = vmsdk::MakeUniqueRedisDetachedThreadSafeContext (
694
+ RedisModule_GetDetachedThreadSafeContext (ctx));
695
+ if (RedisModule_SelectDb (rdb_load_ctx.get (), db_num) != REDISMODULE_OK) {
696
+ return absl::InternalError (absl::StrCat (" Failed to select DB " , db_num,
697
+ " for index schema RDB load" ));
698
+ }
699
+ return rdb_load_ctx;
700
+ }
701
+
690
702
absl::Status SchemaManager::AuxLoad (RedisModuleIO *rdb, int encver, int when) {
691
703
if (when == REDISMODULE_AUX_BEFORE_RDB) {
692
704
return absl::OkStatus ();
693
705
}
694
- auto ctx = RedisModule_GetContextFromIO (rdb);
695
706
696
707
auto aux_index_schema_count = RedisModule_LoadUnsigned (rdb);
697
708
if (aux_index_schema_count > 0 ) {
698
709
// Note that we need to subscribe now, so that we can get the loading
699
710
// ended callback.
700
711
SubscribeToServerEventsIfNeeded ();
701
712
}
713
+ VMSDK_ASSIGN_OR_RETURN (
714
+ auto rdb_load_ctx,
715
+ CreateRDBDetachedContext (RedisModule_GetContextFromIO (rdb), rdb));
702
716
if (staging_indices_due_to_repl_load_.Get ()) {
703
- VMSDK_RETURN_IF_ERROR (
704
- StageIndicesFromAux (ctx , aux_index_schema_count, rdb, encver));
717
+ VMSDK_RETURN_IF_ERROR (StageIndicesFromAux (
718
+ rdb_load_ctx. get () , aux_index_schema_count, rdb, encver));
705
719
} else {
706
- VMSDK_RETURN_IF_ERROR (
707
- LoadIndicesFromAux (ctx , aux_index_schema_count, rdb, encver));
720
+ VMSDK_RETURN_IF_ERROR (LoadIndicesFromAux (
721
+ rdb_load_ctx. get () , aux_index_schema_count, rdb, encver));
708
722
}
709
723
710
724
return absl::OkStatus ();
@@ -770,23 +784,18 @@ int SchemaManager::OnAuxLoadCallback(RedisModuleIO *rdb, int encver, int when) {
770
784
absl::StatusOr<void *> SchemaManager::IndexSchemaRDBLoad (RedisModuleIO *rdb,
771
785
int encoding_version) {
772
786
// Make sure we create the index schema in the right DB.
773
- int db_num = RedisModule_GetDbIdFromIO (rdb);
774
- RedisModuleCtx *rdb_load_ctx =
775
- RedisModule_GetDetachedThreadSafeContext (detached_ctx_.get ());
776
- if (RedisModule_SelectDb (rdb_load_ctx, db_num) != REDISMODULE_OK) {
777
- return absl::InternalError (absl::StrCat (" Failed to select DB " , db_num,
778
- " for index schema RDB load" ));
779
- }
787
+ VMSDK_ASSIGN_OR_RETURN (auto rdb_load_ctx,
788
+ CreateRDBDetachedContext (detached_ctx_.get (), rdb));
780
789
781
790
IndexSchema *index_schema_ptr = nullptr ;
782
791
if (staging_indices_due_to_repl_load_.Get ()) {
783
792
VMSDK_ASSIGN_OR_RETURN (
784
793
index_schema_ptr,
785
- StageIndexFromRDB (rdb_load_ctx, rdb, encoding_version));
794
+ StageIndexFromRDB (rdb_load_ctx. get () , rdb, encoding_version));
786
795
} else {
787
796
VMSDK_ASSIGN_OR_RETURN (
788
797
index_schema_ptr,
789
- LoadIndexFromRDB (rdb_load_ctx, rdb, encoding_version));
798
+ LoadIndexFromRDB (rdb_load_ctx. get () , rdb, encoding_version));
790
799
}
791
800
792
801
// Note that we need to subscribe now, so that we can get the loading ended
0 commit comments