Skip to content

Commit

Permalink
fix db bug
Browse files Browse the repository at this point in the history
  • Loading branch information
robatipoor committed Jan 19, 2024
1 parent b99392f commit 7c2ec62
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 89 deletions.
99 changes: 35 additions & 64 deletions api/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,35 +48,25 @@ impl Database {
.transpose()
}

pub async fn fetch_count(&self, file_path: &FilePath) -> ApiResult<Option<MetaDataFile>> {
let mut output = None;
self
pub fn update(&self, file_path: &FilePath, old: MetaDataFile, new: MetaDataFile) -> ApiResult {
let old = IVec::try_from(old)?;
let new = IVec::try_from(new)?;
let file_path = IVec::try_from(file_path)?;
let result = self
.inner
.fetch_and_update(IVec::try_from(file_path)?, |value| {
value
.map(|val| {
MetaDataFile::try_from(val)
.map(|mut meta| {
meta.count_downloads += 1;
let val = IVec::try_from(&meta);
output = Some(meta);
val
.map_err(|err| {
tracing::error!("Covnert MetaDataFile to IVec unsuccessfully: {err}");
err
})
.ok()
})
.map_err(|err| {
tracing::error!("Covnert IVec to MetaDataFile unsuccessfully: {err}");
err
})
.ok()
})
.flatten()
.flatten()
})?;
Ok(output)
.compare_and_swap(&file_path, Some(old), Some(new))?;
match result {
Ok(_) => Ok(()),
Err(err) if err.current.is_some() => Err(ApiError::BadRequestError(
"Update meta data failed".to_string(),
)),
Err(err) => {
tracing::error!("Compare and swap error: {err}");
Err(ApiError::DatabaseError(sled::Error::ReportableBug(
"Storing the meta data file in the database faild.".to_string(),
)))
}
}
}

pub fn exist(&self, path: &FilePath) -> ApiResult<bool> {
Expand All @@ -95,11 +85,11 @@ impl Database {
let expire = (expire_time, path);
match self.expires.write() {
Ok(mut guard) => {
let is_gc_notify = guard
.iter()
.next()
.filter(|(exp, _)| *exp < Utc::now())
.is_some();
let is_gc_notify = if let Some((first_expire, _)) = guard.iter().next() {
*first_expire > expire_time
} else {
true
};
guard.insert(expire.clone());
drop(guard);
if is_gc_notify {
Expand All @@ -113,7 +103,9 @@ impl Database {
}
}
Err(err) if err.current.is_some() => {
return Err(ApiError::ResourceExistsError("File path exists".to_string()));
return Err(ApiError::ResourceExistsError(
"File path exists".to_string(),
));
}
Err(err) => {
tracing::error!("Compare and swap error: {err}");
Expand All @@ -138,7 +130,7 @@ impl Database {
guard.remove(&(meta.expiration_date, path));
}
Err(err) => {
tracing::error!("Get expires lock unsuccessfully: {err}");
tracing::error!("Failed to acquire expires lock: {err}");
}
}
Ok(Some(meta))
Expand All @@ -148,21 +140,20 @@ impl Database {
}

pub async fn purge(&self) -> ApiResult<Option<Duration>> {
let now = Utc::now();
match self.expires.write() {
Ok(mut guard) => {
let expires = &mut *guard;
while let Some((expire_date, path)) = expires.iter().next().cloned() {
if expire_date < now {
if expire_date < Utc::now() {
self.inner.remove(&IVec::try_from(&path)?)?;
expires.remove(&(expire_date, path));
} else {
return Ok(Some((expire_date - now).to_std()?));
return Ok(Some((expire_date - Utc::now()).to_std()?));
}
}
}
Err(err) => {
tracing::error!("Get expires lock unsuccessfully: {err}");
tracing::error!("Failed to acquire expires lock: {err}");
return Err(ApiError::LockError(err.to_string()));
}
}
Expand Down Expand Up @@ -331,7 +322,7 @@ mod tests {

#[test_context(StateTestContext)]
#[tokio::test]
async fn test_store_file_and_fetch_count(ctx: &mut StateTestContext) {
async fn test_store_and_update_file(ctx: &mut StateTestContext) {
let path: FilePath = Faker.fake();
let meta = MetaDataFile {
created_at: Utc::now(),
Expand All @@ -347,34 +338,14 @@ mod tests {
.store(path.clone(), meta.clone())
.await
.unwrap();
let result = ctx.state.db.fetch_count(&path).await.unwrap().unwrap();
assert_eq!(result.created_at, meta.created_at);
assert_eq!(result.expiration_date, meta.expiration_date);
assert_eq!(result.secret, meta.secret);
assert_eq!(result.max_download, meta.max_download);
assert_eq!(result.count_downloads, meta.count_downloads);
}

#[test_context(StateTestContext)]
#[tokio::test]
async fn test_store_file_and_double_fetch_count(ctx: &mut StateTestContext) {
let path: FilePath = Faker.fake();
let meta = MetaDataFile {
created_at: Utc::now(),
expiration_date: Utc::now() + chrono::Duration::seconds(10),
secret: None,
delete_manually: true,
max_download: None,
count_downloads: 0,
};
let mut updated_meta = meta.clone();
updated_meta.count_downloads += 1;
ctx
.state
.db
.store(path.clone(), meta.clone())
.await
.update(&path, meta.clone(), updated_meta)
.unwrap();
ctx.state.db.fetch_count(&path).await.unwrap().unwrap();
let result = ctx.state.db.fetch_count(&path).await.unwrap().unwrap();
let result = ctx.state.db.fetch(&path).unwrap().unwrap();
assert_eq!(result.created_at, meta.created_at);
assert_eq!(result.expiration_date, meta.expiration_date);
assert_eq!(result.secret, meta.secret);
Expand Down
25 changes: 15 additions & 10 deletions api/src/service/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::configure::ApiConfig;
use crate::error::{ApiError, ApiResult, ToApiResult};
use crate::util::secret::{Secret, SecretHash};
use anyhow::anyhow;
Expand Down Expand Up @@ -119,22 +120,26 @@ pub async fn fetch(
file_name: &str,
secret: Option<Secret>,
) -> ApiResult<ServeFile> {
let path = FilePath {
let file_path = FilePath {
code: code.to_string(),
file_name: file_name.to_string(),
};
let meta = state.db.fetch_count(&path).await?.to_result()?;
if let Some(max) = meta.max_download {
if meta.count_downloads >= max {
state.db.delete(path.clone()).await?;
let meta_data = state.db.fetch(&file_path)?.to_result()?;
authorize_user(secret, &meta_data.secret)?;
if let Some(max) = meta_data.max_download {
if meta_data.count_downloads >= max {
state.db.delete(file_path.clone()).await?;
return Err(ApiError::NotFoundError(format!(
"{} not found",
path.url_path()
file_path.url_path()
)));
} else {
let mut updated_meta_data = meta_data.clone();
updated_meta_data.count_downloads += 1;
state.db.update(&file_path, meta_data, updated_meta_data)?;
}
}
authorize_user(secret, &meta.secret)?;
read_file(&state.config.fs.base_dir.join(&path.url_path())).await
Ok(read_file(&state.config, &file_path))
}

pub async fn delete(
Expand Down Expand Up @@ -164,8 +169,8 @@ pub async fn delete(
Ok(())
}

pub async fn read_file(file_path: &PathBuf) -> ApiResult<ServeFile> {
Ok(ServeFile::new(file_path))
pub fn read_file(config: &ApiConfig, file_path: &FilePath) -> ServeFile {
ServeFile::new(&file_path.fs_path(&config.fs.base_dir))
}

pub fn authorize_user(secret: Option<Secret>, secret_hash: &Option<SecretHash>) -> ApiResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion api/tests/api/info_api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn test_get_info_when_file_not_exist(ctx: &mut ApiTestContext) {
#[tokio::test]
pub async fn test_get_info_when_file_exceed_max_dl(ctx: &mut ApiTestContext) {
let file = ctx.upload_dummy_file(Some(1), None, None, None, None).await;
let (status, _) = ctx.download(&file.path, None).await.unwrap();
let (status, _resp) = ctx.download(&file.path, None).await.unwrap();
assert!(status.is_success());
let (status, resp) = ctx.info(&file.path, None).await.unwrap();
assert_err!(resp, |e: &BodyResponseError| e.error_type == "NOT_FOUND");
Expand Down
15 changes: 1 addition & 14 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
};

use log_derive::logfn;
use multer::Multipart;
use once_cell::sync::Lazy;
use reqwest::StatusCode;

Expand Down Expand Up @@ -89,19 +88,7 @@ impl PasteFileClient {
let error = resp.json::<BodyResponseError>().await?;
return Ok((status, ApiResponseResult::Err(error)));
}
let body = resp.bytes().await;
let boundary = std::str::from_utf8(body.as_ref().unwrap());
// TODO FIXME multer::parse_boundary(ct);
let boundary = boundary.unwrap().to_string().lines().next().unwrap()[2..]
.trim()
.to_string();
let stream = futures_util::stream::once(async { body });
let mut mp = Multipart::new(stream, boundary);
let f = mp.next_field().await.unwrap().unwrap();
Ok((
status,
ApiResponseResult::Ok(f.bytes().await.unwrap().to_vec()),
))
Ok((status, ApiResponseResult::Ok(resp.bytes().await?.to_vec())))
}

#[logfn(Info)]
Expand Down

0 comments on commit 7c2ec62

Please sign in to comment.