Skip to content

Commit

Permalink
Fix firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
sugyan committed Feb 8, 2024
1 parent d1c19a4 commit 6673618
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 32 deletions.
3 changes: 2 additions & 1 deletion examples/firehose/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ edition = "2021"

[dependencies]
atrium-xrpc-server = { path = "../../atrium-xrpc-server" }
atrium-api = "0.15"
atrium-api = { path = "../../atrium-api", default-features = false, features = ["dag-cbor"]}
ciborium = "0.2.1"
futures = "0.3.28"
rs-car = "0.4.1"
tokio = { version = "1.28.1", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
serde_ipld_dagcbor = { git = "https://github.com/sugyan/serde_ipld_dagcbor.git", rev = "345b240" }
55 changes: 24 additions & 31 deletions examples/firehose/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::Message;
use atrium_api::com::atproto::sync::subscribe_repos::{Message, NSID};
use atrium_api::types::CidLink;
use atrium_xrpc_server::stream::frames::Frame;
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut stream, _) =
connect_async("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos").await?;

let bgs = "bsky.network";
let (mut stream, _) = connect_async(format!("wss://{bgs}/xrpc/{NSID}")).await?;
while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await {
process_message(&message).await.unwrap();
}
Expand All @@ -18,35 +18,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn process_message(message: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match Frame::try_from(message)? {
Frame::Message(message) => {
match message.body {
Message::Commit(commit) => {
for op in commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
if op.action != "create" || collection != "app.bsky.feed.post" {
continue;
}
let (items, _) =
rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid)
{
if let Ok(value) =
ciborium::de::from_reader::<Record, _>(&mut item.as_slice())
{
println!("{}: {}", value.created_at, value.text);
} else {
println!("FAILED: could not deserialize post from item of length: {}", item.len());

}
} else {
println!(
"FAILED: could not find item with operation cid {:?} out of {} items",
op.cid,
items.len()
);
}
if let Message::Commit(commit) = message.body {
for op in commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
if op.action != "create" || collection != "app.bsky.feed.post" {
continue;
}
let (items, _) =
rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
if let Some((_, item)) =
items.iter().find(|(cid, _)| Some(CidLink(*cid)) == op.cid)
{
let record =
serde_ipld_dagcbor::from_reader::<Record, _>(&mut item.as_slice())?;
println!("{}: {}", record.created_at, record.text);
} else {
panic!(
"FAILED: could not find item with operation cid {:?} out of {} items",
op.cid,
items.len()
);
}
}
_ => unimplemented!("{:?}", message.body),
}
}
Frame::Error(err) => panic!("{err:?}"),
Expand Down

0 comments on commit 6673618

Please sign in to comment.