Skip to content

Commit

Permalink
type abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
avdb13 committed Nov 6, 2024
1 parent fec140a commit 078ba1c
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 317 deletions.
67 changes: 8 additions & 59 deletions atrium-common/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
mod cache_impl;
mod cached_resolver;
mod cached;
mod error;
mod throttled_resolver;
mod throttled;

use crate::types::entry::Entry;

pub use self::cached_resolver::{CachedResolver, CachedResolverConfig};
pub use self::cached::CachedResolver;
pub use self::error::{Error, Result};
pub use self::throttled_resolver::ThrottledResolver;
pub use self::throttled::ThrottledResolver;
use std::future::Future;
use std::hash::Hash;

#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait Resolver {
Expand All @@ -23,58 +19,11 @@ pub trait Resolver {
) -> impl Future<Output = std::result::Result<Option<Self::Output>, Self::Error>>;
}

#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait Index<R>
where
R: Resolver,
{
fn index(&self, input: &R::Input) -> impl Future<Output = Entry<'static, R>> + 'static;
}

pub trait Cacheable
where
Self: Sized + Resolver,
Self::Input: Sized,
Self::Error: std::error::Error,
{
fn cached(self, config: CachedResolverConfig) -> CachedResolver<Self>;
}

impl<R> Cacheable for R
where
R: Sized + Resolver,
R::Input: Sized + Hash + Eq + Send + Sync + 'static,
R::Output: Clone + Send + Sync + 'static,
R::Error: std::error::Error + Send + Sync + 'static,
{
fn cached(self, config: CachedResolverConfig) -> CachedResolver<Self> {
CachedResolver::new(self, config)
}
}

pub trait Throttleable
where
Self: Sized + Resolver,
Self::Input: Sized,
{
fn throttled(self) -> ThrottledResolver<Self, Self::Error>;
}

