Skip to content

Commit

Permalink
refactor(pm): move domain conversion handler to kv_router_store
Browse files Browse the repository at this point in the history
  • Loading branch information
jagan-jaya committed Feb 25, 2025
1 parent 4fc28e6 commit 31c0427
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 186 deletions.
223 changes: 151 additions & 72 deletions crates/storage_impl/src/kv_router_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ use crate::redis::kv_store::{KvStorePartition, PartitionKey, RedisConnInterface}
use crate::utils::find_all_combined_kv_database;
use crate::utils::try_redis_get_else_try_database_get;
use common_enums::enums::MerchantStorageScheme;
use common_utils::fallback_reverse_lookup_not_found;
use common_utils::{fallback_reverse_lookup_not_found, types::keymanager::KeyManagerState};
use diesel_models::{errors::DatabaseError, kv};
use error_stack::ResultExt;
use hyperswitch_domain_models::errors::{self, StorageResult};
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::{self, StorageResult},
merchant_key_store::MerchantKeyStore,
};
use masking::StrongSecret;
use serde::de;
use std::fmt::Debug;
Expand Down Expand Up @@ -46,10 +50,23 @@ pub struct InsertResourceParams<'a> {
pub insertable: kv::Insertable,
pub reverse_lookups: Vec<String>,
pub key: PartitionKey<'a>,
pub field: String,
// secondary key
pub identifier: String,
// type of resource Eg: "payment_attempt"
pub resource_type: &'static str,
}

pub struct UpdateResourceParams<'a> {
pub updateable: kv::Updateable,
pub operation: Op<'a>,
}

pub struct FilterResourceParams<'a> {
pub key: PartitionKey<'a>,
pub pattern: &'static str,
pub limit: Option<i64>,
}

#[async_trait::async_trait]
impl<T> DatabaseStore for KVRouterStore<T>
where
Expand Down Expand Up @@ -154,78 +171,103 @@ impl<T: DatabaseStore> KVRouterStore<T> {
.change_context(RedisError::StreamAppendFailed)
}

pub async fn find_resource_by_id<DFut, D, R>(
pub async fn find_resource_by_id<DFut, D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
find_resource_fn: R,
find_resource_db_fn: R,
lookup_id: String,
) -> error_stack::Result<D, errors::StorageError>
where
D: de::DeserializeOwned
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync,
DFut: futures::Future<Output = error_stack::Result<D, DatabaseError>> + Send,
+ Sync
+ ReverseConversion<D>,
DFut: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
R: FnOnce() -> DFut,
{
let database_call = || async {
find_resource_fn().await.map_err(|error| {
find_resource_db_fn().await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
};
let storage_scheme = Box::pin(decide_storage_scheme::<T, D>(
let storage_scheme = Box::pin(decide_storage_scheme::<T, M>(
self,
storage_scheme,
Op::Find,
))
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
let res = || async {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);

let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
};
let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
};

Box::pin(try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper(self, KvOperation::<D>::HGet(&lookup.sk_id), key))
.await?
.try_into_hget()
},
database_call,
))
.await
Box::pin(try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper(self, KvOperation::<M>::HGet(&lookup.sk_id), key))
.await?
.try_into_hget()
},
database_call,
))
.await
}
}
}
};
res()
.await?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}

pub async fn insert_resource<DFut, D, R>(
pub async fn insert_resource<DFut, D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
create_resource_fn: R,
resource_new: D,
resource_params: InsertResourceParams<'_>,
resource_new: M,
InsertResourceParams {
insertable,
reverse_lookups,
key,
identifier,
resource_type,
}: InsertResourceParams<'_>,
) -> error_stack::Result<D, errors::StorageError>
where
D: de::DeserializeOwned
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync,
DFut: futures::Future<Output = error_stack::Result<D, DatabaseError>> + Send,
+ Sync
+ ReverseConversion<D>,
DFut: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
R: FnOnce() -> DFut,
{
let storage_scheme = Box::pin(decide_storage_scheme::<_, D>(
let storage_scheme = Box::pin(decide_storage_scheme::<_, M>(
self,
storage_scheme,
Op::Insert,
Expand All @@ -237,69 +279,80 @@ impl<T: DatabaseStore> KVRouterStore<T> {
error.change_context(new_err)
}),
MerchantStorageScheme::RedisKv => {
let key = resource_params.key;
let key_str = key.to_string();
let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew {
sk_id: resource_params.field.clone(),
sk_id: identifier.clone(),
pk_id: key_str.clone(),
lookup_id: v,
source: resource_params.resource_type.to_string(),
source: resource_type.to_string(),
updated_by: storage_scheme.to_string(),
};
let results = resource_params
.reverse_lookups
let results = reverse_lookups
.into_iter()
.map(|v| self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme));

futures::future::try_join_all(results).await?;

let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: Box::new(resource_params.insertable),
insertable: Box::new(insertable),
},
};
match Box::pin(kv_wrapper::<D, _, _>(
match Box::pin(kv_wrapper::<M, _, _>(
self,
KvOperation::<D>::HSetNx(&resource_params.field, &resource_new, redis_entry),
KvOperation::<M>::HSetNx(&identifier, &resource_new, redis_entry),
key.clone(),
))
.await
.map_err(|err| err.to_redis_failed_response(&key.to_string()))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: resource_params.resource_type,
entity: resource_type,
key: Some(key_str),
}
.into()),
Ok(HsetnxReply::KeySet) => Ok(resource_new),
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}
}?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}

