Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust client main #210

Merged
merged 4 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 76 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
[workspace]
members = [
"crates/trycp_api",
"crates/trycp_client",
"crates/trycp_server",
"ts/test/fixture/zomes/coordinator",
"ts/test/fixture/zomes/integrity",
]
resolver = "2"

[workspace.dependencies]
futures = "0.3"
hdi = "0.4.1-rc.0"
hdk = "0.3.1-rc.0"
nix = { version = "0.28", features = ["signal"] }
once_cell = "1.5.0"
parking_lot = "0.12"
reqwest = { version = "0.12", default-features = false }
rmp-serde = "=0.15.5"
serde = "1.0.181"
serde_bytes = "0.11"
serde_json = "1.0.117"
slab = "0.4"
snafu = "0.6"
structopt = "0.2"
tokio = "1.38"
tokio-tungstenite = "0.21"
trycp_api = { version = "0.16.0-dev.7", path = "crates/trycp_api" }
url = "2"

[profile.dev]
opt-level = "z"
11 changes: 11 additions & 0 deletions crates/trycp_api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "trycp_api"
version = "0.16.0-dev.7"
description = "conductor provisioner API for tryorama"
license = "CAL-1.0"
edition = "2021"

[dependencies]
serde = { workspace = true, features = ["derive"] }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
155 changes: 155 additions & 0 deletions crates/trycp_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#![deny(missing_docs)]
//! Protocol for trycp_server websocket messages.

/// Requests must include a message id.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RequestWrapper {
/// The message id.
pub id: u64,

/// The request content.
pub request: Request,
}

/// Trycp server requests.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum Request {
/// Given a DNA file, stores the DNA and returns the path at which it is stored.
SaveDna {
/// This is actually the dna filename.
id: String,

/// Content.
#[serde(with = "serde_bytes")]
content: Vec<u8>,
},

/// Given a DNA URL, ensures that the DNA is downloaded and returns the path at which it is stored.
DownloadDna {
/// Url.
url: String,
},

/// Set up a player.
ConfigurePlayer {
/// The player id.
id: String,

/// The Holochain configuration data that is not provided by trycp.
///
/// For example:
/// ```yaml
/// signing_service_uri: ~
/// encryption_service_uri: ~
/// decryption_service_uri: ~
/// dpki: ~
/// network: ~
/// ```
partial_config: String,
},

/// Start a conductor.
Startup {
/// The conductor id.
id: String,

/// The log level of the conductor.
log_level: Option<String>,
},

/// Shut down a conductor.
Shutdown {
/// The id of the conductor to shut down.
id: String,

/// The signal with which to shut down the conductor.
signal: Option<String>,
},

/// Shuts down all running conductors.
Reset,

/// Make an admin request.
CallAdminInterface {
/// The conductor id.
id: String,

/// The request.
#[serde(with = "serde_bytes")]
message: Vec<u8>,
},

/// Hook up an app interface.
ConnectAppInterface {
/// Token.
token: Vec<u8>,

/// Port.
port: u16,
},

/// Disconnect an app interface.
DisconnectAppInterface {
/// Port.
port: u16,
},

/// Make an ap request.
CallAppInterface {
/// Port.
port: u16,

/// The request.
#[serde(with = "serde_bytes")]
message: Vec<u8>,
},
}

/// Message response types.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
pub enum MessageResponse {
/// Unit response.
Null,

/// Encoded response.
Bytes(Vec<u8>)
}

impl MessageResponse {
/// Convert into bytes.
pub fn into_bytes(self) -> Vec<u8> {
match self {
Self::Null => Vec::new(),
Self::Bytes(v) => v,
}
}
}

/// A Message from a trycp_server.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum MessageToClient {
/// A signal emitted by a conductor.
Signal {
/// The app port from which this signal was emitted.
port: u16,

/// The content of the signal.
data: Vec<u8>,
},

/// A response to a trycp server request.
Response {
/// request message id.
id: u64,

/// message content.
response: std::result::Result<MessageResponse, String>,
},
}
14 changes: 14 additions & 0 deletions crates/trycp_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "trycp_client"
version = "0.16.0-dev.7"
description = "Client for TryCP"
license = "CAL-1.0"
edition = "2021"

[dependencies]
futures = { workspace = true }
rmp-serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tokio-tungstenite = { workspace = true }
trycp_api = { workspace = true }
42 changes: 42 additions & 0 deletions crates/trycp_client/examples/start_stop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use trycp_client::*;

