From 02ef85fa81b74e1948c0f9c9c241f0f405fcc13b Mon Sep 17 00:00:00 2001 From: Elaina <48662592+oestradiol@users.noreply.github.com> Date: Thu, 19 Sep 2024 18:06:41 -0300 Subject: [PATCH] Formatting --- atrium-streams-client/src/client/mod.rs | 15 ++-- atrium-streams-client/src/client/tests.rs | 56 +++++++------ .../subscriptions/repositories/firehose.rs | 82 +++---------------- .../src/subscriptions/repositories/tests.rs | 76 ++++++----------- atrium-streams/src/client.rs | 2 +- .../src/subscriptions/frames/tests.rs | 9 +- atrium-streams/src/subscriptions/mod.rs | 5 +- 7 files changed, 80 insertions(+), 165 deletions(-) diff --git a/atrium-streams-client/src/client/mod.rs b/atrium-streams-client/src/client/mod.rs index 0d6a048d..29e694e7 100644 --- a/atrium-streams-client/src/client/mod.rs +++ b/atrium-streams-client/src/client/mod.rs @@ -43,7 +43,10 @@ type StreamKind = WebSocketStream>; impl EventStreamClient<::Item, Error> for WssClient

{ - async fn connect(&self, mut uri: String) -> Result::Item>, Error> { + async fn connect( + &self, + mut uri: String, + ) -> Result::Item>, Error> { let Self { params } = self; // Query parameters @@ -66,9 +69,7 @@ impl EventStreamClient<::Item, fn get_host(uri: &str) -> Result<(Uri, Box), Error> { let uri = Uri::from_str(uri).map_err(|_| Error::InvalidUri)?; let authority = uri.authority().ok_or_else(|| Error::InvalidUri)?.as_str(); - let host = authority - .find('@') - .map_or_else(|| authority, |idx| authority.split_at(idx + 1).1); + let host = authority.find('@').map_or_else(|| authority, |idx| authority.split_at(idx + 1).1); let host = Box::from(host); Ok((uri, host)) } @@ -76,7 +77,11 @@ fn get_host(uri: &str) -> Result<(Uri, Box), Error> { /// Generate a request for the given URI and host. /// It sets the necessary headers for a WebSocket connection, /// plus the client's `AtprotoProxy` and `AtprotoAcceptLabelers` headers. -async fn gen_request(client: &WssClient

, uri: &Uri, host: &str) -> Result, Error> { +async fn gen_request( + client: &WssClient

, + uri: &Uri, + host: &str, +) -> Result, Error> { let mut request = Request::builder() .uri(uri) .method("GET") diff --git a/atrium-streams-client/src/client/tests.rs b/atrium-streams-client/src/client/tests.rs index 57d27cb0..d7265ab3 100644 --- a/atrium-streams-client/src/client/tests.rs +++ b/atrium-streams-client/src/client/tests.rs @@ -3,8 +3,17 @@ use std::net::{Ipv4Addr, SocketAddr}; use atrium_streams::{atrium_api::com::atproto::sync::subscribe_repos, client::EventStreamClient}; use atrium_xrpc::http::{header::SEC_WEBSOCKET_KEY, HeaderMap, HeaderValue}; use futures::{SinkExt, StreamExt}; -use tokio::{net::{TcpListener, TcpStream}, runtime::Runtime}; -use tokio_tungstenite::{tungstenite::{handshake::server::{ErrorResponse, Request, Response}, Message}, WebSocketStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + runtime::Runtime, +}; +use tokio_tungstenite::{ + tungstenite::{ + handshake::server::{ErrorResponse, Request, Response}, + Message, + }, + WebSocketStream, +}; use crate::WssClient; @@ -35,15 +44,13 @@ fn client() { Runtime::new().unwrap().block_on(fut); } -async fn wss_client(uri: &str) -> (WssClient, HeaderMap) { - let params = subscribe_repos::ParametersData { - cursor: None, - }; +async fn wss_client( + uri: &str, +) -> (WssClient, HeaderMap) { + let params = subscribe_repos::ParametersData { cursor: None }; + + let client = WssClient::builder().params(params).build(); - let client = WssClient::builder() - .params(params) - .build(); - let (uri, host) = get_host(uri).unwrap(); let req = gen_request(&client, &uri, &host).await.unwrap(); let headers = req.headers(); @@ -54,11 +61,9 @@ async fn wss_client(uri: &str) -> (WssClient, H async fn mock_wss_server() -> (WebSocketStream, HeaderMap, String) { let sock_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 3000)); - let listener = TcpListener::bind(sock_addr) - .await - .expect("Failed to bind to port!"); + let listener = TcpListener::bind(sock_addr).await.expect("Failed to bind to port!"); - let headers: HeaderMap; + let headers: HeaderMap; let route: String; let (stream, _) = listener.accept().await.unwrap(); let (headers_, route_, stream) = extract_headers(stream).await; @@ -68,21 +73,22 @@ async fn mock_wss_server() -> (WebSocketStream, HeaderMap, String) { (stream, headers, route) } -async fn extract_headers(raw_stream: TcpStream) -> (HeaderMap, String, WebSocketStream) { +async fn extract_headers( + raw_stream: TcpStream, +) -> (HeaderMap, String, WebSocketStream) { let mut headers: Option> = None; let mut route: Option = None; - let copy_headers_callback = |request: &Request, response: Response| -> Result { - headers = Some(request.headers().clone()); - route = Some(request.uri().path().to_owned()); - Ok(response) - }; + let copy_headers_callback = + |request: &Request, response: Response| -> Result { + headers = Some(request.headers().clone()); + route = Some(request.uri().path().to_owned()); + Ok(response) + }; - let stream = tokio_tungstenite::accept_hdr_async( - raw_stream, - copy_headers_callback, - ).await + let stream = tokio_tungstenite::accept_hdr_async(raw_stream, copy_headers_callback) + .await .expect("Error during the websocket handshake occurred"); (headers.unwrap(), route.unwrap(), stream) -} \ No newline at end of file +} diff --git a/atrium-streams-client/src/subscriptions/repositories/firehose.rs b/atrium-streams-client/src/subscriptions/repositories/firehose.rs index e061f617..e07bde2f 100644 --- a/atrium-streams-client/src/subscriptions/repositories/firehose.rs +++ b/atrium-streams-client/src/subscriptions/repositories/firehose.rs @@ -136,19 +136,8 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Commit, ) -> Result>, Self::HandlingError> { - let CommitData { - blobs, - blocks, - commit, - ops, - repo, - rev, - seq, - since, - time, - too_big, - .. - } = payload.data; + let CommitData { blobs, blocks, commit, ops, repo, rev, seq, since, time, too_big, .. } = + payload.data; // If it is too big, the blocks and ops are not sent, so we skip the processing. let ops_opt = if too_big { @@ -172,15 +161,7 @@ impl Handler for Firehose { Ok(Some(ProcessedPayload { seq: Some(seq), - data: Self::ProcessedCommitData { - ops: ops_opt, - blobs, - commit, - repo, - rev, - since, - time, - }, + data: Self::ProcessedCommitData { ops: ops_opt, blobs, commit, repo, rev, since, time }, })) } @@ -189,12 +170,7 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Identity, ) -> Result>, Self::HandlingError> { - let IdentityData { - did, - handle, - seq, - time, - } = payload.data; + let IdentityData { did, handle, seq, time } = payload.data; Ok(Some(ProcessedPayload { seq: Some(seq), data: Self::ProcessedIdentityData { did, handle, time }, @@ -206,21 +182,10 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Account, ) -> Result>, Self::HandlingError> { - let AccountData { - did, - seq, - time, - active, - status, - } = payload.data; + let AccountData { did, seq, time, active, status } = payload.data; Ok(Some(ProcessedPayload { seq: Some(seq), - data: Self::ProcessedAccountData { - did, - active, - status, - time, - }, + data: Self::ProcessedAccountData { did, active, status, time }, })) } @@ -229,12 +194,7 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Handle, ) -> Result>, Self::HandlingError> { - let HandleData { - did, - handle, - seq, - time, - } = payload.data; + let HandleData { did, handle, seq, time } = payload.data; Ok(Some(ProcessedPayload { seq: Some(seq), data: Self::ProcessedHandleData { did, handle, time }, @@ -246,19 +206,10 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Migrate, ) -> Result>, Self::HandlingError> { - let MigrateData { - did, - migrate_to, - seq, - time, - } = payload.data; + let MigrateData { did, migrate_to, seq, time } = payload.data; Ok(Some(ProcessedPayload { seq: Some(seq), - data: Self::ProcessedMigrateData { - did, - migrate_to, - time, - }, + data: Self::ProcessedMigrateData { did, migrate_to, time }, })) } @@ -279,10 +230,7 @@ impl Handler for Firehose { &self, payload: subscribe_repos::Info, ) -> Result>, Self::HandlingError> { - Ok(Some(ProcessedPayload { - seq: None, - data: payload.data, - })) + Ok(Some(ProcessedPayload { seq: None, data: payload.data })) } } @@ -315,15 +263,9 @@ fn process_op( // Finds in the map the `Record` with the operation's CID and deserializes it. // If the item is not found, returns `None`. let record = match cid.as_ref().and_then(|c| map.get_mut(&c.0)) { - Some(item) => Some(serde_ipld_dagcbor::from_reader::( - Cursor::new(item), - )?), + Some(item) => Some(serde_ipld_dagcbor::from_reader::(Cursor::new(item))?), None => None, }; - Ok(Operation { - action, - path, - record, - }) + Ok(Operation { action, path, record }) } diff --git a/atrium-streams-client/src/subscriptions/repositories/tests.rs b/atrium-streams-client/src/subscriptions/repositories/tests.rs index c68059ed..d94dd78a 100644 --- a/atrium-streams-client/src/subscriptions/repositories/tests.rs +++ b/atrium-streams-client/src/subscriptions/repositories/tests.rs @@ -69,11 +69,7 @@ fn test_packet( packet: Option<(&str, &str)>, ) -> Option, HandledData), SubscriptionError>> { - let connection = mock_connection(if let Some(packet) = packet { - vec![packet] - } else { - vec![] - }); + let connection = mock_connection(if let Some(packet) = packet { vec![packet] } else { vec![] }); let subscription = gen_default_subscription(connection); @@ -99,10 +95,7 @@ fn gen_default_subscription( .enable_tombstone(true) .enable_info(true) .build(); - let subscription = Repositories::builder() - .connection(connection) - .handler(firehose) - .build(); + let subscription = Repositories::builder().connection(connection).handler(firehose).build(); subscription } @@ -117,9 +110,7 @@ fn disconnect() { #[test] fn invalid_packet() { if let SubscriptionError::Abort(msg) = - test_packet(Some(("{ not-a-header }", "{ not-a-payload }"))) - .unwrap() - .unwrap_err() + test_packet(Some(("{ not-a-header }", "{ not-a-payload }"))).unwrap().unwrap_err() { assert_eq!(msg, "Received invalid packet. Error: Utf8"); return; @@ -135,10 +126,7 @@ fn commit() { data: Some(CommitData { blobs: vec![], blocks: vec![], - commit: CidLink(Cid::new_v1( - 0x70, - Multihash::<64>::wrap(0x12, &[0; 64]).unwrap(), - )), + commit: CidLink(Cid::new_v1(0x70, Multihash::<64>::wrap(0x12, &[0; 64]).unwrap())), ops: vec![], prev: None, rebase: false, @@ -152,9 +140,8 @@ fn commit() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#commit" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#commit" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -187,9 +174,8 @@ fn identity() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#identity" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#identity" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -218,9 +204,8 @@ fn account() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#account" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#account" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -249,9 +234,8 @@ fn handle() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#handle" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#handle" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -279,9 +263,8 @@ fn migrate() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#migrate" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#migrate" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -308,9 +291,8 @@ fn tombstone() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#tombstone" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#tombstone" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, Some(99)); assert_eq!( format!("{:?}", data), @@ -333,9 +315,8 @@ fn info() { extra_data: Ipld::Null, }; let body = serde_json::to_string(&body).unwrap(); - let (seq, data) = test_packet(Some((r##"{ "op": 1, "t": "#info" }"##, &body))) - .unwrap() - .unwrap(); + let (seq, data) = + test_packet(Some((r##"{ "op": 1, "t": "#info" }"##, &body))).unwrap().unwrap(); assert_eq!(seq, None); assert_eq!( format!("{:?}", data), @@ -349,11 +330,8 @@ fn info() { #[test] fn ignored_frame() { - if test_packet(Some(( - r##"{ "op": 1, "t": "#non-existent" }"##, - r#"{ "foo": "bar" }"#, - ))) - .is_none() + if test_packet(Some((r##"{ "op": 1, "t": "#non-existent" }"##, r#"{ "foo": "bar" }"#))) + .is_none() { return; } @@ -366,10 +344,7 @@ fn invalid_body() { data: Some(CommitData { blobs: vec![], blocks: vec![1], // Invalid CAR file - commit: CidLink(Cid::new_v1( - 0x70, - Multihash::<64>::wrap(0x12, &[0; 64]).unwrap(), - )), + commit: CidLink(Cid::new_v1(0x70, Multihash::<64>::wrap(0x12, &[0; 64]).unwrap())), ops: vec![], prev: None, rebase: false, @@ -384,9 +359,7 @@ fn invalid_body() { }; let body = serde_json::to_string(&body).unwrap(); if let SubscriptionError::Abort(msg) = - test_packet(Some((r##"{ "op": 1, "t": "#commit" }"##, &body))) - .unwrap() - .unwrap_err() + test_packet(Some((r##"{ "op": 1, "t": "#commit" }"##, &body))).unwrap().unwrap_err() { assert_eq!( msg, @@ -447,10 +420,7 @@ fn unknown_error() { fn empty_payload() { let res = test_packet(Some((r##"{ "op": 1, "t": "#commit" }"##, r#""#))); if let SubscriptionError::Abort(msg) = res.unwrap().unwrap_err() { - assert_eq!( - "Received empty payload for header: {\"op\": 1, \"t\": \"#commit\"}", - &msg - ); + assert_eq!("Received empty payload for header: {\"op\": 1, \"t\": \"#commit\"}", &msg); return; } panic!("Expected Empty Payload") diff --git a/atrium-streams/src/client.rs b/atrium-streams/src/client.rs index 1f656749..5226c194 100644 --- a/atrium-streams/src/client.rs +++ b/atrium-streams/src/client.rs @@ -10,7 +10,7 @@ pub trait EventStreamClient { /// [`Result`] fn connect( &self, - uri: String + uri: String, ) -> impl Future, ConnectionError>> + Send; /// Get the `atproto-proxy` header. diff --git a/atrium-streams/src/subscriptions/frames/tests.rs b/atrium-streams/src/subscriptions/frames/tests.rs index bfab5a9d..10804549 100644 --- a/atrium-streams/src/subscriptions/frames/tests.rs +++ b/atrium-streams/src/subscriptions/frames/tests.rs @@ -7,10 +7,7 @@ fn serialized_data(s: &str) -> Vec { b'a'..=b'f' => b - b'a' + 10, _ => unreachable!(), }; - s.as_bytes() - .chunks(2) - .map(|b| (b2u(b[0]) << 4) + b2u(b[1])) - .collect() + s.as_bytes().chunks(2).map(|b| (b2u(b[0]) << 4) + b2u(b[1])).collect() } #[test] @@ -21,9 +18,7 @@ fn deserialize_message_frame_header() { let result = FrameHeader::try_from(ipld); assert_eq!( result.expect("failed to deserialize"), - FrameHeader::Message { - t: String::from("#commit") - } + FrameHeader::Message { t: String::from("#commit") } ); } diff --git a/atrium-streams/src/subscriptions/mod.rs b/atrium-streams/src/subscriptions/mod.rs index 67beca0a..46f113b7 100644 --- a/atrium-streams/src/subscriptions/mod.rs +++ b/atrium-streams/src/subscriptions/mod.rs @@ -53,10 +53,7 @@ pub struct ProcessedPayload { /// Helper function to convert between payload kinds. impl ProcessedPayload { pub fn map NewKind>(self, f: F) -> ProcessedPayload { - ProcessedPayload { - seq: self.seq, - data: f(self.data), - } + ProcessedPayload { seq: self.seq, data: f(self.data) } } }