Skip to content

Commit

Permalink
Merge pull request #121 from traP-jp/feat/#120-local-jobapi
Browse files Browse the repository at this point in the history
✨ job api for local environment
  • Loading branch information
comavius authored Feb 18, 2025
2 parents 5257fef + 022728a commit 458f1bf
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 24 deletions.
5 changes: 0 additions & 5 deletions lib/judge_core/src/common.rs

This file was deleted.

39 changes: 32 additions & 7 deletions lib/judge_core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::common::ShellOutput;
use crate::identifiers::{ResourceId, RuntimeId};
use crate::identifiers::ResourceId;
use futures::future::Future;
use std::process::Output;
use tokio::sync::broadcast;

/// JobAPI is a set of shell environment and cache of outcome files of previous jobs.
Expand All @@ -16,20 +16,37 @@ pub trait JobApi<JobOutcome: Clone>: Clone {
&self,
job_conf: ExecutionJob<JobOutcome>,
priority: i32,
) -> impl Future<Output = impl Future<Output = Result<(JobOutcome, ShellOutput), ExecutionJobError>>>;
) -> impl Future<
Output = Result<
impl Future<Output = Result<ExecutionJobFinished<JobOutcome>, ExecutionJobError>>,
ExecutionJobPreparationError,
>,
>;

fn place_file(
&self,
job_conf: FilePlacementJob,
) -> impl Future<Output = Result<JobOutcome, FilePlacementJobError>>;
}

#[derive(Debug, Clone)]
pub enum ExecutionJobFinished<JobOutcome: Clone> {
/// Job finished successfully.
Succeeded(JobOutcome, Output),
/// Job failed expectedly.
FailedExpectedly((JobOutcome, Output)),
/// Preceding job failed expectedly.
PrecedingJobFailedExpectedly,
}

#[derive(Debug, Clone)]
pub enum JobOutcomeAcquisitionResult<JobOutcome: Clone> {
/// Received JobOutcome successfully.
Succeeded(JobOutcome),
/// Failed to receive JobOutcome.
Failed(String),
/// Failed to receive JobOutcome expectedly.
FailedExpectedly,
/// Failed to receive JobOutcome unexpectedly.
FailedUnexpectedly(String),
}