pub async fn update_resource<DFut, D, R>(
pub async fn update_resource<DFut, D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
update_resource_fn: R,
updated_resource: D,
updateable: kv::Updateable,
operation: Op<'_>,
updated_resource: M,
UpdateResourceParams {
updateable,
operation,
}: UpdateResourceParams<'_>,
) -> error_stack::Result<D, errors::StorageError>
where
D: de::DeserializeOwned
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync,
DFut: futures::Future<Output = error_stack::Result<D, DatabaseError>> + Send,
+ Sync
+ ReverseConversion<D>,
DFut: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
R: FnOnce() -> DFut,
{
match operation {
Op::Update(key, field, updated_by) => {
let storage_scheme = Box::pin(decide_storage_scheme::<_, D>(
let storage_scheme = Box::pin(decide_storage_scheme::<_, M>(
self,
storage_scheme,
Op::Update(key.clone(), field, updated_by),
Expand All @@ -324,7 +377,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
};
Box::pin(kv_wrapper::<(), _, _>(
self,
KvOperation::<D>::Hset((field, redis_value), redis_entry),
KvOperation::<M>::Hset((field, redis_value), redis_entry),
key,
))
.await
Expand All @@ -336,40 +389,53 @@ impl<T: DatabaseStore> KVRouterStore<T> {
}
}
_ => Err(errors::StorageError::KVError.into()),
}
}?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}
pub async fn filter_resources<DFut, D, R>(
pub async fn filter_resources<DFut, D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
filter_resources_fn: R,
key: PartitionKey<'_>,
pattern: &str,
filter_fn: impl Fn(&D) -> bool,
limit: Option<i64>,
filter_resource_db_fn: R,
filter_fn: impl Fn(&M) -> bool,
FilterResourceParams {
key,
pattern,
limit,
}: FilterResourceParams<'_>,
) -> error_stack::Result<Vec<D>, errors::StorageError>
where
D: de::DeserializeOwned
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync,
DFut: futures::Future<Output = error_stack::Result<Vec<D>, DatabaseError>> + Send,
+ Sync
+ ReverseConversion<D>,
DFut: futures::Future<Output = error_stack::Result<Vec<M>, DatabaseError>> + Send,
R: FnOnce() -> DFut,
{
let db_call = || async {
filter_resources_fn().await.map_err(|error| {
filter_resource_db_fn().await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
};
match storage_scheme {
let resources = match storage_scheme {
MerchantStorageScheme::PostgresOnly => db_call().await,
MerchantStorageScheme::RedisKv => {
let redis_fut = async {
let kv_result = Box::pin(kv_wrapper::<D, _, _>(
let kv_result = Box::pin(kv_wrapper::<M, _, _>(
self,
KvOperation::<D>::Scan(pattern),
KvOperation::<M>::Scan(pattern),
key,
))
.await?
Expand All @@ -379,7 +445,20 @@ impl<T: DatabaseStore> KVRouterStore<T> {

Box::pin(find_all_combined_kv_database(redis_fut, db_call, limit)).await
}
}
}?;
let resource_futures = resources
.into_iter()
.map(|pm| async {
pm.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
})
.collect::<Vec<_>>();
futures::future::try_join_all(resource_futures).await
}
}

Expand Down
Loading

0 comments on commit 31c0427

Please sign in to comment.