diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e425057 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,9 @@ +# OS +Thumbs.db +.DS_Store + +# Editors +.vs/ +.vscode/ +.idea/ +.fleet/ diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml new file mode 100644 index 0000000..73d670d --- /dev/null +++ b/.github/dependabot.yaml @@ -0,0 +1,10 @@ +version: 2 +updates: + + - package-ecosystem: "cargo" + directory: "/" + schedule: + interval: "weekly" + timezone: "Europe/Warsaw" + day: "friday" + time: "18:00" diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..9aafe34 --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,47 @@ +name: ci & cd + +on: + push: + branches: + - "main" + tags: + - "v*.*.*" + pull_request: + +jobs: + build: + runs-on: ubuntu-22.04 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Run Cargo:fmt + run: cargo +nightly fmt --all -- --check + + - name: Run Cargo:clippy + run: cargo clippy --all-features -- -D warnings + + - name: Run Cargo:test + run: cargo test --verbose --all-features + + publish: + runs-on: ubuntu-22.04 + if: github.event_name == 'push' + steps: + - name: Check out + uses: actions/checkout@v3 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Publish + run: cargo publish --token ${CRATES_TOKEN} + env: + CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }} diff --git a/.gitignore b/.gitignore index d01bd1a..d0768bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,21 +1,35 @@ -# Generated by Cargo -# will have compiled files and executables +# OS +Thumbs.db +.DS_Store +.ignore*/ + +# Editors +.vs/ +.vscode/ +.idea/ +.fleet/ + +# Lang: Rust debug/ target/ - -# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries -# More information here 
https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html Cargo.lock - -# These are backup files generated by rustfmt **/*.rs.bk - -# MSVC Windows builds of rustc generate these, which store debugging information *.pdb -# RustRover -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +# Output +dist/ +output/ +build/ + +# Environment +env/ +.env +.env* + +# Logs +logs/ +*.log +*.log* + +# Generated +/crates/schema/generated/ diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..746dbcb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,64 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[workspace] +resolver = "2" +members = [ + "./crates/cli", + "./crates/client", + "./crates/jsvm", + "./crates/schema", + "./crates/server", +] + +[workspace.package] +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +publish = true + +authors = ["Axiston "] +repository = "https://github.com/axiston/runtime" +homepage = "https://github.com/axiston/runtime" +documentation = "https://docs.rs/axiston" + +[workspace.dependencies] +axiston-rt-jsvm = { path = "./crates/jsvm", version = "0.1.0" } +axiston-rt-schema = { path = "./crates/schema", version = "0.1.0" } +axiston-rt-server = { path = "./crates/server", version = "0.1.0" } + +clap = { version = "4.5", features = ["derive"] } +tokio = { version = "1.43", features = ["macros", "rt-multi-thread", "signal"] } +deadpool = { version = "0.12", features = ["managed", "rt_tokio_1"] } +tokio-stream = { version = "0.1", features = [] } +futures = { version = "0.3", features = [] } +http = { version = "1.2", features = [] } +pin-project-lite = { version = 
"0.2", features = [] } +tracing = { version = "0.1", features = [] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tower = { version = "0.5", features = ["full"] } +tower-http = { version = "0.6", features = ["full"] } +thiserror = { version = "2.0", features = [] } +anyhow = { version = "1.0", features = ["backtrace"] } + +derive_more = { version = "2.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_with = { version = "3.12", features = [] } +serde_toml = { package = "toml", version = "0.8", features = [] } +serde_json = { version = "1.0", features = [] } + +ecow = { version = "0.2", features = ["serde"] } +time = { version = "0.3", features = ["serde"] } +uuid = { version = "1.12", features = ["serde", "v4", "v7"] } +bytes = { version = "1.9", features = ["serde"] } +hashbrown = { version = "0.15", features = ["serde"] } +petgraph = { version = "0.7", features = [] } +cron = { version = "0.15", features = ["serde"] } +semver = { version = "1.0", features = ["serde"] } +jsonschema = { version = "0.28", features = [] } + +tonic = { version = "0.12", features = [] } +prost = { version = "0.13", features = [] } +tonic-types = { version = "0.12", features = [] } +prost-types = { version = "0.13", features = [] } +tonic-build = { version = "0.12", features = [] } +prost-build = { version = "0.13", features = [] } diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c7b1d2b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,47 @@ +# Stage 1: Build. +FROM rust:1.84 AS build +WORKDIR /usr/src/app/ + +RUN apt-get update +RUN apt-get install -y curl build-essential + +# Cache dependencies by copying only Cargo.* files. +COPY Cargo.toml Cargo.lock? 
./ +COPY crates/cli/Cargo.toml crates/cli/ +COPY crates/jsvm/Cargo.toml crates/jsvm/ +COPY crates/schema/Cargo.toml crates/schema/ +COPY crates/client/Cargo.toml crates/client/ +COPY crates/server/Cargo.toml crates/server/ + +# Create a dummy files to allow dependency resolution. +RUN mkdir -p crates/cli crates/jsvm crates/schema +RUN mkdir -p crates/client crates/server +RUN echo "fn main() {}" > crates/cli/main.rs +RUN echo "" > crates/cli/lib.rs +RUN echo "" > crates/jsvm/lib.rs +RUN echo "" > crates/schema/lib.rs +RUN echo "" > crates/client/lib.rs +RUN echo "" > crates/server/lib.rs + +# Pre-build dependencies to cache them. +RUN cargo build --release --workspace + +# Copy the source code and build the final binaries. +COPY crates ./crates +RUN cargo build --release --workspace + +# Stage 2: Runtime. +FROM debian:bookworm-slim AS runtime +WORKDIR /usr/src/bin/ + +# Copy the built binary from the previous stage. +COPY --from=build /usr/src/app/target/release/cli /usr/src/bin/cli + +# Ensure the binary is an executable. +RUN chmod +x /usr/src/bin/cli + +# Expose the port the server runs on. +EXPOSE 8080 + +# Set the default command to run the server. +CMD ["/usr/src/bin/cli", "--port", "8080"] diff --git a/LICENSE b/LICENSE.txt similarity index 97% rename from LICENSE rename to LICENSE.txt index c0cdc68..568f639 100644 --- a/LICENSE +++ b/LICENSE.txt @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2024 axiston +Copyright (c) 2024 Axiston Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..019e95c --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +# Makefile for client & server GRPC Generation. +# https://github.com/hyperium/tonic + +# Environment Variables +SCHEMA_OUTPUT = ./crates/schema/generated/ + +.PHONY: clean +clean: ## Deletes the output directory. 
+ $(call print-info, "Cleaning project...") + rm -f $(SCHEMA_OUTPUT) + $(call print-success, "Project cleaned.") diff --git a/README.md b/README.md index 6a54eba..0735eea 100644 --- a/README.md +++ b/README.md @@ -1 +1,24 @@ -# runtime \ No newline at end of file +### axiston/runtime + +[![Build Status][action-badge]][action-url] +[![Crate Coverage][coverage-badge]][coverage-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[coverage-badge]: https://img.shields.io/codecov/c/github/axiston/runtime +[coverage-url]: https://app.codecov.io/gh/axiston/runtime + +A server application based on `Deno` runtime, capable of running `JavaScript`, +`TypeScript`, and native `Rust` tasks. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. + +#### Usage + +```cmd +runtime --port 8080 +``` diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml new file mode 100644 index 0000000..1074484 --- /dev/null +++ b/crates/cli/Cargo.toml @@ -0,0 +1,47 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[package] +name = "axiston-rt-cli" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +readme = "./README.md" + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[[bin]] +name = "axiston" +path = "main.rs" + +[features] +default = [] + +# - Enables the global tracer provider. 
+support-otel = [] + +[dependencies] +axiston-rt-schema = { workspace = true, features = ["server"] } +axiston-rt-server = { workspace = true, features = [] } + +clap = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +anyhow = { workspace = true } + +tonic = { workspace = true } +prost = { workspace = true } +tonic-types = { workspace = true } +prost-types = { workspace = true } + +serde = { workspace = true } +serde_toml = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/cli/README.md b/crates/cli/README.md new file mode 100644 index 0000000..da1673b --- /dev/null +++ b/crates/cli/README.md @@ -0,0 +1,20 @@ +### runtime/client + +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] +[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-rt-client.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-rt-client +[docs-badge]: https://img.shields.io/docsrs/axiston-rt-client?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-rt-client + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. diff --git a/crates/cli/config/from_json.rs b/crates/cli/config/from_json.rs new file mode 100644 index 0000000..4203f50 --- /dev/null +++ b/crates/cli/config/from_json.rs @@ -0,0 +1,14 @@ +use std::path::Path; + +use crate::config::Args; + +/// - Reads the entire contents of a file and deserializes an instance of [`Args`]. +/// +/// # Errors +/// +/// - See [std::fs::read] and [`serde_json::from_slice`] documentation for details. 
+pub fn load_json(path: impl AsRef) -> anyhow::Result { + let file_content = std::fs::read(path)?; + let parsed_args = serde_json::from_slice(&file_content)?; + Ok(parsed_args) +} diff --git a/crates/cli/config/from_toml.rs b/crates/cli/config/from_toml.rs new file mode 100644 index 0000000..5ee27f1 --- /dev/null +++ b/crates/cli/config/from_toml.rs @@ -0,0 +1,14 @@ +use std::path::Path; + +use crate::config::Args; + +/// - Reads the entire contents of a file and deserializes an instance of [`Args`]. +/// +/// # Errors +/// +/// - See [std::fs::read_to_string] and [`serde_toml::from_str`] documentation for details. +pub fn load_toml(path: impl AsRef) -> anyhow::Result { + let file_content = std::fs::read_to_string(path)?; + let parsed_args = serde_toml::from_str(&file_content)?; + Ok(parsed_args) +} diff --git a/crates/cli/config/mod.rs b/crates/cli/config/mod.rs new file mode 100644 index 0000000..e7e9e84 --- /dev/null +++ b/crates/cli/config/mod.rs @@ -0,0 +1,53 @@ +//! Loads and parses configuration files. + +mod from_json; +mod from_toml; + +use std::ffi::OsStr; +use std::path::PathBuf; + +use axiston_rt_server::service::ServiceConfig; +use clap::Parser; +use serde::{Deserialize, Serialize}; + +use crate::config::from_json::load_json; +use crate::config::from_toml::load_toml; +use crate::server::ServerConfig; + +/// Command-line arguments. +#[derive(Debug, Clone, Default, Serialize, Deserialize, Parser)] +#[must_use = "config does nothing unless you use it"] +pub struct Args { + #[command(flatten)] + pub server: ServerConfig, + #[command(flatten)] + pub service: ServiceConfig, +} + +/// Commands for the CLI. +#[derive(Debug, Clone, Parser)] +#[must_use = "configs do nothing unless you use them"] +pub struct Cli { + /// Provide configuration via command-line flags. + #[command(flatten)] + pub args: Args, + + /// Provide configuration via a configuration file. 
+ #[arg(short, long, value_name = "FILE")] + pub config: Option, +} + +impl Args { + /// Parses the provided configuration via command-line flags or a configuration file. + pub fn try_parse_with_files() -> anyhow::Result { + let cli = Cli::parse(); + match cli.config { + None => Ok(cli.args), + Some(path) => match path.extension() { + Some(ext) if OsStr::new("toml") == ext => load_toml(path), + Some(ext) if OsStr::new("json") == ext => load_json(path), + _ => Err(anyhow::anyhow!("should specify a supported file extension")), + }, + } + } +} diff --git a/crates/cli/main.rs b/crates/cli/main.rs new file mode 100644 index 0000000..4a3e96f --- /dev/null +++ b/crates/cli/main.rs @@ -0,0 +1,32 @@ +#![forbid(unsafe_code)] + +use axiston_rt_server::handler::{InstanceService, RegistryService}; +use axiston_rt_server::service::{RouterExt, ServiceState}; + +use crate::config::Args; +use crate::middleware::initialize_tracing; +use crate::server::run_supported_server; + +mod config; +mod middleware; +mod server; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::try_parse_with_files()?; + initialize_tracing().await?; + + // Service. + let state = ServiceState::new(args.service); + + let instance = InstanceService::new(state.clone()); + let instance = instance.into_server(); + + let registry = RegistryService::new(state); + let registry = registry.into_server(); + + // Listen. 
+ run_supported_server(args.server, instance, registry).await?; + + Ok(()) +} diff --git a/crates/cli/middleware/mod.rs b/crates/cli/middleware/mod.rs new file mode 100644 index 0000000..84c34a1 --- /dev/null +++ b/crates/cli/middleware/mod.rs @@ -0,0 +1,40 @@ +#[must_use] +fn build_env_filter() -> tracing_subscriber::EnvFilter { + let current = std::env::var("RUST_LOG") + .or_else(|_| std::env::var("OTEL_LOG_LEVEL")) + .unwrap_or_else(|_| "info".to_string()); + + let env = format!("{},server=trace,otel=debug,tower_http=debug", current); + std::env::set_var("RUST_LOG", env); + tracing_subscriber::EnvFilter::from_default_env() +} + +pub async fn initialize_tracing() -> anyhow::Result<()> { + use tracing_subscriber::fmt::layer; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + // Setups a temporary subscriber to log output during setup. + let env_filter = build_env_filter(); + let fmt_layer = layer().pretty(); + let subscriber = tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer); + + let _guard = tracing::subscriber::set_default(subscriber); + tracing::trace!(target: "server:otel", "initialized temporary subscriber"); + + // TODO: Enable OpenTelemetry. + // https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk + + // Setups an actual subscriber. + let env_filter = build_env_filter(); + let fmt_layer = layer().pretty(); + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + + tracing::trace!(target: "server:otel", "initialized subscriber"); + Ok(()) +} diff --git a/crates/cli/server/config.rs b/crates/cli/server/config.rs new file mode 100644 index 0000000..2e00deb --- /dev/null +++ b/crates/cli/server/config.rs @@ -0,0 +1,34 @@ +use clap::Args; +use serde::{Deserialize, Serialize}; + +/// App [`server`] configuration. 
+/// +/// [`server`]: crate::server +#[derive(Debug, Clone, Serialize, Deserialize, Args)] +#[must_use = "config does nothing unless you use it"] +pub struct ServerConfig { + /// Port exposed by the server. + #[arg(short, long, default_value_t = 3000)] + pub port: u16, + + /// Server shutdown timeout (in seconds). + #[arg(short, long, default_value_t = 8)] + pub shutdown_timeout: u64, +} + +impl ServerConfig { + /// Returns a new [`ServerConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + port: 3000, + shutdown_timeout: 8, + } + } +} diff --git a/crates/cli/server/mod.rs b/crates/cli/server/mod.rs new file mode 100644 index 0000000..463b66c --- /dev/null +++ b/crates/cli/server/mod.rs @@ -0,0 +1,67 @@ +//! Contains a GRPC server and its utilities. + +mod config; +mod signal; + +use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; + +use axiston_rt_schema::instance::instance_server::{Instance, InstanceServer}; +use axiston_rt_schema::registry::registry_server::{Registry, RegistryServer}; +use tonic::transport::Server; + +pub use crate::server::config::ServerConfig; +use crate::server::signal::shutdown_signal; + +/// Runs the supported server. 
+pub async fn run_supported_server( + server_config: ServerConfig, + instance_server: InstanceServer, + registry_server: RegistryServer, +) -> anyhow::Result<()> +where + T1: Instance, + T2: Registry, +{ + let shutdown_timeout = Duration::from_secs(server_config.shutdown_timeout); + let fut = shutdown_signal(shutdown_timeout); + + let server_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, server_config.port)); + + tracing::debug!( + target: "server:setup", port = server_config.port, + "runtime server is listening on {}", server_addr, + ); + + Server::builder() + .add_service(instance_server) + .add_service(registry_server) + .serve_with_shutdown(server_addr, fut) + .await?; + + Ok(()) +} + +#[cfg(test)] +mod test { + use axiston_rt_server::handler::{InstanceService, RegistryService, Result}; + use axiston_rt_server::service::ServiceState; + + use crate::config::Args; + use crate::server::run_supported_server; + + #[test] + fn run_server() -> Result<()> { + let args = Args::default(); + let state = ServiceState::new(args.service); + + let instance = InstanceService::new(state.clone()); + let instance = instance.into_server(); + + let registry = RegistryService::new(state); + let registry = registry.into_server(); + + let _ = run_supported_server(args.server, instance, registry); + Ok(()) + } +} diff --git a/crates/cli/server/signal.rs b/crates/cli/server/signal.rs new file mode 100644 index 0000000..b03ecdf --- /dev/null +++ b/crates/cli/server/signal.rs @@ -0,0 +1,71 @@ +use std::time::{Duration, Instant}; + +use tokio::signal::ctrl_c; +#[cfg(unix)] +use tokio::signal::unix; + +/// Completes once the terminate signal is received. +/// +/// See [`ctrl_c`] and [`unix::SignalKind::terminate`]. 
+pub async fn shutdown_signal(timeout: Duration) { + let ctrl_c = async { + ctrl_c().await.expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + unix::signal(unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + let t0 = Instant::now(); + + tracing::warn!( + target: "server:otel", timeout = timeout.as_millis(), + "global tracer provider is closing" + ); + + #[cfg(feature = "support-otel")] + let (tx, rx) = std::sync::mpsc::channel(); + #[cfg(feature = "support-otel")] + let _ = std::thread::spawn(move || { + // TODO: Setup opentelemetry. + // opentelemetry::global::shutdown_tracer_provider(); + tx.send(()).ok() + }); + + #[cfg(feature = "support-otel")] + if rx.recv_timeout(timeout).is_err() { + tracing::error!(target: "server:otel", timeout = timeout.as_millis(), + "global tracer provider failed to close" + ); + } + + let t1 = Instant::now().duration_since(t0); + tracing::warn!( + target: "server", timeout = timeout.as_millis(), + waiting = t1.as_millis(), "server is terminating" + ); +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use crate::server::signal::shutdown_signal; + + #[test] + fn create_shutdown_signal() { + let _ = shutdown_signal(Duration::default()); + } +} diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml new file mode 100644 index 0000000..2075363 --- /dev/null +++ b/crates/client/Cargo.toml @@ -0,0 +1,41 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[package] +name = "axiston-rt-client" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +readme = "./README.md" + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } 
+ +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lib] +path = "lib.rs" + +[dependencies] +axiston-rt-schema = { workspace = true, features = ["client"] } + +tokio = { workspace = true } +deadpool = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +tonic = { workspace = true } +prost = { workspace = true } +tonic-types = { workspace = true } +prost-types = { workspace = true } + +tower = { workspace = true } +http = { workspace = true } +derive_more = { workspace = true } +serde = { workspace = true } +uuid = { workspace = true } +time = { workspace = true } diff --git a/crates/client/README.md b/crates/client/README.md new file mode 100644 index 0000000..da1673b --- /dev/null +++ b/crates/client/README.md @@ -0,0 +1,20 @@ +### runtime/client + +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] +[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-rt-client.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-rt-client +[docs-badge]: https://img.shields.io/docsrs/axiston-rt-client?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-rt-client + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. diff --git a/crates/client/config/custom_hooks.rs b/crates/client/config/custom_hooks.rs new file mode 100644 index 0000000..c3b1c97 --- /dev/null +++ b/crates/client/config/custom_hooks.rs @@ -0,0 +1,39 @@ +use deadpool::managed::{HookResult, Metrics}; + +use crate::manager::{RuntimeClient, RuntimePoolError}; + +/// Custom hook called after a new connection has been established. 
+/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_create(_conn: &mut RuntimeClient, _metrics: &Metrics) -> HookResult { + tracing::trace!(target: "runtime", "post_create"); + + // Note: should never return an error. + Ok(()) +} + +/// Custom hook called before a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn pre_recycle(_conn: &mut RuntimeClient, _metrics: &Metrics) -> HookResult { + tracing::trace!(target: "runtime", "pre_recycle"); + + // Note: should never return an error. + Ok(()) +} + +/// Custom hook called after a connection has been recycled. +/// +/// See [`PoolBuilder`] for more details. +/// +/// [`PoolBuilder`]: deadpool::managed::PoolBuilder +pub fn post_recycle(_conn: &mut RuntimeClient, _metrics: &Metrics) -> HookResult { + tracing::trace!(target: "runtime", "post_recycle"); + + // Note: should never return an error. + Ok(()) +} diff --git a/crates/client/config/mod.rs b/crates/client/config/mod.rs new file mode 100644 index 0000000..7868fa8 --- /dev/null +++ b/crates/client/config/mod.rs @@ -0,0 +1,83 @@ +use std::fmt; + +use deadpool::managed::{Hook, Object, Pool}; +use tonic::transport::Endpoint; + +use crate::config::custom_hooks::{post_create, post_recycle, pre_recycle}; +pub use crate::config::pool_config::RuntimeConfig; +use crate::manager::{RuntimeManager, RuntimeManagerConfig}; +use crate::RuntimeResult; + +mod custom_hooks; +mod pool_config; + +/// Asynchronous `runtime` connection pool. +/// +/// - Implemented with [`tonic`] and [`deadpool`]. +/// - Includes predefined create/recycle hooks. +/// - Emits traces on lifecycle events. +/// - Uses [`RuntimeConfig`] for configuration. +#[derive(Clone)] +pub struct Runtime { + conn: Pool, +} + +impl Runtime { + /// Returns a new [`Runtime`]. 
+ pub fn new(endpoints: impl Iterator, config: RuntimeConfig) -> Self { + let manager_config = RuntimeManagerConfig::new().recycling_method(config.recycling_method); + + let manager = RuntimeManager::new(endpoints, manager_config); + let pool = Pool::builder(manager) + .max_size(config.max_conn.unwrap_or(64)) + .create_timeout(config.create_timeout) + .wait_timeout(config.wait_timeout) + .recycle_timeout(config.recycle_timeout) + .post_create(Hook::sync_fn(post_create)) + .pre_recycle(Hook::sync_fn(pre_recycle)) + .post_recycle(Hook::sync_fn(post_recycle)) + .runtime(deadpool::Runtime::Tokio1); + + let pool = pool.build().expect("should not require runtime"); + Self { conn: pool } + } + + /// Retrieves a connection from this pool or waits for one to become available. + pub async fn get_connection(&self) -> RuntimeResult> { + self.conn.get().await.map_err(Into::into) + } +} + +impl Default for Runtime { + fn default() -> Self { + let endpoints = Vec::new().into_iter(); + Self::new(endpoints, RuntimeConfig::new()) + } +} + +impl fmt::Debug for Runtime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let status = self.conn.status(); + let is_closed = self.conn.is_closed(); + f.debug_struct("Runtime") + .field("size", &status.size) + .field("max_size", &status.max_size) + .field("available", &status.available) + .field("waiting", &status.waiting) + .field("is_closed", &is_closed) + .finish() + } +} + +#[cfg(test)] +mod test { + use crate::{Runtime, RuntimeConfig, RuntimeResult}; + + #[test] + fn build_default_runtime() -> RuntimeResult<()> { + let endpoints = Vec::new().into_iter(); + let config = RuntimeConfig::new(); + let _runtime = Runtime::new(endpoints, config); + Ok(()) + } +} diff --git a/crates/client/config/pool_config.rs b/crates/client/config/pool_config.rs new file mode 100644 index 0000000..6d9df0a --- /dev/null +++ b/crates/client/config/pool_config.rs @@ -0,0 +1,67 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + 
+use crate::manager::RecyclingMethod; + +/// Configures [`Runtime`] for one or more runtimes. +/// +/// [`Runtime`]: crate::Runtime +#[derive(Debug, Default, Serialize, Deserialize)] +#[must_use = "configs do nothing unless you use them"] +pub struct RuntimeConfig { + pub(crate) max_conn: Option, + pub(crate) create_timeout: Option, + pub(crate) wait_timeout: Option, + pub(crate) recycle_timeout: Option, + pub(crate) recycling_method: RecyclingMethod, +} + +impl RuntimeConfig { + /// Creates a new [`RuntimeConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Overwrites the default value of [`RuntimeConfig`]`::max_conn`. + pub fn with_max_conn(mut self, max_conn: usize) -> Self { + self.max_conn = Some(max_conn); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::create_timeout`. + pub fn with_create_timeout(mut self, create_timeout: Duration) -> Self { + self.create_timeout = Some(create_timeout); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::wait_timeout`. + pub fn with_wait_timeout(mut self, wait_timeout: Duration) -> Self { + self.wait_timeout = Some(wait_timeout); + self + } + + /// Overwrites the default value of [`RuntimeConfig`]`::recycle_timeout`. + pub fn with_recycle_timeout(mut self, recycle_timeout: Duration) -> Self { + self.recycle_timeout = Some(recycle_timeout); + self + } + + /// Overrides the value of [`RuntimeConfig`]`::recycling_method`. 
+ pub fn with_recycling_method(mut self, recycling_method: RecyclingMethod) -> Self { + self.recycling_method = recycling_method; + self + } +} + +#[cfg(test)] +mod test { + use crate::{RuntimeConfig, RuntimeResult}; + + #[test] + fn build_default_settings() -> RuntimeResult<()> { + let _config = RuntimeConfig::new(); + Ok(()) + } +} diff --git a/crates/client/lib.rs b/crates/client/lib.rs new file mode 100644 index 0000000..bd6b5b4 --- /dev/null +++ b/crates/client/lib.rs @@ -0,0 +1,88 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc = include_str!("./README.md")] + +//! ### Examples +//! +//! ```rust +//! use axiston_rt_client::{Runtime, RuntimeResult, RuntimeConfig}; +//! +//! +//! #[tokio::main] +//! async fn main() -> RuntimeResult<()> { +//! let addr = "https://example.com/"; +//! let endpoint = RuntimeEndpoint::from_bytes(addr.into())?; +//! +//! let config = RuntimeConfig::new(); +//! let runtime = Runtime::new(config); +//! runtime.register_endpoint(endpoint).await?; +//! let _conn = runtime.get_connection().await?; +//! +//! Ok(()) +//! } +//! ``` + +use deadpool::managed::PoolError; + +pub use crate::config::{Runtime, RuntimeConfig}; +pub use crate::manager::RecyclingMethod; +use crate::manager::RuntimePoolError; + +mod config; +mod manager; +mod middleware; + +/// Unrecoverable failure of the [`Runtime`]. +/// +/// Includes all error types that may occur. +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +#[must_use = "errors do nothing unless you use them"] +pub enum RuntimeError { + /// [`deadpool::managed::PoolError::Timeout`]. + #[error("timeout happened")] + Timout(deadpool::managed::TimeoutType), + + /// Runtime: Connection pool has no endpoints. + #[error("runtime: connection pool has no endpoints")] + NoEndpoints, + + ///[`tonic::transport::Error`]. + #[error("runtime: transport failure: {0}")] + Transport(#[from] tonic::transport::Error), + + /// GRPC server failure. 
+ #[error("transport failure: {0}")] + Status(#[from] tonic::Status), + + // TODO: Do i even need EndpointsLimit + /// Runtime: All endpoints have reached the limit. + #[error("runtime: all endpoints have reached the limit")] + EndpointsLimit, +} + +impl From for RuntimeError { + fn from(runtime_connection_error: RuntimePoolError) -> Self { + match runtime_connection_error { + RuntimePoolError::Transport(transport_failure) => Self::Transport(transport_failure), + RuntimePoolError::Status(server_status) => Self::Status(server_status), + RuntimePoolError::EndpointsLimit => Self::EndpointsLimit, + RuntimePoolError::NoEndpoints => Self::NoEndpoints, + } + } +} + +impl From> for RuntimeError { + fn from(value: PoolError) -> Self { + match value { + PoolError::Timeout(timeout_type) => Self::Timout(timeout_type), + PoolError::Backend(backend_error) => backend_error.into(), + PoolError::Closed => unreachable!(), + PoolError::NoRuntimeSpecified => unreachable!(), + PoolError::PostCreateHook(_) => unreachable!(), + } + } +} + +/// Specialized [`Result`] alias for the [`RuntimeError`] type. +pub type RuntimeResult = Result; diff --git a/crates/client/manager/client.rs b/crates/client/manager/client.rs new file mode 100644 index 0000000..58e9d4a --- /dev/null +++ b/crates/client/manager/client.rs @@ -0,0 +1,65 @@ +use std::fmt; + +use axiston_rt_schema::instance::instance_client::InstanceClient; +use axiston_rt_schema::registry::registry_client::RegistryClient; +use tonic::transport::Endpoint; +use uuid::Uuid; + +use crate::manager::RuntimePoolResult; +use crate::middleware::RuntimeChannel; + +/// Represents a client for interacting with runtime services. +/// +/// The `RuntimeClient` is responsible for managing communication with instance +/// and registry services, identified by a unique endpoint ID. It wraps generated +/// gRPC clients for both instance and registry operations, providing a cohesive +/// interface for runtime service interactions. 
+#[derive(Clone)] +pub struct RuntimeClient { + pub(crate) endpoint_id: Uuid, + pub(crate) instance_client: InstanceClient, + pub(crate) registry_client: RegistryClient, +} + +impl RuntimeClient { + /// Returns a new [`RuntimeClient`]. + #[inline] + pub fn new(id: Uuid, channel: RuntimeChannel) -> Self { + Self { + endpoint_id: id, + instance_client: InstanceClient::new(channel.clone()), + registry_client: RegistryClient::new(channel), + } + } + + /// Returns a new [`RuntimeClient`]. + pub async fn connect(id: Uuid, endpoint: Endpoint) -> RuntimePoolResult { + let channel = endpoint.connect().await?; + let channel = RuntimeChannel::new(channel); + Ok(Self::new(id, channel)) + } + + /// Returns the reference to the underlying unique endpoint identifier. + #[inline] + pub(crate) fn as_endpoint_id(&mut self) -> &Uuid { + &mut self.endpoint_id + } + + /// Returns the reference to the underlying (generated) instance client. + #[inline] + pub(crate) fn as_instance_client(&mut self) -> &mut InstanceClient { + &mut self.instance_client + } + + /// Returns the reference to the underlying (generated) registry client. + #[inline] + pub(crate) fn as_registry_client(&self) -> &RegistryClient { + &self.registry_client + } +} + +impl fmt::Debug for RuntimeClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeClient").finish_non_exhaustive() + } +} diff --git a/crates/client/manager/config.rs b/crates/client/manager/config.rs new file mode 100644 index 0000000..fe4f342 --- /dev/null +++ b/crates/client/manager/config.rs @@ -0,0 +1,60 @@ +use serde::{Deserialize, Serialize}; + +/// Configures `RuntimeManager` for one or more runtimes. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[must_use = "configs do nothing unless you use them"] +pub struct RuntimeManagerConfig { + /// Method of how a connection is recycled. + /// + /// See [`RecyclingMethod`]. 
+ pub recycling_method: RecyclingMethod, +} + +impl RuntimeManagerConfig { + /// Returns a new [`RuntimeManagerConfig`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Overrides the value of [`RuntimeManagerConfig`]`::recycling_method`. + pub fn recycling_method(mut self, recycling_method: RecyclingMethod) -> Self { + self.recycling_method = recycling_method; + self + } +} + +/// Possible methods of how a connection is recycled. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub enum RecyclingMethod { + /// Only check for open event bus when recycling existing connections. + /// + /// Unless you have special needs this is a safe choice. + #[default] + Fast, + + /// In addition to checking for open event bus a test query is executed. + /// + /// This is slower, but guarantees that the database connection is ready to be used. + Verified, +} + +impl RecyclingMethod { + /// Returns a new [`RecyclingMethod`]. + #[inline] + pub fn new() -> Self { + Self::default() + } +} + +#[cfg(test)] +mod test { + use crate::manager::RuntimeManagerConfig; + use crate::RuntimeResult; + + #[test] + fn build_default_settings() -> RuntimeResult<()> { + let _ = RuntimeManagerConfig::new(); + Ok(()) + } +} diff --git a/crates/client/manager/mod.rs b/crates/client/manager/mod.rs new file mode 100644 index 0000000..2f3c278 --- /dev/null +++ b/crates/client/manager/mod.rs @@ -0,0 +1,115 @@ +//! [`Manager`] of [`RuntimeClient`]s. + +mod client; +mod config; + +use std::fmt; + +use axiston_rt_schema::instance::GetStatusRequest; +use deadpool::managed::{Manager, Metrics, RecycleError, RecycleResult}; +use time::OffsetDateTime; +use tonic::transport::{Channel, Endpoint}; +use uuid::{NoContext, Timestamp, Uuid}; + +pub use crate::manager::client::RuntimeClient; +pub use crate::manager::config::{RecyclingMethod, RuntimeManagerConfig}; +use crate::middleware::RuntimeChannel; + +/// [`Manager`] of [`RuntimeClient`]s. 
+pub struct RuntimeManager { + recycling_method: RecyclingMethod, + runtime_channel: RuntimeChannel, +} + +impl RuntimeManager { + /// Returns a new [`RuntimeManager`]. + pub fn new(endpoints: impl Iterator, config: RuntimeManagerConfig) -> Self { + // TODO: Use Channel::balance_channel instead. + // TODO: Add methods to add/delete Endpoints. + let channel = Channel::balance_list(endpoints); + let channel = RuntimeChannel::new(channel); + + Self { + recycling_method: config.recycling_method, + runtime_channel: channel, + } + } +} + +impl fmt::Debug for RuntimeManager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeManager").finish_non_exhaustive() + } +} + +impl Manager for RuntimeManager { + type Type = RuntimeClient; + type Error = RuntimePoolError; + + async fn create(&self) -> Result { + let utc_datetime = OffsetDateTime::now_utc(); + let uuid_timestamp = Timestamp::from_unix( + NoContext, + utc_datetime.unix_timestamp() as u64, + utc_datetime.nanosecond(), + ); + + let channel = self.runtime_channel.clone(); + let client_id = Uuid::new_v7(uuid_timestamp); + Ok(RuntimeClient::new(client_id, channel)) + } + + async fn recycle( + &self, + conn: &mut Self::Type, + _metrics: &Metrics, + ) -> RecycleResult { + match self.recycling_method { + RecyclingMethod::Fast => return Ok(()), + RecyclingMethod::Verified => {} + } + + // TODO: Delete if it fails to verify. + let conn = conn.as_instance_client(); + let request = conn.get_status(GetStatusRequest { + verbose_metrics: Some(false), + force_latest: Some(false), + sliding_window: Some(0), + }); + + let _response = request + .await + .map_err(|status| RecycleError::Backend(status.into()))?; + + Ok(()) + } +} + +/// Unrecoverable failure of the [`RuntimeClient`]. +/// +/// Includes all error types that may occur. +/// Used to remap from [`PoolError`]. 
+/// +/// [`PoolError`]: deadpool::managed::PoolError +#[derive(Debug, thiserror::Error)] +#[must_use = "errors do nothing unless you use them"] +pub enum RuntimePoolError { + /// All endpoints have reached the limit. + #[error("all endpoints have reached the limit")] + EndpointsLimit, + + /// Connection pool has no endpoints. + #[error("connection pool has no endpoints")] + NoEndpoints, + + /// Transport failure (from the client or server). + #[error("transport failure: {0}")] + Transport(#[from] tonic::transport::Error), + + /// GRPC server failure. + #[error("transport failure: {0}")] + Status(#[from] tonic::Status), +} + +/// Specialized [`Result`] alias for the [`RuntimePoolError`] type. +pub type RuntimePoolResult = Result; diff --git a/crates/client/middleware/future.rs b/crates/client/middleware/future.rs new file mode 100644 index 0000000..bee282b --- /dev/null +++ b/crates/client/middleware/future.rs @@ -0,0 +1,43 @@ +//! [`Future`]s for the runtime [`Channel`]. +//! +//! [`Channel`]: crate::middleware::RuntimeChannel + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tonic::body::BoxBody; +use tonic::codegen::http::Response; +use tonic::transport::channel::ResponseFuture; +use tonic::transport::Error; + +/// Response [`Future`] for the runtime [`Channel`]. +/// +/// [`Channel`]: crate::middleware::RuntimeChannel +pub struct RuntimeResponseFuture { + inner: ResponseFuture, +} + +impl RuntimeResponseFuture { + /// Returns a new [`RuntimeResponseFuture`]. 
+ #[inline] + pub fn new(inner: ResponseFuture) -> Self { + Self { inner } + } +} + +impl Future for RuntimeResponseFuture { + type Output = Result, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner).poll(cx) + } +} + +impl fmt::Debug for RuntimeResponseFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeResponseFuture") + .finish_non_exhaustive() + } +} diff --git a/crates/client/middleware/mod.rs b/crates/client/middleware/mod.rs new file mode 100644 index 0000000..1017ce4 --- /dev/null +++ b/crates/client/middleware/mod.rs @@ -0,0 +1,52 @@ +//! [`Channel`] with [`tower`] middlewares. + +pub mod future; + +use std::fmt; +use std::task::{Context, Poll}; + +use derive_more::{Deref, DerefMut}; +use http::{Request, Response}; +use tonic::body::BoxBody; +use tonic::transport::{Channel, Error}; +use tower::Service; + +use crate::middleware::future::RuntimeResponseFuture; + +/// [`Channel`] with [`tower`] middlewares. +#[derive(Clone, Deref, DerefMut)] +pub struct RuntimeChannel { + inner: Channel, +} + +impl RuntimeChannel { + /// Returns a new [`RuntimeChannel`]. + #[inline] + pub fn new(inner: Channel) -> Self { + // TODO: Apply middlewares. 
+ Self { inner } + } +} + +impl Service> for RuntimeChannel { + type Response = Response; + type Error = Error; + type Future = RuntimeResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Service::poll_ready(&mut self.inner, cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = Service::call(&mut self.inner, req); + RuntimeResponseFuture::new(fut) + } +} + +impl fmt::Debug for RuntimeChannel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeChannel") + .field("channel", &self.inner) + .finish_non_exhaustive() + } +} diff --git a/crates/jsvm/Cargo.toml b/crates/jsvm/Cargo.toml new file mode 100644 index 0000000..a2e359d --- /dev/null +++ b/crates/jsvm/Cargo.toml @@ -0,0 +1,23 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[package] +name = "axiston-rt-jsvm" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +readme = "./README.md" + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lib] +path = "lib.rs" + +[dependencies] diff --git a/crates/jsvm/README.md b/crates/jsvm/README.md new file mode 100644 index 0000000..28d0e46 --- /dev/null +++ b/crates/jsvm/README.md @@ -0,0 +1,20 @@ +### runtime/jsvm + +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] +[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-rt-jsvm.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-rt-jsvm 
+[docs-badge]: https://img.shields.io/docsrs/axiston-rt-jsvm?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-rt-jsvm + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. diff --git a/crates/jsvm/extension/mod.rs b/crates/jsvm/extension/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/jsvm/extension/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/jsvm/lib.rs b/crates/jsvm/lib.rs new file mode 100644 index 0000000..206a3a6 --- /dev/null +++ b/crates/jsvm/lib.rs @@ -0,0 +1,7 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc = include_str!("./README.md")] + +mod extension; +mod runtime; +mod utility; diff --git a/crates/jsvm/runtime/mod.rs b/crates/jsvm/runtime/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/jsvm/runtime/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/jsvm/utility/hashing.rs b/crates/jsvm/utility/hashing.rs new file mode 100644 index 0000000..48a160f --- /dev/null +++ b/crates/jsvm/utility/hashing.rs @@ -0,0 +1,184 @@ +//! A utility for computing the SHA-256 hash of a directory. +//! +//! This module recursively traverses a directory, calculates hashes of all files, +//! and combines them to generate a single hash for the directory structure and contents. + +use std::fs::File; +use std::io::{self, Read}; +use std::path::Path; + +use sha2::{Digest, Sha256}; +use walkdir::WalkDir; + +/// Computes the [SHA-256] hash of a single file. +/// +/// See the [`OpenOptions::open`] method for more details. +/// +/// # Errors +/// +/// This function will return an error if `path` does not already exist. +/// Other errors may also be returned according to [`OpenOptions::open`]. 
+/// +/// # Examples +/// +/// ```rust,no_run +/// use axiston_rt_source::utils::io::hash_file; +/// +/// fn main() -> std::io::Result<()> { +/// let _ = hash_file("foo.txt")?; +/// Ok(()) +/// } +/// ``` +/// +/// [SHA-256]: https://en.wikipedia.org/wiki/SHA-2 +/// [`OpenOptions::open`]: std::fs::OpenOptions +pub fn hash_file(path: impl AsRef) -> io::Result> { + _hash_file_impl(path.as_ref()) +} + +/// Computes the [SHA-256] hash of a directory by traversing its contents. +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * The provided `path` doesn't exist. +/// * The process lacks permissions to view the contents. +/// * The `path` points at a non-directory file. +/// +/// # Examples +/// +/// ```rust,no_run +/// use axiston_rt_source::utils::io::hash_directory; +/// +/// fn main() -> std::io::Result<()> { +/// let _ = hash_directory("./foo")?; +/// Ok(()) +/// } +/// ``` +pub fn hash_directory(path: impl AsRef) -> io::Result> { + _hash_directory_impl(path.as_ref(), None) +} + +/// Computes the [SHA-256] hash of a directory with a custom file filter. +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * The provided `path` doesn't exist. +/// * The process lacks permissions to view the contents. +/// * The `path` points at a non-directory file. +/// +/// # Examples +/// +/// ```rust,no_run +/// use axiston_rt_source::utils::io::hash_directory_with_filter; +/// +/// fn main() -> std::io::Result<()> { +/// let _ = hash_directory_with_filter("./foo", |path| { +/// // Path is guaranteed to point to the file. 
+/// path.extension().map_or(false, |ext| ext == "txt") +/// })?; +/// +/// Ok(()) +/// } +/// ``` +/// +/// [SHA-256]: https://en.wikipedia.org/wiki/SHA-2 +pub fn hash_directory_with_filter(path: impl AsRef, filter: F) -> io::Result> +where + F: Fn(&Path) -> bool, +{ + _hash_directory_impl(path.as_ref(), Some(&filter)) +} + +/// Computes the [SHA-256] hash of a file. +/// +/// [SHA-256]: https://en.wikipedia.org/wiki/SHA-2 +fn _hash_file_impl(path: &Path) -> io::Result> { + let mut file = File::open(path)?; + let mut hasher = Sha256::new(); + let mut buffer = [0; 1024]; + + loop { + let n = file.read(&mut buffer)?; + if n == 0 { + break; + } + + hasher.update(&buffer[..n]); + } + + Ok(hasher.finalize().to_vec()) +} + +/// Computes the [SHA-256] hash of a directory with an optional filter. +/// +/// [SHA-256]: https://en.wikipedia.org/wiki/SHA-2 +fn _hash_directory_impl( + path: &Path, + filter: Option<&dyn Fn(&Path) -> bool>, +) -> io::Result> { + let mut hasher = Sha256::new(); + + for entry in WalkDir::new(path).sort_by_file_name() { + let entry = entry?; + let path = entry.path(); + + if !path.is_file() { + continue; + } + + if let Some(filter_fn) = filter { + if !filter_fn(path) { + continue; + } + } + + hasher.update(path.as_os_str().as_encoded_bytes()); + let file_hash = _hash_file_impl(path)?; + hasher.update(&file_hash); + } + + Ok(hasher.finalize().to_vec()) +} + +// Tests for the hashing functions. 
+#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::Write; + + use super::*; + + #[test] + fn test_hash_file() -> io::Result<()> { + let temp_dir = tempfile::tempdir()?; + let file_path = temp_dir.path().join("test.txt"); + let mut file = File::create(&file_path)?; + file.write_all(b"hello world")?; + + let hash = hash_file(&file_path)?; + assert_eq!( + hex::encode(hash), + "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" + ); + + Ok(()) + } + + #[test] + fn test_hash_directory() -> io::Result<()> { + let temp_dir = tempfile::tempdir()?; + let file_path = temp_dir.path().join("test.txt"); + let mut file = File::create(&file_path)?; + file.write_all(b"hello world")?; + + let hash = hash_directory(temp_dir.path())?; + assert!(!hash.is_empty()); + + Ok(()) + } +} diff --git a/crates/jsvm/utility/mod.rs b/crates/jsvm/utility/mod.rs new file mode 100644 index 0000000..fedcacc --- /dev/null +++ b/crates/jsvm/utility/mod.rs @@ -0,0 +1,13 @@ +//! Additional utilities for [`std::io`] and [`std::fs`]. + +mod hashing; + +pub mod io { + //! `std::io` utilities. + + pub use crate::utility::hashing::{hash_directory, hash_directory_with_filter, hash_file}; +} + +pub mod fs { + //! `std::fs` utilities. +} diff --git a/crates/schema/Cargo.toml b/crates/schema/Cargo.toml new file mode 100644 index 0000000..880c5e4 --- /dev/null +++ b/crates/schema/Cargo.toml @@ -0,0 +1,40 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[package] +name = "axiston-rt-schema" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +readme = "./README.md" + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lib] +path = "lib.rs" + +[features] +default = ["client", "server"] + +# Enables gRPC client code generation. 
+client = [] +# Enables gRPC server code generation. +server = [] + +[dependencies] +tonic = { workspace = true } +prost = { workspace = true } +tonic-types = { workspace = true } +prost-types = { workspace = true } + +[build-dependencies] +tonic-build = { workspace = true } +prost-build = { workspace = true } +anyhow = { workspace = true } diff --git a/crates/schema/README.md b/crates/schema/README.md new file mode 100644 index 0000000..8356fc0 --- /dev/null +++ b/crates/schema/README.md @@ -0,0 +1,20 @@ +### runtime/schema + +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] +[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-rt-schema.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-rt-schema +[docs-badge]: https://img.shields.io/docsrs/axiston-rt-schema?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-rt-schema + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. 
diff --git a/crates/schema/build.rs b/crates/schema/build.rs new file mode 100644 index 0000000..4b0bedb --- /dev/null +++ b/crates/schema/build.rs @@ -0,0 +1,25 @@ +#![forbid(unsafe_code)] + +use std::path::PathBuf; + +fn main() -> anyhow::Result<()> { + println!("cargo:rerun-if-changed=./protobuf"); + + let generate_client = cfg!(feature = "client"); + let generate_server = cfg!(feature = "server"); + + let builder = tonic_build::configure() + .build_server(generate_server) + .build_client(generate_client) + .build_transport(true); + + let input_dir = PathBuf::from("./protobuf/"); + let instance = input_dir.join("./instance.proto"); + let registry = input_dir.join("./registry.proto"); + + let protos = [instance.as_path(), registry.as_path()]; + let includes = [input_dir.as_path()]; + builder.compile_protos(&protos, &includes)?; + + Ok(()) +} diff --git a/crates/schema/lib.rs b/crates/schema/lib.rs new file mode 100644 index 0000000..10f70c1 --- /dev/null +++ b/crates/schema/lib.rs @@ -0,0 +1,69 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc = include_str!("./README.md")] + +pub mod message { + //! Includes files generated by `prost`. + //! Built from `message/*.proto`. + + pub mod event { + //! Includes files generated by `prost`. + //! Built from `message/event.proto`. + + tonic::include_proto!("rt.message.event"); + } + + pub mod graph { + //! Includes files generated by `prost`. + //! Built from `message/graph.proto`. + + tonic::include_proto!("rt.message.graph"); + } + + pub mod status { + //! Includes files generated by `prost`. + //! Built from `message/status.proto`. + + tonic::include_proto!("rt.message.status"); + } +} + +pub mod policy { + //! Includes files generated by `prost`. + //! Built from `policy/*.proto`. + + pub mod resource { + //! Includes files generated by `prost`. + //! Built from `policy/resource.proto`. + + tonic::include_proto!("rt.policy.resource"); + } + + pub mod retry { + //! 
Includes files generated by `prost`. + //! Built from `policy/retry.proto`. + + tonic::include_proto!("rt.policy.retry"); + } + + pub mod timeout { + //! Includes files generated by `prost`. + //! Built from `policy/timeout.proto`. + + tonic::include_proto!("rt.policy.timeout"); + } +} + +pub mod instance { + //! Includes files generated by `prost`. + //! Built from `instance.proto`. + + tonic::include_proto!("rt.instance"); +} + +pub mod registry { + //! Includes files generated by `prost`. + //! Built from `registry.proto`. + + tonic::include_proto!("rt.registry"); +} diff --git a/crates/schema/protobuf/instance.proto b/crates/schema/protobuf/instance.proto new file mode 100644 index 0000000..861a6cb --- /dev/null +++ b/crates/schema/protobuf/instance.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "message/event.proto"; + +package rt.instance; + +// Requests service status and metrics. +message GetStatusRequest { + // Includes detailed metrics in the response. + optional bool verbose_metrics = 1; + // Forces retrieval of the latest metrics. + optional bool force_latest = 2; + // Sliding window length (used by metrics). + optional uint32 sliding_window = 3; +} + +// Contains service status and performance metrics. +message GetStatusResponse { + // Task-related metrics: + + // Number of tasks waiting in the queue to be processed. + uint32 tasks_waiting = 11; + // Number of tasks currently being processed. + uint32 tasks_running = 12; + // Number of tasks already completed. + uint32 tasks_completed = 13; + + // Time-related metrics: + + // Average waiting time for tasks in the most recent window. + google.protobuf.Duration recent_waiting_time = 21; + // Average running time for tasks in the most recent window. + google.protobuf.Duration recent_running_time = 22; + // Overall average waiting time since the service started. 
+ google.protobuf.Duration average_waiting_time = 23; + // Overall average running time since the service started. + google.protobuf.Duration average_running_time = 24; +} + +// Describes the message format for sending events. +message EventRequest { + // The unique ID of the request message. + uint32 request_id = 1; + // The unique ID of the message group. + // Initial value is usually set by the Runtime. + optional uint32 group_id = 2; + + // When the event was recv by the gateway. + google.protobuf.Timestamp recv = 3; + // When the event was sent to the runtime. + google.protobuf.Timestamp send = 4; + + // The content of the message. + oneof payload { + // Step 1.1: Gateway requests to open a connection. + rt.message.event.OpenRequest open_request = 11; + // Step 2.1: Gateway submits a task for the execution. + rt.message.event.ExecuteRequest execute_request = 12; + // Step 3.1: Gateway requests to close the connection. + rt.message.event.CloseRequest close_request = 13; + } +} + +// Describes the message format for receiving events. +message EventResponse { + // The unique ID of the request message. + uint32 request_id = 1; + // The unique ID of the response message. + uint32 response_id = 2; + // The unique ID of the message group. + uint32 group_id = 3; + + // When the event was recv by the runtime. + google.protobuf.Timestamp recv = 4; + // When the event was sent to the gateway. + google.protobuf.Timestamp send = 5; + + // The content of the message. + oneof payload { + // Step 1.2: Runtime acknowledges that the connection is open. + rt.message.event.OpenResponse open_response = 11; + // Step 2.2: Runtime notifies the Gateway about the task's status change. + rt.message.event.NotifyResponse notify_response = 12; + // Step 2.3: Runtime responds with the result of executing the task. + rt.message.event.ExecuteResponse execute_response = 13; + // Step 3.2: Runtime confirms the connection is closed. 
+ rt.message.event.CloseResponse close_response = 14; + } +} + +// Provides runtime instance management. +service Instance { + // Retrieves detailed service health and performance metrics. + rpc GetStatus(GetStatusRequest) returns (GetStatusResponse); + + // Provides a bidirectional event streaming RPC for continuous communication + // between the gateway (as a client) and the runtime (as a server). + rpc EventBus(stream EventRequest) returns (stream EventResponse); +} diff --git a/crates/schema/protobuf/message/event.proto b/crates/schema/protobuf/message/event.proto new file mode 100644 index 0000000..02de0a3 --- /dev/null +++ b/crates/schema/protobuf/message/event.proto @@ -0,0 +1,103 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "message/graph.proto"; +import "message/status.proto"; +import "policy/resource.proto"; +import "policy/retry.proto"; +import "policy/timeout.proto"; + +package rt.message.event; + +// Indicates the gateway is ready accept events. +message OpenRequest { + // Token for authenticating the gateway. + string authentication_token = 3; + // Required runtime capabilities. + repeated string runtime_capabilities = 4; + // Preferred communication protocols. + repeated string preferred_protocols = 5; + + // Constraints on resources for the task. + optional rt.policy.resource.ResourceLimits resource_limits = 21; + // Policy for retrying failed tasks. + optional rt.policy.retry.RetryPolicy retry_policy = 31; + // Policy for handling task timeouts. + optional rt.policy.timeout.TimeoutPolicy timeout_policy = 32; +} + +// Request to submit a graph/task for execution by the Runtime. +message ExecuteRequest { + // Includes either a single task of an entire graph. + oneof execute_model { + // Includes an entire graph, returns the output of all tasks. + rt.message.graph.TaskGraph task_graph = 1; + // Includes a single task, returns the output of it. 
+ rt.message.graph.TaskNode task_node = 2; + } + + // Priority level of the graph/task (higher is more important). + optional int32 priority = 21; + // Deadline for the graph/task completion. + optional google.protobuf.Timestamp deadline = 22; + // Whether graph/task dependencies are cached. + optional bool cache_deps = 23; +} + +// Request to close the connection, blocking the Runtime queue. +message CloseRequest { + // Forces immediate closure without waiting. + optional bool force_close = 2; + // Reason for closing the connection. + optional string reason = 3; + // Require acknowledgment before closing. + optional bool ack_required = 4; +} + +// Start execution response message. +message OpenResponse { + // Allocated runtime resources. + optional rt.policy.resource.ResourceAllocated resource = 21; + // Policy for retrying failed tasks. + optional rt.policy.retry.RetryPolicy retry_policy = 31; + // Policy for handling task timeouts. + optional rt.policy.timeout.TimeoutPolicy timeout_policy = 32; +} + +// Intermediate graph status notification sent by the runtime. +message NotifyResponse { + oneof status_info { + // Details for the "waiting" status. + rt.message.status.WaitingStatus waiting = 21; + // Details for the "pre-running" status. + rt.message.status.PreRunningStatus pre_running = 22; + // Details for the "running" status. + rt.message.status.RunningStatus running = 23; + // Details for the "post-running" status. + rt.message.status.PostRunningStatus post_running = 24; + } +} + +// Response message containing the result of graph execution. +message ExecuteResponse { + // Unique identifier for the graph. + string graph_id = 1; + // Graph's return code indicating success or failure. + uint32 return_code = 2; + + // When the graph started. + google.protobuf.Timestamp start_time = 4; + // When the graph completed. + google.protobuf.Timestamp end_time = 5; + // Total time taken for execution. 
+ google.protobuf.Duration execution_time = 6; +} + +// Response to acknowledge that no new graphs will be accepted. +message CloseResponse { + // Indicates if it's safe to terminate the connection. + bool is_safe_to_close = 1; + // Number of graphs still in the queue or running. + int32 remaining_graphs = 2; +} diff --git a/crates/schema/protobuf/message/graph.proto b/crates/schema/protobuf/message/graph.proto new file mode 100644 index 0000000..cb3f80e --- /dev/null +++ b/crates/schema/protobuf/message/graph.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package rt.message.graph; + +message TaskGraph { + map nodes = 1; + map edges = 2; +} + +message TaskNode { + oneof task_source { + GitRepository git_repo = 1; + BuiltinModule module = 2; + // TarArchive tar_archive = 5; + // ZipArchive zip_archive = 6; + } + + // Custom task parameters as key-value pairs. + map task_fields = 12; + // Sensitive task-specific data (e.g., API keys). + map task_secrets = 13; +} + +message TaskEdge { + // Unique identifier of the source node. + string head_node = 1; + // Unique identifier of the target node. + string tail_node = 2; +} + +message GitRepository { + string url = 11; +} + +// message TarArchive {} +// message ZipArchive {} + +message BuiltinModule { + // Unique identifier for the task. + string task_id = 11; +} diff --git a/crates/schema/protobuf/message/status.proto b/crates/schema/protobuf/message/status.proto new file mode 100644 index 0000000..3ee758b --- /dev/null +++ b/crates/schema/protobuf/message/status.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "policy/resource.proto"; + +package rt.message.status; + +// Waiting status information. +message WaitingStatus { + // Number of graphs ahead in the queue. + uint32 graphs_before = 1; + // Total number of graphs in the queue. + uint32 queue_size = 2; + // Maximum capacity of the queue. 
+ uint32 queue_capacity = 3; + // Estimated wait time before the graph starts. + google.protobuf.Duration wait_time = 4; +} + +// Pre-running status information. +message PreRunningStatus { + // Size (in bytes) of data to serialize. + uint64 input_bytes = 1; + // Version of the Runtime. + uint64 runtime_version = 2; + + // Whether task dependencies are cached. + bool cache_deps = 4; +} + +// Running status information. +message RunningStatus { + // Identifier of the thread running the graph. + int32 thread_id = 8; + // Estimated remaining run time. + google.protobuf.Duration run_time = 1; + // Current progress checkpoint. + int32 checkpoint = 6; +} + +// Post-running status information. +message PostRunningStatus { + // Graph's return code indicating success or failure. + uint32 return_code = 1; + // Total bytes read during graph execution. + uint64 read_bytes = 2; + // Total bytes written during graph execution. + uint64 written_bytes = 3; + // Size (in bytes) of data to deserialize. + uint64 output_bytes = 4; + // Peak or maximum recorded resource usage. + rt.policy.resource.ResourceUsage resource_usage = 5; +} + diff --git a/crates/schema/protobuf/policy/resource.proto b/crates/schema/protobuf/policy/resource.proto new file mode 100644 index 0000000..31cc3f4 --- /dev/null +++ b/crates/schema/protobuf/policy/resource.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +package rt.policy.resource; + +// Represents the type of filesystem used. +enum FileSystemType { + // Default value, should not be used. + FILESYSTEM_UNSPECIFIED = 0; + // Virtual filesystem only (e.g., in-memory, network-based). + FILESYSTEM_VIRTUAL = 1; + // Physical filesystem only (e.g., SSD, HDD). + FILESYSTEM_PHYSICAL = 2; + // Virtual and physical filesystems. + FILESYSTEM_BOTH = 3; +} + +// Limits runtime resources. +message ResourceLimits { + // Maximum used CPU percentage. + optional uint32 max_cpu_cores = 1; + // Maximum used RAM in MB. 
+ optional uint32 max_ram_mib = 2; + // Maximum used disk in MB. + optional uint64 max_disk_mib = 3; + + // File system type used ("virtual" or "physical"). + optional FileSystemType fs_type = 11; +} + +// Allocated runtime resources. +message ResourceAllocated { + // Allocated CPU cores. + optional uint32 allocated_cpu_cores = 1; + // Allocated RAM in megabytes. + optional uint32 allocated_ram_mib = 2; + // Allocated HDD in megabytes. + optional uint32 allocated_disk_mib = 3; + + // File system type used ("virtual" or "physical"). + FileSystemType fs_type = 11; +} + +// Peak (or maximum) recorded resource usage. +message ResourceUsage { + // Peak CPU usage as a percentage. + uint32 peak_cpu_percent = 1; + // Peak RAM usage in megabytes. + uint32 peak_ram_mb = 2; + // Peak disk usage in megabytes. + uint32 peak_disk_mb = 3; + // Peak GPU usage as a percentage. + uint32 peak_gpu_percent = 4; +} diff --git a/crates/schema/protobuf/policy/retry.proto b/crates/schema/protobuf/policy/retry.proto new file mode 100644 index 0000000..6e200e4 --- /dev/null +++ b/crates/schema/protobuf/policy/retry.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +import "google/protobuf/duration.proto"; + +package rt.policy.retry; + +// Policy for retrying failed tasks. +message RetryPolicy { + // Maximum number of retry attempts. + uint32 max_retries = 1; + // Base delay between consecutive retries. + google.protobuf.Duration base_backoff = 2; + // Multiplier for exponential backoff. + optional double exponential_multiplier = 3; + // Maximum delay between consecutive retries. + google.protobuf.Duration max_backoff = 4; +} + +// Backoff strategy for retries. +message RetryStrategy { + // Options for different backoff strategies. + oneof strategy { + LinearBackoff linear = 1; + ExponentialBackoff exponential = 2; + } + + // Maximum delay between consecutive retries. + optional google.protobuf.Duration max_backoff = 3; + // Optional jitter percentage to randomize delays (0.0 - 1.0). 
+ // No value applies the default jitter, a value of 0 disables jitter. + optional double jitter_percent = 4; +} + +// Linear backoff configuration. +message LinearBackoff { + optional google.protobuf.Duration step_backoff = 1; +} + +// Exponential backoff configuration. +message ExponentialBackoff { + optional google.protobuf.Duration base_backoff = 1; + optional double exponential_multiplier = 2; +} diff --git a/crates/schema/protobuf/policy/timeout.proto b/crates/schema/protobuf/policy/timeout.proto new file mode 100644 index 0000000..c438a7c --- /dev/null +++ b/crates/schema/protobuf/policy/timeout.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +import "google/protobuf/duration.proto"; + +package rt.policy.timeout; + +// Policy for handling task timeouts. +message TimeoutPolicy { + // Maximum execution time allowed for the task. + google.protobuf.Duration execution_timeout = 1; + // Action to take on timeout (e.g., "retry", "terminate"). + optional TimeoutAction timeout_action = 2; + // Extra time given before final termination after timeout. + optional google.protobuf.Duration grace_period = 3; + // Frequency of checking for timeout conditions. + optional google.protobuf.Duration monitor_interval = 4; +} + +// Lists all of possible timeout actions. +enum TimeoutAction { + // Default value, action unspecified. + TIMEOUT_ACTION_UNSPECIFIED = 0; + // Task is considered to be failed. Retry the task. + // Timeout is the total time spent for all the retries. + TIMEOUT_ACTION_RETRY = 1; + // Task is considered to be failed. Do not retry the task. + // Timeout is the total time spent for a single attempt. 
+ TIMEOUT_ACTION_TERMINATE = 2; +} diff --git a/crates/schema/protobuf/registry.proto b/crates/schema/protobuf/registry.proto new file mode 100644 index 0000000..69d1d33 --- /dev/null +++ b/crates/schema/protobuf/registry.proto @@ -0,0 +1,109 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; + +package rt.registry; + +// Describes service's details. +message Service { + // Unique identifier for the service. + string service_id = 1; + // Current version of the service implementation. + string version = 2; + + // Display name of the service (e.g., Google Email). + string display_name = 21; + // Display unique identifier for the service's icon. + string display_icon = 22; + // Brief description of the service. + string description = 23; +} + +// Describes task's details. +message Task { + // Unique identifier for the task. + string task_id = 1; + // Unique identifier for the service. + string service_id = 2; + + // Display name of the task (e.g., "Send via Google Email"). + string display_name = 21; + // Unique identifier for the task's icon (e.g., "gmail_send"). + string display_icon = 22; + + // Contains a JSON schemas for inputs, outputs and errors. + string json_schema = 31; +} + +// Describes secrets required by a task. +message Secret { + // Unique identifier for the secret. + string secret_id = 1; + + // Display name of the secret (e.g., "Google Email API Key"). + string display_name = 21; + // Unique identifier for the secret's icon (e.g., "gmail_auth"). + string display_icon = 22; + // Brief description of the secret. + string description = 3; +} + +// Contains the registry details. +message RegistryContentResponse { + // Total number of registered services. + uint32 total_services = 1; + // Total number of registered tasks. + uint32 total_tasks = 2; + + // List of registered services. + repeated rt.registry.Service services = 11; + // List of registered tasks. 
+ repeated rt.registry.Task tasks = 12; + + // Registry registration startup timestamp. + google.protobuf.Timestamp first_updated_at = 21; + // Registry registration shutdown timestamp. + google.protobuf.Timestamp last_updated_at = 22; +} + +// Defines the format of the search query. +message FindServicesRequest { + // Unique identifier for the query. + string query_id = 1; + // Searches the query to match services or tasks. + string query = 2; + // Filters by associated tags (if any). + repeated string tags = 3; + + // Limits on the number of search results. + uint32 max_results = 11; + // Includes deprecated tasks in search results. + bool include_deprecated = 12; + // Includes code-related tasks in search results. + bool include_code = 13; +} + +// Defines the format of the search results. +message FindServicesResponse { + // Unique identifier for the query. + string query_id = 1; + + // Services matching the search criteria. + repeated rt.registry.Service matching_services = 2; + // tasks matching the search criteria. + repeated rt.registry.Task matching_tasks = 3; + + // Total number of matches found. + uint32 total_matches = 11; + // Indicates if results were truncated. + bool is_truncated = 12; +} + +service Registry { + // Retrieves all available tasks and their metadata. + rpc GetRegistryServices(google.protobuf.Empty) returns (RegistryContentResponse); + + // Searches for specific services and tasks in the registry. 
+ rpc GetRegistryTasks(FindServicesRequest) returns (FindServicesResponse); +} diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml new file mode 100644 index 0000000..5513616 --- /dev/null +++ b/crates/server/Cargo.toml @@ -0,0 +1,58 @@ +# https://doc.rust-lang.org/cargo/reference/manifest.html + +[package] +name = "axiston-rt-server" +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +readme = "./README.md" + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +default = ["hashbrown"] + +# Enables high-performance SwissTable hash map. +hashbrown = ["dep:hashbrown"] + +[lib] +path = "lib.rs" + +[dependencies] +axiston-rt-schema = { workspace = true, features = ["server"] } +axiston-rt-jsvm = { workspace = true, features = [] } + +clap = { workspace = true } +futures = { workspace = true } +pin-project-lite = { workspace = true } + +tonic = { workspace = true } +prost = { workspace = true } +tonic-types = { workspace = true } +prost-types = { workspace = true } + +tower = { workspace = true } +tower-http = { workspace = true } +tracing = { workspace = true } + +serde = { workspace = true } +serde_json = { workspace = true } +derive_more = { workspace = true } +petgraph = { workspace = true } +thiserror = { workspace = true } + +semver = { workspace = true } +jsonschema = { workspace = true } +hashbrown = { workspace = true, optional = true } +ecow = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/server/README.md b/crates/server/README.md new file mode 100644 index 0000000..a31cdb2 --- /dev/null +++ b/crates/server/README.md @@ -0,0 +1,20 @@ +### runtime/server + +[![Build Status][action-badge]][action-url] +[![Crate Docs][docs-badge]][docs-url] 
+[![Crate Version][crates-badge]][crates-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml?branch=main&label=build&logo=github&style=flat-square +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[crates-badge]: https://img.shields.io/crates/v/axiston-rt-server.svg?logo=rust&style=flat-square +[crates-url]: https://crates.io/crates/axiston-rt-server +[docs-badge]: https://img.shields.io/docsrs/axiston-rt-server?logo=Docs.rs&style=flat-square +[docs-url]: http://docs.rs/axiston-rt-server + +Lorem Ipsum. Lorem Ipsum. Lorem Ipsum. + +#### Notes + +- Lorem Ipsum. +- Lorem Ipsum. +- Lorem Ipsum. diff --git a/crates/server/handler/instance.rs b/crates/server/handler/instance.rs new file mode 100644 index 0000000..c729dcb --- /dev/null +++ b/crates/server/handler/instance.rs @@ -0,0 +1,47 @@ +use axiston_rt_schema::instance::instance_server::{Instance, InstanceServer}; +use axiston_rt_schema::instance::{ + EventRequest, EventResponse, GetStatusRequest, GetStatusResponse, +}; +use futures::stream::BoxStream; +use tonic::{Request, Response, Status, Streaming}; + +use crate::service::ServiceState; + +/// Implements [`Instance`] service for the [`InstanceService`]. +#[derive(Clone)] +pub struct InstanceService { + state: ServiceState, +} + +impl InstanceService { + /// Returns a new [`InstanceService`]. + #[inline] + pub fn new(state: ServiceState) -> Self { + Self { state } + } + + /// Returns a `GRPC` service. 
+ #[inline] + pub fn into_server(self) -> InstanceServer { + InstanceServer::new(self) + } +} + +#[tonic::async_trait] +impl Instance for InstanceService { + async fn get_status( + &self, + request: Request, + ) -> Result, Status> { + todo!() + } + + type EventBusStream = BoxStream<'static, Result>; + + async fn event_bus( + &self, + request: Request>, + ) -> Result, Status> { + todo!() + } +} diff --git a/crates/server/handler/mod.rs b/crates/server/handler/mod.rs new file mode 100644 index 0000000..a509fd1 --- /dev/null +++ b/crates/server/handler/mod.rs @@ -0,0 +1,140 @@ +//! All `tonic::`[`Server`]s with related handlers. +//! +//! [`Server`]: tonic::transport::Server + +use std::borrow::Cow; + +use derive_more::From; +use tonic::{Code, Status}; + +pub use crate::handler::instance::InstanceService; +pub use crate::handler::registry::RegistryService; + +mod instance; +mod registry; + +/// The error type for [`Server`]s. +/// +/// [`Server`]: tonic::transport::Server +#[derive(Debug, Default, From)] +pub struct Error { + kind: ErrorKind, +} + +impl Error { + /// Returns a new [`Error`]. + #[inline] + pub fn new(kind: ErrorKind) -> Self { + Self { kind } + } + + /// Returns a new [`Status`]. + #[inline] + pub fn into_status(self) -> Status { + self.kind.into_status() + } +} + +impl From for Status { + #[inline] + fn from(value: Error) -> Self { + value.into_status() + } +} + +/// Comprehensive list of all possible [`Error`]s. +#[derive(Debug, Default, Clone, Copy)] +#[must_use = "errors do nothing unless you use them"] +pub enum ErrorKind { + #[default] + Unknown, + Aborted, +} + +impl ErrorKind { + /// Explicitly converts into the [`Error`]. + #[inline] + pub fn into_error(self) -> Error { + self.into() + } + + /// Returns a new [`ErrorRepr`]. + fn into_repr(self) -> ErrorRepr<'static> { + match self { + ErrorKind::Unknown => ErrorRepr::INTERNAL_SERVICE_ERROR, + ErrorKind::Aborted => ErrorRepr::SERVICE_WAS_ABORTED, + } + } + + /// Returns a new [`Status`]. 
+ pub fn into_status(self) -> Status { + self.into_repr().into_status() + } +} + +/// Internal representation of a serialized [`Error`] response. +#[derive(Debug, Clone)] +#[must_use = "errors do nothing unless you use them"] +struct ErrorRepr<'a> { + pub message: Cow<'a, str>, + pub code: Code, +} + +impl<'a> ErrorRepr<'a> { + const INTERNAL_SERVICE_ERROR: Self = Self::new( + "Internal service error. Unknown underlying error or panic.", + Code::Unknown, + ); + + const SERVICE_WAS_ABORTED: Self = Self::new( + "Request processing was aborted by either client or server.", + Code::Unknown, + ); + + /// Returns a new [`ErrorRepr`]. + #[inline] + pub const fn new(message: &'a str, code: Code) -> Self { + Self { + message: Cow::Borrowed(message), + code, + } + } + + /// Returns a new [`Status`]. + #[inline] + pub fn into_status(self) -> Status { + Status::new(self.code, self.message) + } +} + +impl From> for Status { + #[inline] + fn from(value: ErrorRepr<'_>) -> Self { + value.into_status() + } +} + +/// A specialized [`Result`] type for the [`Error`] type. +/// +/// Used by [`Server`]s. 
+/// +/// [`Result`]: std::result::Result +/// [`Server`]: tonic::transport::Server +pub type Result = std::result::Result; + +#[cfg(test)] +mod test { + use crate::handler::{Error, ErrorKind}; + + #[test] + fn build_default_error() { + let error = Error::default(); + let _ = error.into_status(); + } + + #[test] + fn build_error_kind() { + let error = Error::new(ErrorKind::default()); + let _ = error.into_status(); + } +} diff --git a/crates/server/handler/registry.rs b/crates/server/handler/registry.rs new file mode 100644 index 0000000..9d94b08 --- /dev/null +++ b/crates/server/handler/registry.rs @@ -0,0 +1,43 @@ +use axiston_rt_schema::registry::registry_server::{Registry, RegistryServer}; +use axiston_rt_schema::registry::{ + FindServicesRequest, FindServicesResponse, RegistryContentResponse, +}; +use tonic::{Request, Response, Status}; + +use crate::service::ServiceState; + +/// Implements [`Registry`] service for the [`RegistryServer`]. +pub struct RegistryService { + state: ServiceState, +} + +impl RegistryService { + /// Returns a new [`RegistryService`]. + #[inline] + pub fn new(state: ServiceState) -> Self { + Self { state } + } + + /// Returns a `GRPC` service. + #[inline] + pub fn into_server(self) -> RegistryServer { + RegistryServer::new(self) + } +} + +#[tonic::async_trait] +impl Registry for RegistryService { + async fn get_registry_services( + &self, + request: Request<()>, + ) -> Result, Status> { + todo!() + } + + async fn get_registry_tasks( + &self, + request: Request, + ) -> Result, Status> { + todo!() + } +} diff --git a/crates/server/lib.rs b/crates/server/lib.rs new file mode 100644 index 0000000..4835da1 --- /dev/null +++ b/crates/server/lib.rs @@ -0,0 +1,14 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc = include_str!("./README.md")] + +//! ### Examples +//! +//! ```rust +//! fn main() {} +//! 
``` + +pub mod handler; +pub mod middleware; +pub mod routing; +pub mod service; diff --git a/crates/server/middleware/mod.rs b/crates/server/middleware/mod.rs new file mode 100644 index 0000000..874de47 --- /dev/null +++ b/crates/server/middleware/mod.rs @@ -0,0 +1 @@ +//! TODO. diff --git a/crates/server/routing/context.rs b/crates/server/routing/context.rs new file mode 100644 index 0000000..505e788 --- /dev/null +++ b/crates/server/routing/context.rs @@ -0,0 +1,405 @@ +use std::error::Error; +use std::fmt; + +use derive_more::{Deref, DerefMut}; +use jsonschema::ValidationError; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Map, Value}; + +use crate::routing::context::layers::TaskLayers; + +/// Serializable [`TaskHandler`] service request. +/// +/// [`TaskHandler`]: crate::routing::handler::TaskHandler +#[derive(Clone, Serialize, Deserialize, Deref, DerefMut)] +#[must_use = "requests do nothing unless you serialize them"] +pub struct TaskRequest { + #[deref] + #[deref_mut] + inner: T, + + #[serde(rename = "task")] + pub(crate) task_id: String, + #[serde(skip)] + pub(crate) layers: Option, + + #[serde(rename = "inputs")] + pub(crate) inputs: Option, + #[serde(rename = "secrets")] + pub(crate) secrets: Option, +} + +impl TaskRequest { + /// Returns a new [`TaskRequest`]. + #[inline] + pub fn new(task_id: &str, inner: T) -> Self { + Self { + inner, + task_id: task_id.to_owned(), + layers: None, + inputs: None, + secrets: None, + } + } + + /// Merges the provided [`TaskLayers`] with the defaults. + pub fn with_layers(mut self, layers: TaskLayers) -> Self { + let _ = self.layers.get_or_insert(layers); + self + } + + /// Adds other key/value pair into the [`TaskRequest`]`::inputs` object. 
+ pub fn with_inputs(mut self, key: &str, value: impl Into) -> Self { + let inputs = self + .inputs + .get_or_insert_with(|| Value::Object(Map::default())); + let Value::Object(object) = inputs else { + unreachable!(); + }; + + object.insert(key.to_owned(), value.into()); + self + } + + /// Adds other key/value pair into the [`TaskRequest`]`::secrets` object. + pub fn with_secrets(mut self, key: &str, value: impl Into) -> Self { + let inputs = self + .secrets + .get_or_insert_with(|| Value::Object(Map::default())); + let Value::Object(object) = inputs else { + unreachable!(); + }; + + object.insert(key.to_owned(), value.into()); + self + } + + /// Returns the inner data. + #[inline] + pub fn into_inner(self) -> T { + self.inner + } +} + +impl fmt::Debug for TaskRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskRequest") + .field("inputs", &self.inputs) + .field("secrets", &"*****") + .finish_non_exhaustive() + } +} + +/// Deserializable [`TaskHandler`] service response. +/// +/// [`TaskHandler`]: crate::handler::TaskHandler +#[derive(Clone, Serialize, Deserialize, Deref, DerefMut)] +#[must_use = "responses do nothing unless you deserialize them"] +pub struct TaskResponse { + #[deref] + #[deref_mut] + inner: T, + + #[serde(rename = "outputs")] + pub(crate) outputs: Option, + #[serde(rename = "metrics")] + pub(crate) metrics: Option, +} + +impl TaskResponse { + /// Returns a new [`TaskResponse`]. + #[inline] + pub fn new(inner: T) -> Self { + Self { + inner, + outputs: None, + metrics: None, + } + } + + /// Adds other key/value pair into the [`TaskResponse`]`::outputs` object. 
+ pub fn with_outputs(mut self, key: &str, value: impl Into) -> Self { + let outputs = self + .outputs + .get_or_insert_with(|| Value::Object(Map::default())); + let Value::Object(object) = outputs else { + unreachable!(); + }; + + object.insert(key.to_owned(), value.into()); + self + } + + /// Adds other key/value pair into the [`TaskResponse`]`::metrics` object. + pub fn with_metrics(mut self, key: &str, value: impl Into) -> Self { + let metrics = self + .metrics + .get_or_insert_with(|| Value::Object(Map::default())); + let Value::Object(object) = metrics else { + unreachable!(); + }; + + object.insert(key.to_owned(), value.into()); + self + } + + /// Returns the inner data. + #[inline] + pub fn into_inner(self) -> T { + self.inner + } +} + +impl fmt::Debug for TaskResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskResponse") + .field("outputs", &self.outputs) + .field("metrics", &self.metrics) + .finish_non_exhaustive() + } +} + +/// Unrecoverable failure during the [`TaskHandler`] execution. +/// +/// [`TaskHandler`]: crate::handler::TaskHandler +#[derive(Debug, thiserror::Error, Serialize, Deserialize)] +#[error("internal handler error")] +#[must_use = "errors do nothing unless you use them"] +pub struct TaskError { + #[serde(skip)] + pub(crate) error: Option>, + + #[serde(rename = "kind")] + pub(crate) kind: TaskErrorKind, + #[serde(rename = "values")] + pub(crate) values: Option, +} +/// Specifies the general categories of [`TaskError`]s. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[must_use = "errors do nothing unless you use them"] +pub enum TaskErrorKind { + /// Task wih a requested identifier was not found. + NotFound, + /// Request or response schema validation failed. + Schema, + + /// Error caused by the timeout policy. + TimeoutPolicy, + /// Error caused by the retry policy. + RetryPolicy, + + /// Unknown (type-erased) error occurred. 
+ Unknown, +} + +impl TaskError { + /// Returns a new [`TaskError`]. + #[inline] + pub fn new(kind: TaskErrorKind, error: E) -> Self + where + E: Into>, + { + Self { + kind, + error: Some(error.into()), + values: None, + } + } + + /// Overrides the default value of [`TaskError`]`::values`. + #[inline] + pub fn with_values(mut self, values: Value) -> Self { + self.values = Some(values); + self + } +} + +impl<'a> From> for TaskError { + fn from(value: ValidationError<'a>) -> Self { + Self::new( + TaskErrorKind::Schema, + "request or response schema validation failed", + ) + .with_values(json!({ + "instance": value.instance.into_owned(), + })) + } +} + +/// Specialized [`Result`] alias for the [`TaskError`] type. +pub type TaskResult = Result; + +pub mod layers { + //! Declarative `tower::`[`Layer`]s configuration. + //! + //! [`Layer`]: tower::Layer + + use serde::{Deserialize, Serialize}; + + use crate::routing::context::policies::{RetryPolicy, TimeoutPolicy}; + + /// Declarative `tower::`[`Layer`]s configuration. + /// + /// [`Layer`]: tower::Layer + #[derive(Debug, Default, Clone, Serialize, Deserialize)] + #[must_use = "layers do nothing unless you use them"] + pub struct TaskLayers { + #[serde(rename = "timeout")] + pub(crate) timeout_policy: Option, + #[serde(rename = "retry")] + pub(crate) retry_policy: Option, + } + + impl TaskLayers { + /// Returns a new [`TaskLayers`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Overrides the default value of [`TaskLayers`]`::timeout_policy`. + pub fn with_timeout_policy(mut self, timeout_policy: TimeoutPolicy) -> Self { + self.timeout_policy = Some(timeout_policy); + self + } + + /// Overrides the default value of [`TaskLayers`]`::retry_policy`. + pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self { + self.retry_policy = Some(retry_policy); + self + } + } +} + +pub mod policies { + //! [`RetryPolicy`] and [`TimeoutPolicy`]. 
+ + use std::time::Duration; + + use serde::{Deserialize, Serialize}; + + /// Defines a policy for handling timeouts. + #[derive(Debug, Clone, Serialize, Deserialize)] + #[must_use = "policies do nothing unless you use them"] + pub struct TimeoutPolicy { + /// The duration after which a timeout will occur. + pub duration: Duration, + /// The action to take when a timeout occurs. + pub action: TimeoutAction, + } + + /// Specifies actions to take when a timeout occurs. + #[derive(Debug, Default, Clone, Serialize, Deserialize)] + #[must_use = "policies do nothing unless you use them"] + pub enum TimeoutAction { + /// Retry the operation after a timeout. + Retry, + /// Terminate the operation after a timeout (default behavior). + #[default] + Terminate, + } + + impl TimeoutPolicy { + /// Returns a new retry [`TimeoutPolicy`] with the specified timeout duration. + pub fn retry(timeout: Duration) -> Self { + Self { + duration: timeout, + action: TimeoutAction::Retry, + } + } + + /// Returns a new terminate [`TimeoutPolicy`] with the specified timeout duration. + pub fn terminate(timeout: Duration) -> Self { + Self { + duration: timeout, + action: TimeoutAction::Terminate, + } + } + } + + /// Defines a policy for handling retries. + #[derive(Debug, Clone, Serialize, Deserialize)] + #[must_use = "policies do nothing unless you use them"] + pub struct RetryPolicy { + /// The maximum number of retry attempts. + pub retries: u32, + /// The strategy to use for determining retry intervals. + pub strategy: RetryStrategy, + } + + /// Specifies strategies for calculating retry intervals. + #[derive(Debug, Clone, Serialize, Deserialize)] + #[must_use = "policies do nothing unless you use them"] + pub enum RetryStrategy { + /// Linear backoff strategy with optional jitter and max backoff duration. + Linear { + step_backoff: Duration, + max_backoff: Option, + jitter_perc: Option, + }, + /// Exponential backoff strategy with optional jitter and max backoff duration. 
+ Exponential { + base_backoff: Duration, + max_backoff: Option, + jitter_perc: Option, + }, + } + + impl RetryPolicy { + /// Returns a new linear [`RetryPolicy`] with the specified retries and base backoff duration. + pub fn linear(retries: u32, base_backoff: Duration) -> Self { + Self { + retries, + strategy: RetryStrategy::Linear { + step_backoff: base_backoff, + max_backoff: None, + jitter_perc: None, + }, + } + } + + /// Returns a new exponential [`RetryPolicy`] with the specified retries, base backoff. + pub fn exponential(retries: u32, base_backoff: Duration) -> Self { + Self { + retries, + strategy: RetryStrategy::Exponential { + base_backoff, + max_backoff: None, + jitter_perc: None, + }, + } + } + + /// Sets the maximum backoff duration and returns the modified policy. + pub fn with_max_backoff(mut self, new_max_backoff: Duration) -> Self { + match self.strategy { + RetryStrategy::Linear { + ref mut max_backoff, + .. + } => *max_backoff = Some(new_max_backoff), + RetryStrategy::Exponential { + ref mut max_backoff, + .. + } => *max_backoff = Some(new_max_backoff), + }; + + self + } + + /// Sets the jitter percentage and returns the modified policy. + pub fn with_jitter_perc(mut self, new_jitter_perc: f64) -> Self { + match self.strategy { + RetryStrategy::Linear { + ref mut jitter_perc, + .. + } => *jitter_perc = Some(new_jitter_perc), + RetryStrategy::Exponential { + ref mut jitter_perc, + .. + } => *jitter_perc = Some(new_jitter_perc), + }; + + self + } + } +} diff --git a/crates/server/routing/handler.rs b/crates/server/routing/handler.rs new file mode 100644 index 0000000..aac1a04 --- /dev/null +++ b/crates/server/routing/handler.rs @@ -0,0 +1,371 @@ +//! [`TaskHandler`], [`TaskHandlerLayer`], its future and metrics. 
+ +use std::fmt; +use std::marker::PhantomData; +use std::task::{Context, Poll}; + +use tower::load::Load; +use tower::util::BoxCloneSyncService; +use tower::{Layer, Service, ServiceBuilder}; + +use crate::routing::context::{TaskError, TaskRequest, TaskResponse}; +use crate::routing::handler::future::TaskFuture; +use crate::routing::handler::metric::{TaskMetrics, TaskMetricsLock}; + +/// Unified `tower::`[`Service`] for executing tasks. +/// +/// Opaque [`BoxCloneSyncService`]<[`TaskRequest`], [`TaskResponse`], [`TaskError`]>. +#[must_use = "services do nothing unless you `.poll_ready` or `.call` them"] +pub struct TaskHandler { + inner: BoxCloneSyncService, TaskResponse, TaskError>, + metrics: TaskMetricsLock, +} + +impl TaskHandler { + /// Returns a new [`TaskHandler`]. + pub fn new(inner: S) -> Self + where + T: 'static, + U: 'static, + S: Service + Clone + Send + Sync + 'static, + Req: From> + 'static, + S::Response: Into> + 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, + { + Self::with_metrics(inner, TaskMetricsLock::default()) + } + + /// Returns a new [`TaskHandler`] with provided [`TaskMetricsLock`]. + /// + /// Allows to share [`TaskMetricsLock`] and the inner [`TaskMetrics`]. + pub fn with_metrics(inner: S, metrics: TaskMetricsLock) -> Self + where + T: 'static, + U: 'static, + S: Service + Clone + Send + Sync + 'static, + Req: From> + 'static, + S::Response: Into> + 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, + { + let inner = ServiceBuilder::new() + .map_request(From::from) + .map_response(Into::into) + .map_err(Into::into) + .service(inner); + + Self { + inner: BoxCloneSyncService::new(inner), + metrics, + } + } + + /// Maps a `TaskHandler` to `TaskHandler` by applying a function to a contained service. 
+ pub fn map(self, f: F) -> TaskHandler + where + F: FnOnce( + BoxCloneSyncService, TaskResponse, TaskError>, + ) -> BoxCloneSyncService, TaskResponse, TaskError>, + { + TaskHandler { + inner: f(self.inner), + metrics: self.metrics, + } + } + + /// Returns a new [`TaskMetrics`]. + #[inline] + pub fn snapshot(&self) -> TaskMetrics { + self.metrics.snapshot() + } +} + +impl Clone for TaskHandler { + #[inline] + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + metrics: self.metrics.clone(), + } + } +} + +impl fmt::Debug for TaskHandler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskHandler").finish_non_exhaustive() + } +} + +impl Service> for TaskHandler +where + T: 'static + Send + Clone, + U: 'static + Send, +{ + type Response = TaskResponse; + type Error = TaskError; + type Future = TaskFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, req: TaskRequest) -> Self::Future { + let Some(layers) = &req.layers else { + return TaskFuture::with_metrics(self.inner.call(req), self.metrics.clone()); + }; + + // let compose = LayerCompose::new(layers); + // let mut svc = compose.apply_layers(self.inner.clone()); + // TaskFuture::with_metrics(svc.call(req), self.metrics.clone()) + + todo!() + } +} + +impl Load for TaskHandler { + type Metric = TaskMetrics; + + #[inline] + fn load(&self) -> Self::Metric { + self.metrics.snapshot() + } +} + +/// `tower::`[`Layer`] that produces a [`TaskHandler`] services. +pub struct TaskHandlerLayer { + metrics: TaskMetricsLock, + inner: PhantomData<(Req, T, U)>, +} + +impl TaskHandlerLayer { + /// Returns a new [`TaskHandlerLayer`]. 
+ #[inline] + pub fn new(metrics: TaskMetricsLock) -> Self { + Self { + metrics, + inner: PhantomData, + } + } +} + +impl Default for TaskHandlerLayer { + #[inline] + fn default() -> Self { + Self { + metrics: TaskMetricsLock::default(), + inner: PhantomData, + } + } +} + +impl Layer for TaskHandlerLayer +where + T: 'static, + U: 'static, + S: Service + Clone + Send + Sync + 'static, + Req: From> + 'static, + S::Response: Into> + 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ + type Service = TaskHandler; + + #[inline] + fn layer(&self, inner: S) -> Self::Service { + TaskHandler::with_metrics(inner, self.metrics.clone()) + } +} + +pub mod future { + //! [`Future`] types for [`TaskHandler`]s. + //! + //! [`TaskHandler`]: crate::routing::handler::TaskHandler + + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use futures::future::BoxFuture; + use futures::FutureExt; + use pin_project_lite::pin_project; + + use crate::routing::context::{TaskResponse, TaskResult}; + use crate::routing::handler::metric::TaskMetricsLock; + + pin_project! { + /// Opaque [`Future`] return type for [`TaskHandler::call`]. + /// + /// Contains a single `futures::`[`BoxFuture`]. + /// + /// [`TaskHandler::call`]: crate::routing::handler::TaskHandler + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TaskFuture { + #[pin] fut: BoxFuture<'static, TaskResult>>, + metrics: Option, + } + } + + impl TaskFuture { + /// Returns a new [`TaskFuture`]. + #[inline] + pub fn new(fut: F) -> Self + where + F: Future>> + Sized + Send + 'static, + { + Self { + fut: fut.boxed(), + metrics: None, + } + } + + /// Returns a new [`TaskFuture`]. 
+ #[inline] + pub fn with_metrics(fut: F, metrics: TaskMetricsLock) -> Self + where + F: Future>> + Sized + Send + 'static, + { + Self { + fut: fut.boxed(), + metrics: Some(metrics), + } + } + } + + impl From>>> for TaskFuture { + #[inline] + fn from(fut: BoxFuture<'static, TaskResult>>) -> Self { + Self { fut, metrics: None } + } + } + + impl Future for TaskFuture { + type Output = TaskResult>; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.fut.poll(cx) + } + } + + #[cfg(test)] + mod test { + use crate::routing::context::{TaskResponse, TaskResult}; + use crate::routing::handler::future::TaskFuture; + + #[test] + fn from_async_block() -> TaskResult<()> { + let fut = async move { Ok(TaskResponse::new(5)) }; + let _fut = TaskFuture::new(fut); + + Ok(()) + } + } +} + +pub mod metric { + //! `tower::`[`Load`] metric types for [`TaskHandler`]s. + //! + //! [`Load`]: tower::load::Load + //! [`TaskHandler`]: crate::routing::handler::TaskHandler + + use std::sync::{Arc, Mutex}; + + use serde::{Deserialize, Serialize}; + + /// Reference-counting wrapper for [`TaskMetrics`]. + /// + /// Use by [`TaskHandler`]s and [`TaskFuture`]s. + /// + /// [`TaskHandler`]: crate::routing::handler::TaskHandler + /// [`TaskFuture`]: crate::routing::handler::future::TaskFuture + #[derive(Debug, Default, Clone)] + #[must_use = "metrics do nothing unless you serialize them"] + pub struct TaskMetricsLock { + inner: Arc>, + } + + impl TaskMetricsLock { + /// Returns a new [`TaskMetricsLock`]. + #[inline] + pub fn new(metrics: TaskMetrics) -> Self { + Self { + inner: Arc::new(Mutex::new(metrics)), + } + } + + /// Returns a new [`TaskMetrics`]. + pub fn snapshot(&self) -> TaskMetrics { + let guard = self.inner.lock().expect("should not be locked"); + guard.clone() + } + } + + /// `tower::load::`[`Load`] metrics for [`TaskHandler`]s. 
+ /// + /// [`Load`]: tower::load::Load + /// [`TaskHandler`]: crate::routing::handler::TaskHandler + #[derive(Debug, Default, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] + #[must_use = "metrics do nothing unless you serialize them"] + pub struct TaskMetrics { + // TODO: Implement all metrics. + + // pub average_waiting_time: Duration, + // pub average_recent_waiting_time: Duration, + // pub average_running_time: Duration, + // pub average_recent_running_time: Duration, + // pub total_success_runs: u32, + // pub total_failure_runs: u32, + } + + impl TaskMetrics { + /// Returns a new [`TaskMetrics`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + } + + #[cfg(test)] + mod test { + use crate::routing::context::TaskResult; + use crate::routing::handler::metric::{TaskMetrics, TaskMetricsLock}; + + #[test] + fn metrics_lock() -> TaskResult<()> { + let metrics_lock = TaskMetricsLock::default(); + assert_eq!(TaskMetrics::new(), metrics_lock.snapshot()); + Ok(()) + } + } +} + +#[cfg(test)] +mod test { + use tower::{service_fn, ServiceBuilder}; + + use crate::routing::context::{TaskError, TaskRequest, TaskResponse}; + use crate::routing::handler::{TaskHandler, TaskHandlerLayer}; + + async fn handle(request: TaskRequest) -> Result, TaskError> { + Ok(TaskResponse::new(request.into_inner())) + } + + #[test] + fn service_compose() -> Result<(), TaskError> { + let inner = service_fn(handle); + let _service = TaskHandler::new(inner); + Ok(()) + } + + #[test] + fn service_builder() -> Result<(), TaskError> { + let _service = ServiceBuilder::new() + .layer(TaskHandlerLayer::default()) + .service(service_fn(handle)); + Ok(()) + } +} diff --git a/crates/server/routing/manifest.rs b/crates/server/routing/manifest.rs new file mode 100644 index 0000000..e4e88b9 --- /dev/null +++ b/crates/server/routing/manifest.rs @@ -0,0 +1,95 @@ +//! [`TaskManifest`] and [`ServiceManifest`]. 
+ +use semver::Version; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// Metadata and properties of a single service. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[must_use = "manifests do nothing unless you serialize them"] +pub struct ServiceManifest { + /// Unique service identifier. + #[serde(rename = "service")] + pub service_id: String, + /// Currently used service's version. + #[serde(rename = "version")] + pub version: Option, + + /// Stabilization flag and reason. + #[serde(rename = "stabilized")] + pub stabilized: Option, + /// Deprecation flag and reason. + #[serde(rename = "deprecated")] + pub deprecated: Option, +} + +impl ServiceManifest { + /// Creates a new [`ServiceManifest`] with the specified service identifier. + pub fn new(id: &str) -> Self { + Self { + service_id: id.to_owned(), + version: None, + stabilized: None, + deprecated: None, + } + } +} + +/// Metadata and properties of a single task. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[must_use = "manifests do nothing unless you serialize them"] +pub struct TaskManifest { + /// Unique task identifier. + #[serde(rename = "task")] + pub route_id: String, + /// Unique service identifier. + #[serde(rename = "service")] + pub service_id: Option, + /// Currently used task version. + #[serde(rename = "version")] + pub version: Option, + + /// JSON Schema used for i/o validation. + #[serde(rename = "schemas")] + pub schemas: Option, + + /// Stabilization flag and reason. + #[serde(rename = "stabilized")] + pub stabilized: Option, + /// Deprecation flag and reason. + #[serde(rename = "deprecated")] + pub deprecated: Option, +} + +/// Stabilization or deprecation notices with metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[must_use = "manifests do nothing unless you serialize them"] +pub struct Notice { + /// The version since the notice was applied. + pub since_version: Option, + /// Reason for the change. 
+ pub change_reason: Option, +} + +/// Schemas used for input, output, and error validation. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[must_use = "manifests do nothing unless you serialize them"] +pub struct TaskSchemas { + pub inputs_schema: Option, + pub outputs_schema: Option, + pub errors_schema: Option, +} + +impl TaskManifest { + /// Creates a new [`TaskManifest`] with the specified task identifier. + pub fn new(id: &str) -> Self { + Self { + route_id: id.to_owned(), + service_id: None, + version: None, + schemas: None, + stabilized: None, + deprecated: None, + } + } +} diff --git a/crates/server/routing/mod.rs b/crates/server/routing/mod.rs new file mode 100644 index 0000000..b610c68 --- /dev/null +++ b/crates/server/routing/mod.rs @@ -0,0 +1,333 @@ +//! Task routing with [`Router`]<[`TaskRequest`], [`TaskResponse`]>. + +#[cfg(not(feature = "hashbrown"))] +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +#[cfg(feature = "hashbrown")] +use hashbrown::HashMap; +use tower::ServiceExt; + +use crate::routing::context::layers::TaskLayers; +use crate::routing::context::{TaskError, TaskErrorKind, TaskRequest, TaskResponse, TaskResult}; +use crate::routing::handler::metric::TaskMetrics; +use crate::routing::handler::TaskHandler; +use crate::routing::manifest::{ServiceManifest, TaskManifest}; +use crate::routing::route::{Route, ServiceIndex, TaskIndex}; + +pub mod context; +pub mod handler; +pub mod manifest; +pub mod route; + +/// Request data alias for a default [`Router`]. +pub type RouteRequest = (); + +/// Response data alias for a default [`Router`]. +pub type RouteResponse = (); + +/// Provides a mechanism for managing and executing tasks within a system. +/// +/// It allows the registration of services and tasks using manifests, supports middleware [`TaskLayers`] +/// for extensibility, and routes incoming [`TaskRequest`] to the appropriate [`TaskHandler`]s. 
+#[must_use = "routes do nothing unless you use them"] +pub struct Router { + inner: Arc>, +} + +struct RouterInner { + layer_compose: TaskLayers, + service_manifests: HashMap, + routes: HashMap>, +} + +impl Router { + /// Returns an empty [`Router`]. + #[inline] + pub fn new(layers: TaskLayers) -> Self { + let router_inner = RouterInner { + layer_compose: layers, + service_manifests: HashMap::default(), + routes: HashMap::new(), + }; + + Self { + inner: Arc::new(router_inner), + } + } + + fn inspect_inner_mut(self, f: F) -> Self + where + F: FnOnce(&mut RouterInner), + { + let inner = Arc::try_unwrap(self.inner); + let mut inner = inner.unwrap_or_else(|x| (*x).clone()); + f(&mut inner); + Self { + inner: Arc::new(inner), + } + } + + /// Overrides the default value of [`Router`]`::layer_compose`. + pub fn with_layers(self, layers: TaskLayers) -> Self { + self.inspect_inner_mut(|x| { + x.layer_compose = layers; + }) + } + + /// Registers multiple [`ServiceManifest`]s by their [`ServiceIndex`]es. + pub fn with_services(self, services: Vec) -> Self { + self.inspect_inner_mut(|x| { + for service_manifest in services { + let service_index = ServiceIndex::new(&service_manifest.service_id); + x.service_manifests.insert(service_index, service_manifest); + } + }) + } + + /// Registers another [`ServiceManifest`] by its [`ServiceIndex`]. + pub fn with_service(self, service_manifest: impl Into) -> Self { + self.with_service_impl(service_manifest.into()) + } + + fn with_service_impl(self, service_manifest: ServiceManifest) -> Self { + self.inspect_inner_mut(|x| { + let service_index = ServiceIndex::new(&service_manifest.service_id); + x.service_manifests.insert(service_index, service_manifest); + }) + } + + /// Registers multiple [`TaskHandler`]s by their [`TaskIndex`]es. 
+ pub fn with_routes(self, routes: Vec<(TaskManifest, TaskHandler)>) -> Self { + self.inspect_inner_mut(|x| { + for (task_manifest, task_handler) in routes { + let task_index = TaskIndex::new(&task_manifest.route_id); + let route = Route::new(task_handler, task_manifest) + .expect("should not provide malformed manifests"); + x.routes.insert(task_index, route); + } + }) + } + + /// Registers another [`TaskHandler`] by its [`TaskIndex`]. + pub fn with_route( + self, + task_manifest: impl Into, + task_handler: impl Into>, + ) -> Self { + self.with_route_impl(task_handler.into(), task_manifest.into()) + } + + fn with_route_impl(self, task_handler: TaskHandler, task_manifest: TaskManifest) -> Self { + self.inspect_inner_mut(move |x| { + let task_index = TaskIndex::new(&task_manifest.route_id); + let route = Route::new(task_handler, task_manifest) + .expect("should not provide malformed manifests"); + x.routes.insert(task_index, route); + }) + } + + /// Executes the requested task handler with a given request. + /// + /// # Errors + /// + /// - Returns an error if the task wasn't found in the registry. + /// - Returns an error if the requested handler returns an error. + pub async fn route_task(&self, task_request: TaskRequest) -> TaskResult> + where + T: 'static + Send + Clone, + U: 'static + Send, + { + let task_index = TaskIndex::new(&task_request.task_id); + let task_handler = self.find_task_handler(task_index).ok_or_else(|| { + TaskError::new( + TaskErrorKind::NotFound, + "requested task identifier was not found", + ) + })?; + + self.route_task_with_handler(task_request, task_handler) + .await + } + + /// Executes the provided task handler with a given request. + /// + /// # Errors + /// + /// - Returns an error if the provided handler returns an error. 
+ pub async fn route_task_with_handler( + &self, + mut task_request: TaskRequest, + task_handler: TaskHandler, + ) -> TaskResult> + where + T: 'static + Send + Clone, + U: 'static + Send, + { + let layer_compose = self.inner.layer_compose.clone(); + task_handler + .oneshot(task_request.with_layers(layer_compose)) + .await + } + + /// Returns the reference to the [`ServiceManifest`]. + pub fn find_service_manifest( + &self, + service_index: impl Into, + ) -> Option<&ServiceManifest> { + self.inner.service_manifests.get(&service_index.into()) + } + + /// Returns the reference to the [`TaskManifest`]. + pub fn find_task_manifest(&self, task_index: impl Into) -> Option<&TaskManifest> { + self.inner + .routes + .get(&task_index.into()) + .map(|r| r.manifest()) + } + + /// Returns the [`TaskHandler`] of the given task. + pub fn find_task_handler(&self, task_index: impl Into) -> Option> { + self.inner + .routes + .get(&task_index.into()) + .map(|r| r.task_handler()) + .cloned() + } + + /// Returns the [`TaskMetrics`] of the given task. + pub fn find_task_metrics(&self, task_index: impl Into) -> Option { + self.inner + .routes + .get(&task_index.into()) + .map(|r| r.task_handler_metrics()) + } + + /// Returns a new [`Registry`]. + /// + /// # Notes + /// + /// - Clones every [`ServiceManifest`]s and [`TaskManifest`]s. 
+ pub fn as_registry(&self) -> Registry { + let routes = self.inner.routes.iter(); + Registry { + #[cfg(not(feature = "hashbrown"))] + services: self.inner.service_manifests.clone(), + #[cfg(feature = "hashbrown")] + services: self.inner.service_manifests.clone().into_iter().collect(), + tasks: routes + .map(|(i, r)| (i.clone(), r.manifest().clone())) + .collect(), + } + } +} + +impl fmt::Debug for Router { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Router").finish_non_exhaustive() + } +} + +impl Default for Router { + fn default() -> Self { + Self::new(TaskLayers::new()) + } +} + +impl Default for RouterInner { + fn default() -> Self { + Self { + layer_compose: TaskLayers::default(), + service_manifests: HashMap::default(), + routes: HashMap::default(), + } + } +} + +impl Clone for Router { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Clone for RouterInner { + fn clone(&self) -> Self { + Self { + layer_compose: self.layer_compose.clone(), + service_manifests: self.service_manifests.clone(), + routes: self.routes.clone(), + } + } +} + +/// Lists all registered services and tasks. +/// +/// Also see [`Router::as_registry`]. +/// +/// [`Router::as_registry`]: routing::Router::as_registry +#[derive(Debug, Default)] +pub struct Registry { + /// List of all registered services. + pub services: std::collections::HashMap, + /// List of all registered tasks. 
+ pub tasks: std::collections::HashMap, +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use tower::{service_fn, ServiceBuilder}; + + use crate::routing::context::layers::TaskLayers; + use crate::routing::context::policies::{RetryPolicy, TimeoutPolicy}; + use crate::routing::context::{TaskRequest, TaskResponse, TaskResult}; + use crate::routing::handler::{TaskHandler, TaskHandlerLayer}; + use crate::routing::manifest::{ServiceManifest, TaskManifest}; + use crate::routing::Router; + + async fn handle_builtin0(request: TaskRequest) -> TaskResult> { + Ok(TaskResponse::new(request.into_inner())) + } + + async fn handle_builtin1(request: TaskRequest) -> TaskResult> { + Ok(TaskResponse::new(request.into_inner())) + } + + fn create_testing_router() -> Router { + let service0_manifest = ServiceManifest::new("service0"); + + let builtin0_manifest = TaskManifest::new("builtin0"); + let builtin0_service: TaskHandler = ServiceBuilder::new() + .layer(TaskHandlerLayer::default()) + .service(service_fn(handle_builtin0)); + + let builtin1_manifest = TaskManifest::new("builtin1"); + let builtin1_service: TaskHandler = ServiceBuilder::new() + .layer(TaskHandlerLayer::default()) + .service(service_fn(handle_builtin1)); + + let default_layers = TaskLayers::new() + .with_retry_policy(RetryPolicy::linear(3, Duration::from_secs(2))) + .with_timeout_policy(TimeoutPolicy::retry(Duration::from_secs(12))); + + Router::default() + .with_layers(default_layers) + .with_service(service0_manifest) + .with_route(builtin0_manifest, builtin0_service) + .with_route(builtin1_manifest, builtin1_service) + } + + #[tokio::test] + async fn simple_routing() -> TaskResult<()> { + let router = create_testing_router(); + let request = TaskRequest::new("builtin0", 5); + let response = router.route_task(request).await?; + assert_eq!(response.into_inner(), 5); + + Ok(()) + } +} diff --git a/crates/server/routing/route.rs b/crates/server/routing/route.rs new file mode 100644 index 0000000..a93f165 
--- /dev/null +++ b/crates/server/routing/route.rs @@ -0,0 +1,238 @@ +//! [`Route`], [`TaskIndex`] and [`ServiceIndex`]. + +use std::borrow::Cow; +use std::fmt; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use derive_more::{Deref, DerefMut, From}; +use ecow::EcoString; +use jsonschema::{draft202012, Validator}; +use serde_json::Value; +use tower::load::Load; +use tower::Service; + +use crate::routing::context::{TaskError, TaskRequest, TaskResponse, TaskResult}; +use crate::routing::handler::future::TaskFuture; +use crate::routing::handler::metric::TaskMetrics; +use crate::routing::handler::TaskHandler; +use crate::routing::manifest::{TaskManifest, TaskSchemas}; + +/// Routing structure that wraps [`TaskHandler`] with req/resp validation. +#[must_use = "routes do nothing unless you use them"] +pub struct Route { + inner: Arc>, +} + +#[must_use = "routes do nothing unless you use them"] +struct RouteHandler { + task_handler: TaskHandler, + schema_validators: TaskSchemaValidators, + manifest: TaskManifest, +} + +impl Route { + /// Creates a new [`Route`]. + pub fn new(task_handler: TaskHandler, task_manifest: TaskManifest) -> TaskResult { + let schema_validators = TaskSchemaValidators::new(task_manifest.schemas.as_ref())?; + + Ok(Self { + inner: Arc::new(RouteHandler { + task_handler, + schema_validators, + manifest: task_manifest, + }), + }) + } + + /// Returns the reference to the inner [`TaskHandler`]. + #[inline] + pub fn task_handler(&self) -> &TaskHandler { + &self.inner.task_handler + } + + /// Returns [`TaskMetrics`] of the inner [`TaskHandler`]. + #[inline] + pub fn task_handler_metrics(&self) -> TaskMetrics { + self.inner.task_handler.load() + } + + /// Returns the reference to the inner [`TaskManifest`]. 
+ #[inline] + pub fn manifest(&self) -> &TaskManifest { + &self.inner.manifest + } +} + +impl fmt::Debug for Route { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Route").finish_non_exhaustive() + } +} + +impl Clone for Route { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Service> for Route +where + T: 'static + Send + Clone, + U: 'static + Send, +{ + type Response = TaskResponse; + type Error = TaskError; + type Future = TaskFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let mut handler = self.inner.task_handler.clone(); + handler.poll_ready(cx) + } + + fn call(&mut self, req: TaskRequest) -> Self::Future { + let this = self.clone(); + let fut = async move { + this.inner + .schema_validators + .validate_inputs(req.inputs.as_ref())?; + + let mut handler = this.inner.task_handler.clone(); + let response = handler.call(req).await; + + response + .and_then(|response| { + this.inner + .schema_validators + .validate_outputs(response.outputs.as_ref()) + .and(Ok(response)) + }) + .map_err(|error| { + match this + .inner + .schema_validators + .validate_errors(error.values.as_ref()) + { + Ok(_) => error, + Err(v_error) => v_error, + } + }) + }; + + TaskFuture::new(fut) + } +} + +/// Validators for task schemas to validate input, output, and error structures. +#[derive(Debug, Default)] +pub(crate) struct TaskSchemaValidators { + pub inputs: Option, + pub outputs: Option, + pub errors: Option, +} + +impl TaskSchemaValidators { + /// Returns a new `TaskSchemaValidators` or an error if any schema compilation fails. 
+ pub(crate) fn new(schemas: Option<&TaskSchemas>) -> TaskResult { + let Some(schemas) = schemas else { + return Ok(TaskSchemaValidators::default()); + }; + + let inputs_schema = schemas.inputs_schema.as_ref(); + let outputs_schema = schemas.outputs_schema.as_ref(); + let errors_schema = schemas.errors_schema.as_ref(); + + Ok(TaskSchemaValidators { + inputs: inputs_schema.map(draft202012::new).transpose()?, + outputs: outputs_schema.map(draft202012::new).transpose()?, + errors: errors_schema.map(draft202012::new).transpose()?, + }) + } + + pub fn validate_inputs(&self, values: Option<&Value>) -> TaskResult<()> { + let Some(values) = values else { + return Ok(()); + }; + + let Some(schema) = self.inputs.as_ref() else { + return Ok(()); + }; + + schema.validate(values).map_err(From::from) + } + + pub fn validate_outputs(&self, values: Option<&Value>) -> TaskResult<()> { + let Some(values) = values else { + return Ok(()); + }; + + let Some(schema) = self.outputs.as_ref() else { + return Ok(()); + }; + + schema.validate(values).map_err(From::from) + } + + pub fn validate_errors(&self, values: Option<&Value>) -> TaskResult<()> { + let Some(values) = values else { + return Ok(()); + }; + + let Some(schema) = self.errors.as_ref() else { + return Ok(()); + }; + + schema.validate(values).map_err(From::from) + } +} + +/// Opaque and unique [`Service`] identifier. +/// +/// [`Service`]: crate::routing::manifest::ServiceManifest +#[derive(Debug, Clone, Eq, PartialEq, Hash, Deref, DerefMut)] +#[must_use = "indexes do nothing unless you serialize them"] +pub struct ServiceIndex { + inner: EcoString, +} + +impl ServiceIndex { + /// Returns a new [`ServiceIndex`]. + #[inline] + pub fn new(inner: impl AsRef) -> Self { + let inner = EcoString::from(inner.as_ref()); + Self { inner } + } + + /// Returns the underlying index. + #[inline] + pub fn into_inner(self) -> EcoString { + self.inner.clone() + } +} + +/// Opaque and unique [`TaskHandler`] identifier. 
+/// +/// [`TaskHandler`]: crate::handler::TaskHandler +#[derive(Debug, Clone, Eq, PartialEq, Hash, Deref, DerefMut, From)] +#[from(Cow<'static, str>, String, &'static str)] +#[must_use = "indexes do nothing unless you serialize them"] +pub struct TaskIndex { + inner: EcoString, +} + +impl TaskIndex { + /// Returns a new [`TaskIndex`]. + #[inline] + pub fn new(inner: impl AsRef) -> Self { + let inner = EcoString::from(inner.as_ref()); + Self { inner } + } + + /// Returns the underlying index. + #[inline] + pub fn into_inner(self) -> EcoString { + self.inner.clone() + } +} diff --git a/crates/server/service/config.rs b/crates/server/service/config.rs new file mode 100644 index 0000000..3f1ba0b --- /dev/null +++ b/crates/server/service/config.rs @@ -0,0 +1,42 @@ +/// App [`state`] configuration. +/// +/// [`state`]: crate::service::ServiceState +use clap::Args; +use serde::{Deserialize, Serialize}; + +/// App [`service`] configuration. +/// +/// [`service`]: crate::service +#[derive(Debug, Clone, Serialize, Deserialize, Args)] +#[must_use = "config does nothing unless you use it"] +pub struct ServiceConfig { + /// Task execution timeout (in seconds). + #[arg(long, default_value_t = 120)] + pub running_timeout: u64, + + /// Task retry attempts. + #[arg(long, default_value_t = 3)] + pub retrying_attempts: u32, + + /// Task retry timeout (in seconds). + #[arg(long, default_value_t = 10)] + pub retrying_timeout: u64, +} + +impl ServiceConfig { + /// Returns a new [`ServiceConfig`]. 
+ #[inline] + pub fn new() -> Self { + Self::default() + } +} + +impl Default for ServiceConfig { + fn default() -> Self { + Self { + running_timeout: 120, + retrying_attempts: 3, + retrying_timeout: 10, + } + } +} diff --git a/crates/server/service/graph.rs b/crates/server/service/graph.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/server/service/graph.rs @@ -0,0 +1 @@ + diff --git a/crates/server/service/mod.rs b/crates/server/service/mod.rs new file mode 100644 index 0000000..3e6d691 --- /dev/null +++ b/crates/server/service/mod.rs @@ -0,0 +1,74 @@ +//! Application state and dependency injection. + +mod config; +mod graph; +mod source; + +use std::time::Duration; + +use crate::routing::context::layers::TaskLayers; +use crate::routing::context::policies::{RetryPolicy, TimeoutPolicy}; +use crate::routing::context::{TaskRequest, TaskResponse, TaskResult}; +use crate::routing::{Registry, RouteRequest, RouteResponse, Router}; +pub use crate::service::config::ServiceConfig; + +/// Application state. +/// +/// Used by [`handlers`]. +/// +/// [`handlers`]: crate::handler +#[derive(Debug, Clone)] +#[must_use = "state does nothing unless you use it"] +pub struct ServiceState { + router: Router, +} + +impl ServiceState { + /// Returns a new [`ServiceState`]. + pub fn new(config: ServiceConfig) -> Self { + let layers = TaskLayers::new() + .with_timeout_policy(TimeoutPolicy::retry(Duration::from_secs( + config.running_timeout, + ))) + .with_retry_policy(RetryPolicy::exponential( + config.retrying_attempts, + Duration::from_secs(config.retrying_timeout), + )); + + let router = Router::new(layers); + Self { router } + } +} + +impl Default for ServiceState { + fn default() -> Self { + Self { + router: Router::default(), + } + } +} + +/// Provides a mechanism for managing and executing tasks within a system. +pub trait RouterExt { + /// Executes the requested task handler with a given request. 
+ fn route_task_request( + &self, + request: TaskRequest, + ) -> TaskResult>; + + /// Returns a new [`Registry`]. + fn get_task_registry(&self) -> TaskResult; +} + +impl RouterExt for ServiceState { + fn route_task_request( + &self, + request: TaskRequest, + ) -> TaskResult> { + self.router.route_task(request) + } + + fn get_task_registry(&self) -> TaskResult { + Ok(self.router.as_registry()) + } +} diff --git a/crates/server/service/source.rs b/crates/server/service/source.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/server/service/source.rs @@ -0,0 +1 @@ + diff --git a/deno.jsonc b/deno.jsonc new file mode 100644 index 0000000..6493337 --- /dev/null +++ b/deno.jsonc @@ -0,0 +1,25 @@ +{ + // https://docs.deno.com/runtime/manual/ + "compilerOptions": { + "strict": true + }, + "workspace": [ + "./modules/assert", + "./modules/runtime", + "./modules/testing" + ], + "lint": { + "include": ["./crates/", "./examples/", "./modules/"], + "exclude": ["./target/", "./debug/"], + "report": "compact" + }, + "test": { + "include": ["./examples/", "./modules/"], + "exclude": ["./target/", "./debug/"] + }, + "fmt": { + "include": ["./**/*.ts", "./**/*.js", "./**/*.md", "./*.json*"], + "exclude": ["./target/", "./debug/"], + "indentWidth": 4 + } +} diff --git a/docs/INSTALLATION.md b/docs/INSTALLATION.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/REQUIREMENTS.md b/docs/REQUIREMENTS.md new file mode 100644 index 0000000..e69de29 diff --git a/examples/direct/deps.ts b/examples/direct/deps.ts new file mode 100644 index 0000000..e69de29 diff --git a/examples/direct/main.ts b/examples/direct/main.ts new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello/deps.ts b/examples/hello/deps.ts new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello/main.ts b/examples/hello/main.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/assert/README.md b/modules/assert/README.md new file mode 100644 
index 0000000..ba40fc5 --- /dev/null +++ b/modules/assert/README.md @@ -0,0 +1,22 @@ +### @axiston/assert + +[![Build Status][action-badge]][action-url] +[![Crate Coverage][coverage-badge]][coverage-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[coverage-badge]: https://img.shields.io/codecov/c/github/axiston/runtime +[coverage-url]: https://app.codecov.io/gh/axiston/runtime + +Lorem ipsum. Lorem ipsum. Lorem ipsum. + +#### Features + +- Lorem ipsum. +- Lorem ipsum. +- Lorem ipsum. + +#### Usage + +```typescript +``` diff --git a/modules/assert/deno.jsonc b/modules/assert/deno.jsonc new file mode 100644 index 0000000..fd8c661 --- /dev/null +++ b/modules/assert/deno.jsonc @@ -0,0 +1,13 @@ +{ + "name": "@axiston/assert", + "version": "0.1.0", + "exports": { + ".": "./mod.ts", + "./match": "./match.ts" + }, + "imports": { + "@std/assert": "jsr:@std/assert@^1.0.0", + "@std/internal": "jsr:@std/internal@^1.0.1", + "@std/text": "jsr:@std/text@^1.0.0" + } +} diff --git a/modules/assert/match.ts b/modules/assert/match.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/assert/match_test.ts b/modules/assert/match_test.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/assert/mod.ts b/modules/assert/mod.ts new file mode 100644 index 0000000..4e4de52 --- /dev/null +++ b/modules/assert/mod.ts @@ -0,0 +1 @@ +export * from "./match.ts"; diff --git a/modules/runtime/README.md b/modules/runtime/README.md new file mode 100644 index 0000000..41499e2 --- /dev/null +++ b/modules/runtime/README.md @@ -0,0 +1,22 @@ +### @axiston/runtime + +[![Build Status][action-badge]][action-url] +[![Crate Coverage][coverage-badge]][coverage-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml 
+[coverage-badge]: https://img.shields.io/codecov/c/github/axiston/runtime +[coverage-url]: https://app.codecov.io/gh/axiston/runtime + +Lorem ipsum. Lorem ipsum. Lorem ipsum. + +#### Features + +- Lorem ipsum. +- Lorem ipsum. +- Lorem ipsum. + +#### Usage + +```typescript +``` diff --git a/modules/runtime/deno.jsonc b/modules/runtime/deno.jsonc new file mode 100644 index 0000000..3e32fc4 --- /dev/null +++ b/modules/runtime/deno.jsonc @@ -0,0 +1,15 @@ +{ + "name": "@axiston/runtime", + "version": "0.1.0", + "exports": { + ".": "./mod.ts", + "./lifecycle": "./lifecycle.ts", + "./request": "./request.ts", + "./response": "./response.ts" + }, + "imports": { + "@std/assert": "jsr:@std/assert@^1.0.0", + "@std/internal": "jsr:@std/internal@^1.0.1", + "@std/text": "jsr:@std/text@^1.0.0" + } +} diff --git a/modules/runtime/lifecycle.ts b/modules/runtime/lifecycle.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/runtime/lifecycle_test.ts b/modules/runtime/lifecycle_test.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/runtime/mod.ts b/modules/runtime/mod.ts new file mode 100644 index 0000000..ac5e76e --- /dev/null +++ b/modules/runtime/mod.ts @@ -0,0 +1,2 @@ +export * from "./request.ts"; +export * from "./response.ts"; diff --git a/modules/runtime/request.ts b/modules/runtime/request.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/runtime/request_test.ts b/modules/runtime/request_test.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/runtime/response.ts b/modules/runtime/response.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/runtime/response_test.ts b/modules/runtime/response_test.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/testing/README.md b/modules/testing/README.md new file mode 100644 index 0000000..461096b --- /dev/null +++ b/modules/testing/README.md @@ -0,0 +1,22 @@ +### @axiston/testing + +[![Build Status][action-badge]][action-url] +[![Crate 
Coverage][coverage-badge]][coverage-url] + +[action-badge]: https://img.shields.io/github/actions/workflow/status/axiston/runtime/build.yaml +[action-url]: https://github.com/axiston/runtime/actions/workflows/build.yaml +[coverage-badge]: https://img.shields.io/codecov/c/github/axiston/runtime +[coverage-url]: https://app.codecov.io/gh/axiston/runtime + +Lorem ipsum. Lorem ipsum. Lorem ipsum. + +#### Features + +- Lorem ipsum. +- Lorem ipsum. +- Lorem ipsum. + +#### Usage + +```typescript +``` diff --git a/modules/testing/deno.jsonc b/modules/testing/deno.jsonc new file mode 100644 index 0000000..0cca5de --- /dev/null +++ b/modules/testing/deno.jsonc @@ -0,0 +1,12 @@ +{ + "name": "@axiston/testing", + "version": "0.1.0", + "exports": { + ".": "./mod.ts", + "./setup": "./setup.ts" + }, + "imports": { + "@std/internal": "jsr:@std/internal@^1.0.1", + "@std/text": "jsr:@std/text@^1.0.0" + } +} diff --git a/modules/testing/mod.ts b/modules/testing/mod.ts new file mode 100644 index 0000000..ad29263 --- /dev/null +++ b/modules/testing/mod.ts @@ -0,0 +1 @@ +export * from "./setup.ts"; diff --git a/modules/testing/setup.ts b/modules/testing/setup.ts new file mode 100644 index 0000000..e69de29 diff --git a/modules/testing/setup_test.ts b/modules/testing/setup_test.ts new file mode 100644 index 0000000..e69de29 diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..92d876f --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,3 @@ +group_imports = "StdExternalCrate" +imports_granularity = "Module" +# reorder_impl_items = true