Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for META flags and nonrecursive LIST #24

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lighthouse-client/examples/admin_crud.rs
Original file line number Diff line number Diff line change
@@ -8,41 +8,41 @@ async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async {
_ = lh.delete(&["test"]).await;
_ = lh.mkdir(&["test"]).await; // TODO: No longer ignore once Beacon no longer 400s here
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Recreating test directory")).await?;

async {
lh.post(&["test", "a", "nested"], "Hello world!".to_string()).await?;
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Posting to test directory")).await?;

async {
_ = lh.create(&["test", "b"]).await; // TODO: No longer ignore once Beacon no longer 418s here
lh.link(&["test", "a", "nested"], &["test", "b"]).await?;
lh.put(&["test", "a", "nested"], "Another string".to_string()).await?;
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Linking to sibling resource")).await?;

async {
let result: String = lh.get(&["test", "b"]).await?.payload;
info!(result = result);
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Getting linked resource")).await?;

async {
lh.link(&["test", "a", "nested"], &["test", "b"]).await?;
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Unlinking sibling resource")).await?;

async {
let result: String = lh.get(&["test", "b"]).await?.payload;
info!(result = result);
info!(tree = %lh.list(&["test"]).await?.payload);
info!(tree = %lh.list_tree(&["test"]).await?.payload);
Ok::<_, Error>(())
}.instrument(info_span!("Getting unlinked resource")).await?;

24 changes: 13 additions & 11 deletions lighthouse-client/examples/admin_list_root.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
use clap::Parser;
use lighthouse_client::{protocol::Authentication, root, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use lighthouse_client::{protocol::Authentication, root, Lighthouse, Result, LIGHTHOUSE_URL};
use tracing::info;

async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

let tree = lh.list(root![]).await?.payload;
info!("Got {}", tree);

Ok(())
}

#[derive(Parser)]
struct Args {
/// The username.
@@ -22,6 +13,9 @@ struct Args {
/// The server URL.
#[arg(long, env = "LIGHTHOUSE_URL", default_value = LIGHTHOUSE_URL)]
url: String,
/// Whether to list only the first layer.
#[arg(short, long)]
nonrecursive: bool,
}

#[tokio::main(flavor = "current_thread")]
@@ -33,5 +27,13 @@ async fn main() -> Result<()> {
let auth = Authentication::new(&args.username, &args.token);
let lh = Lighthouse::connect_with_tokio_to(&args.url, auth).await?;

run(lh).await
let tree = if args.nonrecursive {
lh.list_dir(root![]).await
} else {
lh.list_tree(root![]).await
}?.payload;

info!("Got {}", tree);

Ok(())
}
46 changes: 27 additions & 19 deletions lighthouse-client/src/lighthouse.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use std::{collections::HashMap, fmt::Debug, sync::{atomic::{AtomicI32, Ordering}

use async_tungstenite::tungstenite::{Message, self};
use futures::{prelude::*, channel::mpsc::{Sender, self}, stream::{SplitSink, SplitStream}, lock::Mutex};
use lighthouse_protocol::{Authentication, ClientMessage, DirectoryTree, Frame, InputEvent, LaserMetrics, Model, ServerMessage, Value, Verb};
use lighthouse_protocol::{Authentication, ClientMessage, DirectoryTree, Frame, InputEvent, LaserMetrics, Meta, Model, ServerMessage, Value, Verb};
use serde::{Deserialize, Serialize};
use stream_guard::GuardStreamExt;
use tracing::{warn, error, debug, info};
@@ -158,77 +158,85 @@ impl<S> Lighthouse<S>
pub async fn post<P>(&self, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<()>>
where
P: Serialize {
self.perform(&Verb::Post, path, payload).await
self.perform(&Verb::Post, path, Meta::default(), payload).await
}

/// Updates the resource at the given path with the given payload. Requires WRITE permission.
pub async fn put<P>(&self, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<()>>
where
P: Serialize {
self.perform(&Verb::Put, path, payload).await
self.perform(&Verb::Put, path, Meta::default(), payload).await
}

/// Creates a resource at the given path. Requires CREATE permission.
pub async fn create(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Create, path, ()).await
self.perform(&Verb::Create, path, Meta::default(), ()).await
}

/// Deletes a resource at the given path. Requires DELETE permission.
pub async fn delete(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Delete, path, ()).await
self.perform(&Verb::Delete, path, Meta::default(), ()).await
}

/// Creates a directory at the given path. Requires CREATE permission.
pub async fn mkdir(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Mkdir, path, ()).await
self.perform(&Verb::Mkdir, path, Meta::default(), ()).await
}

/// Lists the directory at the given path. Requires READ permission.
pub async fn list_dir(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<DirectoryTree>> {
self.perform(&Verb::List, path, Meta {
nonrecursive: Some(true),
..Default::default()
}, ()).await
}

/// Lists the directory tree at the given path. Requires READ permission.
pub async fn list(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<DirectoryTree>> {
self.perform(&Verb::List, path, ()).await
pub async fn list_tree(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<DirectoryTree>> {
self.perform(&Verb::List, path, Meta::default(), ()).await
}

/// Gets the resource at the given path. Requires READ permission.
pub async fn get<R>(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<R>>
where
R: for<'de> Deserialize<'de> {
self.perform(&Verb::Get, path, ()).await
self.perform(&Verb::Get, path, Meta::default(), ()).await
}

/// Links the given source to the given destination path.
pub async fn link(&self, src_path: &[impl AsRef<str> + Debug], dest_path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Link, dest_path, src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
self.perform(&Verb::Link, dest_path, Meta::default(), src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
}

/// Unlinks the given source from the given destination path.
pub async fn unlink(&self, src_path: &[impl AsRef<str> + Debug], dest_path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Unlink, dest_path, src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
self.perform(&Verb::Unlink, dest_path, Meta::default(), src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
}

/// Stops the given stream. **Should generally not be called manually**,
/// since streams will automatically be stopped once dropped.
pub async fn stop(&self, request_id: i32, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform_with_id(request_id, &Verb::Stop, path, ()).await
self.perform_with_id(request_id, &Verb::Stop, path, Meta::default(), ()).await
}

/// Performs a single request to the given path with the given payload.
#[tracing::instrument(skip(self, payload))]
pub async fn perform<P, R>(&self, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<R>>
pub async fn perform<P, R>(&self, verb: &Verb, path: &[impl AsRef<str> + Debug], meta: Meta, payload: P) -> Result<ServerMessage<R>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
let request_id = self.next_request_id();
self.perform_with_id(request_id, verb, path, payload).await
self.perform_with_id(request_id, verb, path, meta, payload).await
}

/// Performs a single request to the given path with the given request id.
#[tracing::instrument(skip(self, payload))]
async fn perform_with_id<P, R>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<R>>
async fn perform_with_id<P, R>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], meta: Meta, payload: P) -> Result<ServerMessage<R>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
assert_ne!(verb, &Verb::Stream, "Lighthouse::perform may only be used for one-off requests, use Lighthouse::stream for streaming.");
self.send_request(request_id, verb, path, payload).await?;
self.send_request(request_id, verb, path, meta, payload).await?;
let response = self.receive_single(request_id).await?.check()?.decode_payload()?;
Ok(response)
}
@@ -242,7 +250,7 @@ impl<S> Lighthouse<S>
R: for<'de> Deserialize<'de> {
let request_id = self.next_request_id();
let path: Vec<String> = path.into_iter().map(|s| s.as_ref().to_string()).collect();
self.send_request(request_id, &Verb::Stream, &path, payload).await?;
self.send_request(request_id, &Verb::Stream, &path, Meta::default(), payload).await?;
let stream = self.receive_streaming(request_id).await?;
Ok(stream.guard({
// Stop the stream on drop
@@ -258,7 +266,7 @@ impl<S> Lighthouse<S>
}

/// Sends a request to the given path with the given payload.
async fn send_request<P>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<i32>
async fn send_request<P>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], meta: Meta, payload: P) -> Result<i32>
where
P: Serialize {
let path = path.into_iter().map(|s| s.as_ref().to_string()).collect();
@@ -267,7 +275,7 @@ impl<S> Lighthouse<S>
request_id,
authentication: self.authentication.clone(),
path,
meta: HashMap::new(),
meta,
verb: verb.clone(),
payload
}).await?;
6 changes: 2 additions & 4 deletions lighthouse-protocol/src/client_message.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::collections::HashMap;

use serde::{Serialize, Deserialize};

use crate::{Authentication, Value, ValueError, Verb};
use crate::{Authentication, Meta, Value, ValueError, Verb};

/// A message originating from the lighthouse client.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
@@ -14,7 +12,7 @@ pub struct ClientMessage<P> {
#[serde(rename = "PATH")]
pub path: Vec<String>,
#[serde(rename = "META")]
pub meta: HashMap<String, String>,
pub meta: Meta,
#[serde(rename = "AUTH")]
pub authentication: Authentication,
#[serde(rename = "PAYL")]
2 changes: 2 additions & 0 deletions lighthouse-protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ mod authentication;
mod client_message;
mod constants;
mod frame;
mod meta;
mod input;
mod payload;
mod server_message;
@@ -12,6 +13,7 @@ pub use authentication::*;
pub use client_message::*;
pub use constants::*;
pub use frame::*;
pub use meta::*;
pub use input::*;
pub use payload::*;
pub use server_message::*;
11 changes: 11 additions & 0 deletions lighthouse-protocol/src/meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};

/// Additional flags set by the client.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
#[serde(rename_all = "UPPERCASE")]
pub struct Meta {
/// Setting this flag on a LIST request specifies that only the first level
/// should be listed (instead of the full tree).
#[serde(skip_serializing_if = "Option::is_none")]
pub nonrecursive: Option<bool>,
}