impl<R> Throttleable for R
where
R: Resolver,
R::Input: Clone + Hash + Eq + Send + Sync + 'static,
R::Output: Clone + Send + Sync + 'static,
R::Error: std::error::Error + Send + Sync + 'static,
{
fn throttled(self) -> ThrottledResolver<Self, Self::Error> {
ThrottledResolver::new(self, Default::default())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::types::cached::{CacheConfig, Cacheable};
use crate::types::throttled::Throttleable;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -186,7 +135,7 @@ mod tests {
async fn test_cached_with_max_capacity() {
let counts = Arc::new(RwLock::new(HashMap::new()));
let resolver = mock_resolver(counts.clone())
.cached(CachedResolverConfig { max_capacity: Some(1), ..Default::default() });
.cached(CacheConfig { max_capacity: Some(1), ..Default::default() });
for (input, expected) in [
("k1", Some("v1")),
("k2", Some("v2")),
Expand Down Expand Up @@ -216,7 +165,7 @@ mod tests {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
async fn test_cached_with_time_to_live() {
let counts = Arc::new(RwLock::new(HashMap::new()));
let resolver = mock_resolver(counts.clone()).cached(CachedResolverConfig {
let resolver = mock_resolver(counts.clone()).cached(CacheConfig {
time_to_live: Some(Duration::from_millis(10)),
..Default::default()
});
Expand Down
9 changes: 0 additions & 9 deletions atrium-common/src/resolver/cache_impl.rs

This file was deleted.

3 changes: 3 additions & 0 deletions atrium-common/src/resolver/cached.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use crate::types::cached::Cached;

pub type CachedResolver<T, S> = Cached<T, S>;
66 changes: 0 additions & 66 deletions atrium-common/src/resolver/cached_resolver.rs

This file was deleted.

3 changes: 3 additions & 0 deletions atrium-common/src/resolver/throttled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use crate::types::throttled::{SenderMap, Throttled};

pub type ThrottledResolver<T> = Throttled<T, SenderMap<T>>;
73 changes: 0 additions & 73 deletions atrium-common/src/resolver/throttled_resolver.rs

This file was deleted.

1 change: 0 additions & 1 deletion atrium-common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod cached;
pub mod entry;
pub mod throttled;
62 changes: 38 additions & 24 deletions atrium-common/src/types/cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ pub struct CacheConfig {
pub time_to_live: Option<Duration>,
}

pub trait Cacheable<S> {
fn cached(self, config: CacheConfig) -> Cached<Self, S>
where
Self: Sized;
}

impl<T, S> Cacheable<S> for T
where
S: From<CacheConfig>,
{
fn cached(self, config: CacheConfig) -> Cached<Self, S> {
Cached::new(self, config)
}
}

#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub(crate) trait Cache {
type Input: Hash + Eq + Sync + 'static;
Expand All @@ -29,42 +44,41 @@ pub(crate) trait Cache {
async fn set(&self, key: Self::Input, value: Self::Output);
}

pub struct Cached<T, K, V> {
pub struct Cached<T, S> {
inner: T,
cache: CacheImpl<K, V>,
cache: S,
}

pub type CachedResolver<R>
impl<T, S> Cached<T, S>
where
R: Resolver,
= Cached<R, R::Input, R::Output>;

impl<T, K, V> Cached<T, K, V>
// where
// T: Resolver,
// K: Sized + Hash + Eq + Send + Sync + 'static,
// V: Clone + Send + Sync + 'static,
S: From<CacheConfig>,
{
pub fn new(inner: T, config: CacheConfig) -> Self {
Self { inner, cache: CacheImpl::new(config) }
Self { inner, cache: config.into() }
}
}

impl<R> Resolver for CachedResolver<R>
impl<T> Resolver for Cached<T, CacheImpl<T::Input, T::Output>>
where
R: Resolver + Send + Sync + 'static,
R::Input: Clone + Hash + Eq + Send + Sync + 'static + Debug,
R::Output: Clone + Send + Sync + 'static,
T: Resolver + Send + Sync + 'static,
T::Input: Clone + Hash + Eq + Send + Sync + 'static,
T::Output: Clone + Send + Sync + 'static,
{
type Input = R::Input;
type Output = R::Output;
type Input = T::Input;
type Output = T::Output;
type Error = T::Error;

async fn resolve(&self, input: &Self::Input) -> Result<Option<Self::Output>, Self::Error> {
match self.cache.get(input).await {
Some(cached) => Ok(Some(cached)),
None => {
let result = self.inner.resolve(input).await?;

async fn resolve(&self, input: &Self::Input) -> Result<Self::Output> {
if let Some(output) = self.cache.get(input).await {
return Ok(output);
if let Some(result) = result.as_ref().cloned() {
self.cache.set(input.clone(), result.clone()).await;
}
Ok(result)
}
}
let output = self.resolver.resolve(input).await?;
self.cache.set(input.clone(), output.clone()).await;
Ok(output)
}
}
13 changes: 11 additions & 2 deletions atrium-common/src/types/cached/moka.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::super::cached_resolver::{Cache as CacheTrait, CachedResolverConfig};
use super::{Cache as CacheTrait, CacheConfig};
use moka::{future::Cache, policy::EvictionPolicy};
use std::collections::hash_map::RandomState;
use std::hash::Hash;
Expand All @@ -15,7 +15,7 @@ where
type Input = I;
type Output = O;

fn new(config: CachedResolverConfig) -> Self {
fn new(config: CacheConfig) -> Self {
let mut builder = Cache::<I, O, _>::builder().eviction_policy(EvictionPolicy::lru());
if let Some(max_capacity) = config.max_capacity {
builder = builder.max_capacity(max_capacity);
Expand All @@ -33,3 +33,12 @@ where
self.inner.insert(key, value).await;
}
}
impl<I, O> From<CacheConfig> for MokaCache<I, O>
where
I: Hash + Eq + Send + Sync + 'static,
O: Clone + Send + Sync + 'static,
{
fn from(config: CacheConfig) -> Self {
Self::new(config)
}
}
4 changes: 2 additions & 2 deletions atrium-common/src/types/cached/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::super::cached_resolver::{Cache as CacheTrait, CachedResolverConfig};
use super::{Cache as CacheTrait, CacheConfig};
use lru::LruCache;
use std::collections::HashMap;
use std::hash::Hash;
Expand Down Expand Up @@ -64,7 +64,7 @@ where
type Input = I;
type Output = O;

fn new(config: CachedResolverConfig) -> Self {
fn new(config: CacheConfig) -> Self {
let store = if let Some(max_capacity) = config.max_capacity {
Store::Lru(LruCache::new(
NonZeroUsize::new(max_capacity as usize)
Expand Down
Loading

0 comments on commit 078ba1c

Please sign in to comment.