pub struct JobOutcomeLink<JobOutcome: Clone> {
Expand All @@ -42,10 +59,18 @@ pub struct ExecutionJob<JobOutcome: Clone> {
pub depends_on_with_names: Vec<JobOutcomeLink<JobOutcome>>,
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ExecutionJobPreparationError {
#[error("Internal error while preparing a job: {0}")]
InternalError(String),
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ExecutionJobError {
#[error("Internal error while running a job: {0}")]
InternalError(String),
#[error("Preceding job failed unexpectedly: {0}")]
PrecedingJobFailed(String),
}

pub enum FilePlacementJob {
Expand All @@ -58,8 +83,8 @@ pub enum FilePlacementJob {

#[derive(Debug, thiserror::Error)]
pub enum FilePlacementJobError {
#[error("Invalid file id: {0}")]
InvalidFileId(RuntimeId),
#[error("Invalid resource id: {0}")]
InvalidResourceId(ResourceId),
#[error("Internal error while placing a file: {0}")]
InternalError(String),
}
1 change: 0 additions & 1 deletion lib/judge_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod common;
pub mod identifiers;
pub mod job;
pub mod problem_registry;
Expand Down
13 changes: 12 additions & 1 deletion lib/judge_core/src/problem_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,16 @@ pub trait ProblemRegistryServer {

/// ProblemRegistryClient fetches contents of problems from the registry in judge server.
pub trait ProblemRegistryClient {
fn fetch(&self, resource_id: ResourceId) -> impl Future<Output = Result<String>>;
fn fetch(
&self,
resource_id: ResourceId,
) -> impl Future<Output = Result<String, ResourceFetchError>>;
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ResourceFetchError {
#[error("Failed to fetch resource with error: {0}")]
FetchFailed(String),
#[error("Resource {0} not found")]
NotFound(ResourceId),
}
50 changes: 42 additions & 8 deletions lib/judge_core/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{
common::ShellOutput,
identifiers::RuntimeId,
job::{
self, ExecutionJob, FilePlacementJob, JobApi, JobOutcomeAcquisitionResult, JobOutcomeLink,
self, ExecutionJob, ExecutionJobFinished, FilePlacementJob, JobApi,
JobOutcomeAcquisitionResult, JobOutcomeLink,
},
procedure::runtime::Procedure,
};
use futures::{future::join_all, join, Future};
use std::collections::HashMap;
use std::process::Output;
use tokio::sync::broadcast;

pub struct Runner<JobOutcome: Clone, JobApiType: JobApi<JobOutcome>> {
Expand All @@ -16,7 +17,8 @@ pub struct Runner<JobOutcome: Clone, JobApiType: JobApi<JobOutcome>> {
}

pub enum ExecutionJobOutput {
Succeeded(ShellOutput),
Succeeded(Output),
FailedExpectedly(Output),
EarlyExit,
}

Expand Down Expand Up @@ -215,7 +217,11 @@ impl<JobOutcomeType: Clone, JobApiType: JobApi<JobOutcomeType>> Runner<JobOutcom
job_id
)))?;
// Job API run future
let run_future = self.job_api.run_future(job_conf, *priority).await;
let run_future = self
.job_api
.run_future(job_conf, *priority)
.await
.map_err(|e| RunnerRunError::InternalError(e.to_string()))?;
// Whole execution job future
let job_future = Self::run_execution_job(run_future, job_outcome_tx);
execution_job_futures.insert(job_id, job_future);
Expand Down Expand Up @@ -269,7 +275,9 @@ impl<JobOutcomeType: Clone, JobApiType: JobApi<JobOutcomeType>> Runner<JobOutcom
}
Err(e) => {
outcome_broadcast_tx
.send(JobOutcomeAcquisitionResult::Failed(e.to_string()))
.send(JobOutcomeAcquisitionResult::FailedUnexpectedly(
e.to_string(),
))
.map_err(|e| {
RunnerRunError::InternalError(format!(
"Error while sending a job outcome: {}",
Expand All @@ -282,12 +290,14 @@ impl<JobOutcomeType: Clone, JobApiType: JobApi<JobOutcomeType>> Runner<JobOutcom
}

async fn run_execution_job(
run_future: impl Future<Output = Result<(JobOutcomeType, ShellOutput), job::ExecutionJobError>>,
run_future: impl Future<
Output = Result<ExecutionJobFinished<JobOutcomeType>, job::ExecutionJobError>,
>,
outcome_broadcast_tx: broadcast::Sender<JobOutcomeAcquisitionResult<JobOutcomeType>>,
) -> Result<ExecutionJobOutput, RunnerRunError> {
let run_result = run_future.await;
match run_result {
Ok((job_outcome, shell_output)) => {
Ok(ExecutionJobFinished::Succeeded(job_outcome, shell_output)) => {
outcome_broadcast_tx
.send(JobOutcomeAcquisitionResult::Succeeded(job_outcome))
.map_err(|e| {
Expand All @@ -298,9 +308,33 @@ impl<JobOutcomeType: Clone, JobApiType: JobApi<JobOutcomeType>> Runner<JobOutcom
})?;
Ok(ExecutionJobOutput::Succeeded(shell_output))
}
Ok(ExecutionJobFinished::PrecedingJobFailedExpectedly) => {
outcome_broadcast_tx
.send(JobOutcomeAcquisitionResult::FailedExpectedly)
.map_err(|e| {
RunnerRunError::InternalError(format!(
"Error while sending a job outcome: {}",
e
))
})?;
Ok(ExecutionJobOutput::EarlyExit)
}
Ok(ExecutionJobFinished::FailedExpectedly((_, shell_output))) => {
outcome_broadcast_tx
.send(JobOutcomeAcquisitionResult::FailedExpectedly)
.map_err(|e| {
RunnerRunError::InternalError(format!(
"Error while sending a job outcome: {}",
e
))
})?;
Ok(ExecutionJobOutput::FailedExpectedly(shell_output))
}
Err(e) => {
outcome_broadcast_tx
.send(JobOutcomeAcquisitionResult::Failed(e.to_string()))
.send(JobOutcomeAcquisitionResult::FailedUnexpectedly(
e.to_string(),
))
.map_err(|e| {
RunnerRunError::InternalError(format!(
"Error while sending a job outcome: {}",
Expand Down
6 changes: 5 additions & 1 deletion lib/local_jobapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ name = "local_jobapi"
version = "0.1.0"
edition = "2021"

[dependencies]
[dependencies]
futures = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
judge_core = { path = "../judge_core" }
31 changes: 31 additions & 0 deletions lib/local_jobapi/src/job_outcome.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::path::PathBuf;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct JobOutcome {
path: Arc<JobOutcomeInner>,
}

#[derive(Debug)]
struct JobOutcomeInner {
pub path: PathBuf,
}

impl JobOutcome {
pub fn new(path: PathBuf) -> Self {
Self {
path: Arc::new(JobOutcomeInner { path }),
}
}

pub(crate) fn path(&self) -> &PathBuf {
&self.path.path
}
}

impl Drop for JobOutcomeInner {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
let _ = std::fs::remove_dir_all(&self.path);
}
}
127 changes: 127 additions & 0 deletions lib/local_jobapi/src/jobapi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use super::job_outcome::JobOutcome;
use futures::Future;
use judge_core::*;
use std::path::PathBuf;
use uuid::Uuid;

#[derive(Debug, Clone)]
pub struct JobApi<ProblemRegistryClient: problem_registry::ProblemRegistryClient + Clone> {
temp_dir: PathBuf,
problem_registry_client: ProblemRegistryClient,
}

impl<ProblemRegistryClient: problem_registry::ProblemRegistryClient + Clone>
JobApi<ProblemRegistryClient>
{
pub fn new(
temp_dir: PathBuf,
problem_registry_client: ProblemRegistryClient,
) -> anyhow::Result<Self> {
std::fs::create_dir_all(&temp_dir).map_err(|e| anyhow::anyhow!(e.to_string()))?;
Ok(Self {
temp_dir,
problem_registry_client,
})
}

async fn run_future_internal(
job_conf: job::ExecutionJob<JobOutcome>,
src_dir: JobOutcome,
script_file: JobOutcome,
) -> Result<job::ExecutionJobFinished<JobOutcome>, job::ExecutionJobError> {
// set up environment variables
let mut envvars = std::collections::HashMap::new();
envvars.insert(
"SRC".to_string(),
src_dir.path().to_string_lossy().to_string(),
);
envvars.insert(
"SCRIPT".to_string(),
script_file.path().to_string_lossy().to_string(),
);
for mut dep in job_conf.depends_on_with_names {
let dep_outcome = match dep.job_outcome_rx.recv().await {
Ok(job::JobOutcomeAcquisitionResult::Succeeded(outcome)) => outcome,
Ok(job::JobOutcomeAcquisitionResult::FailedExpectedly) => {
return Ok(job::ExecutionJobFinished::PrecedingJobFailedExpectedly);
}
Ok(job::JobOutcomeAcquisitionResult::FailedUnexpectedly(err_message)) => {
return Err(job::ExecutionJobError::InternalError(err_message));
}
Err(e) => {
return Err(job::ExecutionJobError::InternalError(e.to_string()));
}
};
envvars.insert(
dep.envvar_name,
dep_outcome.path().to_string_lossy().to_string(),
);
}
let output = std::process::Command::new(&script_file.path())
.output()
.map_err(|e| job::ExecutionJobError::InternalError(e.to_string()))?;
Ok(job::ExecutionJobFinished::Succeeded(src_dir, output))
}
}

impl<ProblemRegistryClient: problem_registry::ProblemRegistryClient + Clone> job::JobApi<JobOutcome>
for JobApi<ProblemRegistryClient>
{
async fn place_file(
&self,
file: job::FilePlacementJob,
) -> Result<JobOutcome, job::FilePlacementJobError> {
let path = self.temp_dir.join(Uuid::new_v4().to_string());
match file {
job::FilePlacementJob::PlaceEmptyDirectory => {
std::fs::create_dir(&path)
.map_err(|e| job::FilePlacementJobError::InternalError(e.to_string()))?;
}
job::FilePlacementJob::PlaceRuntimeTextFile(content) => {
std::fs::write(&path, content)
.map_err(|e| job::FilePlacementJobError::InternalError(e.to_string()))?;
}
job::FilePlacementJob::PlaceTextFile(resource_id) => {
let content = self
.problem_registry_client
.fetch(resource_id)
.await
.map_err(|e| match e {
problem_registry::ResourceFetchError::FetchFailed(err_message) => {
job::FilePlacementJobError::InternalError(err_message)
}
problem_registry::ResourceFetchError::NotFound(resource_id) => {
job::FilePlacementJobError::InvalidResourceId(resource_id)
}
})?;
std::fs::write(&path, content)
.map_err(|e| job::FilePlacementJobError::InternalError(e.to_string()))?;
}
}
Ok(JobOutcome::new(path))
}

async fn run_future(
&self,
job_conf: job::ExecutionJob<JobOutcome>,
_: i32,
) -> Result<
impl Future<Output = Result<job::ExecutionJobFinished<JobOutcome>, job::ExecutionJobError>>,
job::ExecutionJobPreparationError,
> {
// prepare files
let src_outcome = self
.place_file(job::FilePlacementJob::PlaceEmptyDirectory)
.await
.map_err(|e| job::ExecutionJobPreparationError::InternalError(e.to_string()))?;
let script_outcome = self
.place_file(job::FilePlacementJob::PlaceRuntimeTextFile(
job_conf.script.clone(),
))
.await
.map_err(|e| job::ExecutionJobPreparationError::InternalError(e.to_string()))?;

let future = Self::run_future_internal(job_conf, src_outcome, script_outcome);
Ok(future)
}
}
3 changes: 2 additions & 1 deletion lib/local_jobapi/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@

pub mod job_outcome;
pub mod jobapi;

0 comments on commit 458f1bf

Please sign in to comment.