Skip to content

Commit

Permalink
chore: notify user file size (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Jan 14, 2025
1 parent 5a2f54b commit c5cdb06
Showing 1 changed file with 35 additions and 25 deletions.
60 changes: 35 additions & 25 deletions services/appflowy-worker/src/import_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,23 @@ async fn consume_task(
Some(file_size) => {
if file_size > context.maximum_import_file_size as i64 {
let file_size_in_mb = file_size as f64 / 1_048_576.0;
let max_size_in_mb = context.maximum_import_file_size as f64 / 1_048_576.0;
let max_size_in_mb = (context.maximum_import_file_size as f64 / 1_048_576.0).ceil();
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
handle_failed_task(
&mut context,
&import_record,
task,
stream_name,
group_name,
&entry_id,
ImportError::UploadFileTooLarge {
file_size_in_mb,
max_size_in_mb,
},
ImportTaskState::Failed,
)
.await?;
}

return Err(ImportError::UploadFileTooLarge {
file_size_in_mb,
Expand All @@ -299,16 +315,18 @@ async fn consume_task(
}

// Check if the task is expired
if let Err(err) = is_task_expired(task.created_at.unwrap(), task.last_process_at) {
if let Err(reason) = is_task_expired(task.created_at.unwrap(), task.last_process_at) {
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
handle_expired_task(
error!("[Import] {} task is expired: {}", task.workspace_id, reason);
handle_failed_task(
&mut context,
&import_record,
task,
stream_name,
group_name,
&entry_id,
&err,
ImportError::UploadFileExpire,
ImportTaskState::Expire,
)
.await?;
}
Expand Down Expand Up @@ -342,30 +360,28 @@ async fn consume_task(
}
}

async fn handle_expired_task(
#[allow(clippy::too_many_arguments)]
async fn handle_failed_task(
context: &mut TaskContext,
import_record: &AFImportTask,
task: &NotionImportTask,
stream_name: &str,
group_name: &str,
entry_id: &str,
reason: &str,
error: ImportError,
task_state: ImportTaskState,
) -> Result<(), ImportError> {
info!(
"[Import]: {} import is expired with reason:{}",
task.workspace_id, reason
"[Import]: {} import was failed with reason:{}",
task.workspace_id, error
);

update_import_task_status(
&import_record.task_id,
ImportTaskState::Expire,
&context.pg_pool,
)
.await
.map_err(|e| {
error!("Failed to update import task status: {:?}", e);
ImportError::Internal(e.into())
})?;
update_import_task_status(&import_record.task_id, task_state, &context.pg_pool)
.await
.map_err(|e| {
error!("Failed to update import task status: {:?}", e);
ImportError::Internal(e.into())
})?;
remove_workspace(&import_record.workspace_id, &context.pg_pool).await;
info!("[Import]: deleted workspace {}", task.workspace_id);

Expand All @@ -382,13 +398,7 @@ async fn handle_expired_task(
task.workspace_id, err
);
}
notify_user(
task,
Err(ImportError::UploadFileExpire),
context.notifier.clone(),
&context.metrics,
)
.await?;
notify_user(task, Err(error), context.notifier.clone(), &context.metrics).await?;
Ok(())
}

Expand Down

0 comments on commit c5cdb06

Please sign in to comment.