From 31c0427bd203fd5efa495b12629b855b308832c8 Mon Sep 17 00:00:00 2001 From: Jagan Elavarasan Date: Tue, 25 Feb 2025 17:06:02 +0530 Subject: [PATCH] refactor(pm): move domain conversion handler to kv_router_store --- crates/storage_impl/src/kv_router_store.rs | 223 ++++++++++++++------- crates/storage_impl/src/payment_method.rs | 169 +++++----------- 2 files changed, 206 insertions(+), 186 deletions(-) diff --git a/crates/storage_impl/src/kv_router_store.rs b/crates/storage_impl/src/kv_router_store.rs index 310f4aed1c5..1accb653799 100644 --- a/crates/storage_impl/src/kv_router_store.rs +++ b/crates/storage_impl/src/kv_router_store.rs @@ -10,10 +10,14 @@ use crate::redis::kv_store::{KvStorePartition, PartitionKey, RedisConnInterface} use crate::utils::find_all_combined_kv_database; use crate::utils::try_redis_get_else_try_database_get; use common_enums::enums::MerchantStorageScheme; -use common_utils::fallback_reverse_lookup_not_found; +use common_utils::{fallback_reverse_lookup_not_found, types::keymanager::KeyManagerState}; use diesel_models::{errors::DatabaseError, kv}; use error_stack::ResultExt; -use hyperswitch_domain_models::errors::{self, StorageResult}; +use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, + errors::{self, StorageResult}, + merchant_key_store::MerchantKeyStore, +}; use masking::StrongSecret; use serde::de; use std::fmt::Debug; @@ -46,10 +50,23 @@ pub struct InsertResourceParams<'a> { pub insertable: kv::Insertable, pub reverse_lookups: Vec, pub key: PartitionKey<'a>, - pub field: String, + // secondary key + pub identifier: String, + // type of resource Eg: "payment_attempt" pub resource_type: &'static str, } +pub struct UpdateResourceParams<'a> { + pub updateable: kv::Updateable, + pub operation: Op<'a>, +} + +pub struct FilterResourceParams<'a> { + pub key: PartitionKey<'a>, + pub pattern: &'static str, + pub limit: Option, +} + #[async_trait::async_trait] impl DatabaseStore for KVRouterStore where @@ -154,78 +171,103 @@ impl KVRouterStore { .change_context(RedisError::StreamAppendFailed) } - pub async fn find_resource_by_id( + pub async fn find_resource_by_id( &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, - find_resource_fn: R, + find_resource_db_fn: R, lookup_id: String, ) -> error_stack::Result where - D: de::DeserializeOwned + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints - + Sync, - DFut: futures::Future> + Send, + + Sync + + ReverseConversion, + DFut: futures::Future> + Send, R: FnOnce() -> DFut, { let database_call = || async { - find_resource_fn().await.map_err(|error| { + find_resource_db_fn().await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) }; - let storage_scheme = Box::pin(decide_storage_scheme::( + let storage_scheme = Box::pin(decide_storage_scheme::( self, storage_scheme, Op::Find, )) .await; - match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let lookup = fallback_reverse_lookup_not_found!( - self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) - .await, - database_call().await - ); + let res = || async { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await, + database_call().await + ); - let key = PartitionKey::CombinationKey { - combination: &lookup.pk_id, - }; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; - Box::pin(try_redis_get_else_try_database_get( - async { - Box::pin(kv_wrapper(self, KvOperation::::HGet(&lookup.sk_id), key)) - .await? - .try_into_hget() - }, - database_call, - )) - .await + Box::pin(try_redis_get_else_try_database_get( + async { + Box::pin(kv_wrapper(self, KvOperation::::HGet(&lookup.sk_id), key)) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } } - } + }; + res() + .await? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) } - pub async fn insert_resource( + pub async fn insert_resource( &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, create_resource_fn: R, - resource_new: D, - resource_params: InsertResourceParams<'_>, + resource_new: M, + InsertResourceParams { + insertable, + reverse_lookups, + key, + identifier, + resource_type, + }: InsertResourceParams<'_>, ) -> error_stack::Result where - D: de::DeserializeOwned + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints - + Sync, - DFut: futures::Future> + Send, + + Sync + + ReverseConversion, + DFut: futures::Future> + Send, R: FnOnce() -> DFut, { - let storage_scheme = Box::pin(decide_storage_scheme::<_, D>( + let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( self, storage_scheme, Op::Insert, @@ -237,17 +279,15 @@ impl KVRouterStore { error.change_context(new_err) }), MerchantStorageScheme::RedisKv => { - let key = resource_params.key; let key_str = key.to_string(); let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { - sk_id: resource_params.field.clone(), + sk_id: identifier.clone(), pk_id: key_str.clone(), lookup_id: v, - source: resource_params.resource_type.to_string(), + source: resource_type.to_string(), updated_by: storage_scheme.to_string(), }; - let results = resource_params - .reverse_lookups + let results = reverse_lookups .into_iter() .map(|v| self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme)); @@ -255,12 +295,12 @@ impl KVRouterStore { let redis_entry = kv::TypedSql { op: kv::DBOperation::Insert { - insertable: Box::new(resource_params.insertable), + insertable: Box::new(insertable), }, }; - match Box::pin(kv_wrapper::( + match Box::pin(kv_wrapper::( self, - KvOperation::::HSetNx(&resource_params.field, &resource_new, redis_entry), + KvOperation::::HSetNx(&identifier, &resource_new, redis_entry), key.clone(), )) .await @@ -268,7 +308,7 @@ impl KVRouterStore { .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { - entity: resource_params.resource_type, + entity: resource_type, key: Some(key_str), } .into()), @@ -276,30 +316,43 @@ impl KVRouterStore { Err(er) => Err(er).change_context(errors::StorageError::KVError), } } - } + }? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) } - pub async fn update_resource( + pub async fn update_resource( &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, update_resource_fn: R, - updated_resource: D, - updateable: kv::Updateable, - operation: Op<'_>, + updated_resource: M, + UpdateResourceParams { + updateable, + operation, + }: UpdateResourceParams<'_>, ) -> error_stack::Result where - D: de::DeserializeOwned + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints - + Sync, - DFut: futures::Future> + Send, + + Sync + + ReverseConversion, + DFut: futures::Future> + Send, R: FnOnce() -> DFut, { match operation { Op::Update(key, field, updated_by) => { - let storage_scheme = Box::pin(decide_storage_scheme::<_, D>( + let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( self, storage_scheme, Op::Update(key.clone(), field, updated_by), @@ -324,7 +377,7 @@ impl KVRouterStore { }; Box::pin(kv_wrapper::<(), _, _>( self, - KvOperation::::Hset((field, redis_value), redis_entry), + KvOperation::::Hset((field, redis_value), redis_entry), key, )) .await @@ -336,40 +389,53 @@ impl KVRouterStore { } } _ => Err(errors::StorageError::KVError.into()), - } + }? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) } - pub async fn filter_resources( + pub async fn filter_resources( &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, - filter_resources_fn: R, - key: PartitionKey<'_>, - pattern: &str, - filter_fn: impl Fn(&D) -> bool, - limit: Option, + filter_resource_db_fn: R, + filter_fn: impl Fn(&M) -> bool, + FilterResourceParams { + key, + pattern, + limit, + }: FilterResourceParams<'_>, ) -> error_stack::Result, errors::StorageError> where - D: de::DeserializeOwned + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints - + Sync, - DFut: futures::Future, DatabaseError>> + Send, + + Sync + + ReverseConversion, + DFut: futures::Future, DatabaseError>> + Send, R: FnOnce() -> DFut, { let db_call = || async { - filter_resources_fn().await.map_err(|error| { + filter_resource_db_fn().await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) }; - match storage_scheme { + let resources = match storage_scheme { MerchantStorageScheme::PostgresOnly => db_call().await, MerchantStorageScheme::RedisKv => { let redis_fut = async { - let kv_result = Box::pin(kv_wrapper::( + let kv_result = Box::pin(kv_wrapper::( self, - KvOperation::::Scan(pattern), + KvOperation::::Scan(pattern), key, )) .await? @@ -379,7 +445,20 @@ impl KVRouterStore { Box::pin(find_all_combined_kv_database(redis_fut, db_call, limit)).await } - } + }?; + let resource_futures = resources + .into_iter() + .map(|pm| async { + pm.convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .collect::>(); + futures::future::try_join_all(resource_futures).await } } diff --git a/crates/storage_impl/src/payment_method.rs b/crates/storage_impl/src/payment_method.rs index f067e2b0c2a..19fda8adee9 100644 --- a/crates/storage_impl/src/payment_method.rs +++ b/crates/storage_impl/src/payment_method.rs @@ -19,7 +19,12 @@ use crate::{ redis::kv_store::{Op, PartitionKey}, utils::{pg_connection_read, pg_connection_write}, }; -use crate::{kv_router_store::{KVRouterStore, InsertResourceParams}, DatabaseStore, RouterStore}; +use crate::{ + kv_router_store::{ + FilterResourceParams, InsertResourceParams, KVRouterStore, UpdateResourceParams, + }, + DatabaseStore, RouterStore, +}; use hyperswitch_domain_models::{ errors, merchant_key_store::MerchantKeyStore, @@ -43,18 +48,13 @@ impl PaymentMethodInterface for KVRouterStore { ) -> CustomResult { let conn = pg_connection_read(self).await?; self.find_resource_by_id( + state, + key_store, storage_scheme, || async { PaymentMethod::find_by_payment_method_id(&conn, payment_method_id).await }, format!("payment_method_{}", payment_method_id), ) - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) .await - .change_context(errors::StorageError::DecryptionError) } #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] @@ -72,14 +72,7 @@ impl PaymentMethodInterface for KVRouterStore { || async { PaymentMethod::find_by_payment_method_id(&conn, payment_method_id).await }, format!("payment_method_{}", payment_method_id.get_string_repr()), ) - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) .await - .change_context(errors::StorageError::DecryptionError) } #[cfg(all( @@ -96,18 +89,13 @@ impl PaymentMethodInterface for KVRouterStore { ) -> CustomResult { let conn = pg_connection_read(self).await?; self.find_resource_by_id( + state, + key_store, storage_scheme, || async { PaymentMethod::find_by_locker_id(&conn, locker_id).await }, format!("payment_method_locker_{}", locker_id), ) - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) .await - .change_context(errors::StorageError::DecryptionError) } // not supported in kv @@ -168,7 +156,7 @@ impl PaymentMethodInterface for KVRouterStore { merchant_id: &payment_method_new.merchant_id.clone(), customer_id: &payment_method_new.customer_id.clone(), }; - let field = format!("payment_method_id_{}", payment_method_new.get_id()); + let identifier = format!("payment_method_id_{}", payment_method_new.get_id()); let lookup_id1 = format!("payment_method_{}", payment_method_new.get_id()); let mut reverse_lookups = vec![lookup_id1]; if let Some(locker_id) = &payment_method_new.locker_id { @@ -176,6 +164,8 @@ impl PaymentMethodInterface for KVRouterStore { } let payment_method = (&payment_method_new.clone()).into(); self.insert_resource( + state, + key_store, storage_scheme, || async { payment_method_new.clone().insert(&conn).await }, payment_method, @@ -183,18 +173,11 @@ impl PaymentMethodInterface for KVRouterStore { insertable: kv::Insertable::PaymentMethod(payment_method_new.clone()), reverse_lookups, key, - field, + identifier, resource_type: "payment_method", }, ) - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) .await - .change_context(errors::StorageError::DecryptionError) } #[cfg(all( @@ -226,6 +209,8 @@ impl PaymentMethodInterface for KVRouterStore { payment_method_update.convert_to_payment_method_update(storage_scheme); let updated_payment_method = p_update.clone().apply_changeset(payment_method.clone()); self.update_resource( + state, + key_store, storage_scheme, || async { payment_method @@ -234,24 +219,21 @@ impl PaymentMethodInterface for KVRouterStore { .await }, updated_payment_method, - kv::Updateable::PaymentMethodUpdate(Box::new(kv::PaymentMethodUpdateMems { - orig: payment_method.clone(), - update_data: p_update.clone(), - })), - Op::Update( - key.clone(), - &field, - payment_method.clone().updated_by.as_deref(), - ), - ) - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), + UpdateResourceParams { + updateable: kv::Updateable::PaymentMethodUpdate(Box::new( + kv::PaymentMethodUpdateMems { + orig: payment_method.clone(), + update_data: p_update.clone(), + }, + )), + operation: Op::Update( + key.clone(), + &field, + payment_method.clone().updated_by.as_deref(), + ), + }, ) .await - .change_context(errors::StorageError::DecryptionError) } #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] @@ -343,36 +325,22 @@ impl PaymentMethodInterface for KVRouterStore { ) .await }; - let payment_methods = self - .filter_resources( - storage_scheme, - database_call, - PartitionKey::MerchantIdCustomerId { + self.filter_resources( + state, + key_store, + storage_scheme, + database_call, + |pm| pm.status == status, + FilterResourceParams { + key: PartitionKey::MerchantIdCustomerId { merchant_id, customer_id, }, - "payment_method_id_*", - |pm| pm.status == status, + pattern: "payment_method_id_*", limit, - ) - .await?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) + }, + ) + .await } #[cfg(all(feature = "v2", feature = "customer_v2"))] @@ -429,27 +397,9 @@ impl PaymentMethodInterface for KVRouterStore { key_store: &MerchantKeyStore, payment_method: DomainPaymentMethod, ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - let conn = pg_connection_write(self).await?; - let payment_method_update = PaymentMethodUpdate::StatusUpdate { - status: Some(common_enums::PaymentMethodStatus::Inactive), - }; - payment_method - .update_with_id(&conn, payment_method_update.into()) - .await - .map_err(|error| { - let new_err = diesel_error_to_data_error(*error.current_context()); - error.change_context(new_err) - })? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) + self.router_store + .delete_payment_method(state, key_store, payment_method) .await - .change_context(errors::StorageError::DecryptionError) } // Check if KV stuff is needed here @@ -461,19 +411,9 @@ impl PaymentMethodInterface for KVRouterStore { fingerprint_id: &str, ) -> CustomResult { let conn = pg_connection_read(self).await?; - PaymentMethod::find_by_fingerprint_id(&conn, fingerprint_id) - .await - .map_err(|error| { - let new_err = diesel_error_to_data_error(*error.current_context()); - error.change_context(new_err) - })? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) + self.router_store + .find_payment_method_by_fingerprint_id(state, key_store, fingerprint_id) .await - .change_context(errors::StorageError::DecryptionError) } } @@ -507,7 +447,7 @@ impl PaymentMethodInterface for RouterStore { _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection_read(self).await?; - self.find_resource_by_id(state, key_store, || async { + self.call_database(state, key_store, || async { PaymentMethod::find_by_id(&conn, payment_method_id).await }) .await @@ -613,15 +553,16 @@ impl PaymentMethodInterface for RouterStore { payment_method_update: PaymentMethodUpdate, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.router_store - .update_payment_method( - state, - key_Store, - payment_method, - payment_method_update, - storage_scheme, - ) + let payment_method = Conversion::convert(payment_method) .await + .change_context(errors::StorageError::DecryptionError)?; + let conn = pg_connection_write(self).await?; + self.call_database(state, key_store, || async { + payment_method + .update_with_payment_method_id(&conn, payment_method_update.into()) + .await + }) + .await } #[cfg(all(