const ONE_MIN: std::time::Duration = std::time::Duration::from_secs(60);

#[tokio::main(flavor = "multi_thread")]
async fn main() {
let (c, _r) = TrycpClient::connect("ws://127.0.0.1:9000").await.unwrap();

c.request(Request::Reset, ONE_MIN).await.unwrap();

c.request(
Request::ConfigurePlayer {
id: "alice".to_string(),
partial_config: "".to_string(),
},
ONE_MIN,
)
.await
.unwrap();

c.request(
Request::Startup {
id: "alice".to_string(),
log_level: None,
},
ONE_MIN,
)
.await
.unwrap();

c.request(
Request::Shutdown {
id: "alice".to_string(),
signal: None,
},
ONE_MIN,
)
.await
.unwrap();

c.request(Request::Reset, ONE_MIN).await.unwrap();
}
138 changes: 138 additions & 0 deletions crates/trycp_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#![deny(missing_docs)]
//! Trycp client.

use futures::{sink::SinkExt, stream::StreamExt};
use std::collections::HashMap;
use std::io::Result;
use std::sync::Arc;
use tokio_tungstenite::{
tungstenite::{client::IntoClientRequest, Message},
*,
};
pub use trycp_api::Request;
use trycp_api::*;

type WsCore = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type WsSink = futures::stream::SplitSink<WsCore, Message>;
type Ws = Arc<tokio::sync::Mutex<WsSink>>;

/// Signal emitted from a conductor.
pub struct Signal {
/// The app port from which this signal was emitted.
pub port: u16,

/// The content of the signal.
pub data: Vec<u8>,
}

/// Trycp client recv.
pub struct SignalRecv(tokio::sync::mpsc::Receiver<Signal>);

impl SignalRecv {
/// Receive.
pub async fn recv(&mut self) -> Option<Signal> {
self.0.recv().await
}
}

/// Trycp client.
pub struct TrycpClient {
ws: Ws,
pend: Arc<
std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>,
>,
recv_task: tokio::task::JoinHandle<()>,
}

impl Drop for TrycpClient {
fn drop(&mut self) {
let ws = self.ws.clone();
tokio::task::spawn(async move {
let _ = ws.lock().await.close().await;
});
self.recv_task.abort();
}
}

impl TrycpClient {
/// Connect to a remote trycp server.
pub async fn connect<R>(request: R) -> Result<(Self, SignalRecv)>
where
R: IntoClientRequest + Unpin,
{
let (w, _) = tokio_tungstenite::connect_async(request)
.await
.map_err(std::io::Error::other)?;

let (sink, mut stream) = w.split();

let map: HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>> =
HashMap::new();
let pend = Arc::new(std::sync::Mutex::new(map));

let (recv_send, recv_recv) = tokio::sync::mpsc::channel(32);

let pend2 = pend.clone();
let recv_task = tokio::task::spawn(async move {
while let Some(Ok(msg)) = stream.next().await {
let msg = msg.into_data();
let msg: MessageToClient = rmp_serde::from_slice(&msg).unwrap();

match msg {
MessageToClient::Signal { port, data } => {
recv_send.send(Signal { port, data }).await.unwrap();
}
MessageToClient::Response { id, response } => {
if let Some(resp) = pend2.lock().unwrap().remove(&id) {
let _ = resp.send(response.map_err(std::io::Error::other));
}
}
}
}
});

let ws = Arc::new(tokio::sync::Mutex::new(sink));

Ok((
Self {
ws,
pend,
recv_task,
},
SignalRecv(recv_recv),
))
}

/// Make a request of the trycp server.
pub async fn request(
&self,
request: Request,
timeout: std::time::Duration,
) -> Result<MessageResponse> {
static RID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let mid = RID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let (s, r) = tokio::sync::oneshot::channel();

self.pend.lock().unwrap().insert(mid, s);

let pend = self.pend.clone();
tokio::task::spawn(async move {
tokio::time::sleep(timeout).await;
pend.lock().unwrap().remove(&mid);
});

let request = RequestWrapper { id: mid, request };

let request = rmp_serde::to_vec_named(&request).map_err(std::io::Error::other)?;

self.ws
.lock()
.await
.send(Message::Binary(request))
.await
.map_err(std::io::Error::other)?;

r.await.map_err(|_| std::io::Error::other("Closed"))?
}
}
Loading
Loading