Skip to content

Commit 119b2e1

Browse files
authored
feat: Add examples/video (#260)
* Add examples/video * Add README
1 parent 3d117f3 commit 119b2e1

File tree

4 files changed

+292
-0
lines changed

4 files changed

+292
-0
lines changed

Diff for: Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ members = [
1414
exclude = [
1515
"examples/concurrent",
1616
"examples/firehose",
17+
"examples/video",
1718
]
1819
resolver = "2"
1920

Diff for: examples/video/Cargo.toml

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "video"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
atrium-api = { version = "0.24.8", features = ["agent"] }
8+
atrium-xrpc-client.version = "0.5.10"
9+
clap = { version = "4.5.21", features = ["derive"] }
10+
serde = { version = "1.0", features = ["derive"] }
11+
serde_html_form = { version = "0.2.6", default-features = false }
12+
tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread"] }

Diff for: examples/video/README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
## Example code for uploading a video
2+
3+
1. First, get a token by `com.atproto.server.getServiceAuth`.
4+
5+
2. Call uploadVideo against the video service (`video.bsky.app`) with the token.
6+
7+
3. Call `app.bsky.video.getJobStatus` against the video service as well.
8+
9+
(The same goes for `app.bsky.video.getUploadLimits`, which gets a token and calls the video service with it to get the data, but the process of checking this may be omitted.)
10+
11+
In Atrium:
12+
13+
- Since `AtpAgent` cannot process XRPC requests with the token obtained by `getServiceAuth`, we need to prepare a dedicated Client and create an `AtpServiceClient` that uses it.
14+
- The `app.bsky.video.uploadVideo` endpoint is special (weird?) and requires special hacks such as adding query parameters to the request URL and modifying the response to match the schema.

Diff for: examples/video/src/main.rs

+265
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
use atrium_api::{
2+
agent::{store::MemorySessionStore, AtpAgent},
3+
client::AtpServiceClient,
4+
types::{
5+
string::{Datetime, Did},
6+
Collection, TryIntoUnknown, Union,
7+
},
8+
xrpc::{
9+
http::{uri::Builder, Request, Response},
10+
types::AuthorizationToken,
11+
HttpClient, XrpcClient,
12+
},
13+
};
14+
use atrium_xrpc_client::reqwest::ReqwestClient;
15+
use clap::Parser;
16+
use serde::Serialize;
17+
use std::{fs::File, io::Read, path::PathBuf, time::Duration};
18+
use tokio::time;
19+
20+
const VIDEO_SERVICE: &str = "https://video.bsky.app";
21+
const VIDEO_SERVICE_DID: &str = "did:web:video.bsky.app";
22+
const UPLOAD_VIDEO_PATH: &str = "/xrpc/app.bsky.video.uploadVideo";
23+
24+
/// Simple program to upload videos by ATrium API agent.
25+
#[derive(Parser, Debug)]
26+
#[command(author, version, about, long_about = None)]
27+
struct Args {
28+
/// Identifier of the login user.
29+
#[arg(short, long)]
30+
identifier: String,
31+
/// App password of the login user.
32+
#[arg(short, long)]
33+
password: String,
34+
/// Video file to upload.
35+
#[arg(long, value_name = "VIDEO FILE")]
36+
video: PathBuf,
37+
}
38+
39+
#[derive(Serialize)]
40+
struct UploadParams {
41+
did: Did,
42+
name: String,
43+
}
44+
45+
struct VideoClient {
46+
token: String,
47+
params: Option<UploadParams>,
48+
inner: ReqwestClient,
49+
}
50+
51+
impl VideoClient {
52+
fn new(token: String, params: Option<UploadParams>) -> Self {
53+
Self {
54+
token,
55+
params,
56+
inner: ReqwestClient::new(
57+
// Actually, `base_uri` returns `VIDEO_SERVICE`, so there is no need to specify this.
58+
"https://dummy.example.com",
59+
),
60+
}
61+
}
62+
}
63+
64+
impl HttpClient for VideoClient {
65+
async fn send_http(
66+
&self,
67+
mut request: Request<Vec<u8>>,
68+
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>> {
69+
let is_upload_video = request.uri().path() == UPLOAD_VIDEO_PATH;
70+
// Hack: Append query parameters
71+
if is_upload_video {
72+
if let Some(params) = &self.params {
73+
*request.uri_mut() = Builder::from(request.uri().clone())
74+
.path_and_query(format!(
75+
"{UPLOAD_VIDEO_PATH}?{}",
76+
serde_html_form::to_string(params)?
77+
))
78+
.build()?;
79+
}
80+
}
81+
let mut response = self.inner.send_http(request).await;
82+
// Hack: Formatting an incorrect response body
83+
if is_upload_video {
84+
if let Ok(res) = response.as_mut() {
85+
*res.body_mut() =
86+
[b"{\"jobStatus\":".to_vec(), res.body().to_vec(), b"}".to_vec()].concat();
87+
}
88+
}
89+
response
90+
}
91+
}
92+
93+
impl XrpcClient for VideoClient {
94+
fn base_uri(&self) -> String {
95+
VIDEO_SERVICE.to_string()
96+
}
97+
async fn authorization_token(&self, _: bool) -> Option<AuthorizationToken> {
98+
Some(AuthorizationToken::Bearer(self.token.clone()))
99+
}
100+
}
101+
102+
#[tokio::main]
103+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
104+
let args = Args::parse();
105+
// Read video file
106+
let data = {
107+
let mut file = File::open(&args.video)?;
108+
let mut buf = Vec::new();
109+
file.read_to_end(&mut buf)?;
110+
buf
111+
};
112+
113+
// Login
114+
println!("Logging in...");
115+
let agent =
116+
AtpAgent::new(ReqwestClient::new("https://bsky.social"), MemorySessionStore::default());
117+
let session = agent.login(&args.identifier, &args.password).await?;
118+
119+
// Check upload limits
120+
println!("Checking upload limits...");
121+
let limits = {
122+
let service_auth = agent
123+
.api
124+
.com
125+
.atproto
126+
.server
127+
.get_service_auth(
128+
atrium_api::com::atproto::server::get_service_auth::ParametersData {
129+
aud: VIDEO_SERVICE_DID.parse().expect("invalid DID"),
130+
exp: None,
131+
lxm: atrium_api::app::bsky::video::get_upload_limits::NSID.parse().ok(),
132+
}
133+
.into(),
134+
)
135+
.await?;
136+
let client = AtpServiceClient::new(VideoClient::new(service_auth.data.token, None));
137+
client.service.app.bsky.video.get_upload_limits().await?
138+
};
139+
println!("{:?}", limits.data);
140+
if !limits.can_upload
141+
|| limits.remaining_daily_bytes.map_or(false, |remain| remain < data.len() as i64)
142+
|| limits.remaining_daily_videos.map_or(false, |remain| remain <= 0)
143+
{
144+
eprintln!("You cannot upload a video: {:?}", limits.data);
145+
return Ok(());
146+
}
147+
148+
// Upload video
149+
println!("Uploading video...");
150+
let output = {
151+
let service_auth = agent
152+
.api
153+
.com
154+
.atproto
155+
.server
156+
.get_service_auth(
157+
atrium_api::com::atproto::server::get_service_auth::ParametersData {
158+
aud: format!(
159+
"did:web:{}",
160+
agent.get_endpoint().await.strip_prefix("https://").unwrap()
161+
)
162+
.parse()
163+
.expect("invalid DID"),
164+
exp: None,
165+
lxm: atrium_api::com::atproto::repo::upload_blob::NSID.parse().ok(),
166+
}
167+
.into(),
168+
)
169+
.await?;
170+
171+
let filename = args
172+
.video
173+
.file_name()
174+
.and_then(|s| s.to_os_string().into_string().ok())
175+
.expect("failed to get filename");
176+
let client = AtpServiceClient::new(VideoClient::new(
177+
service_auth.data.token,
178+
Some(UploadParams { did: session.did.clone(), name: filename }),
179+
));
180+
client.service.app.bsky.video.upload_video(data).await?
181+
};
182+
println!("{:?}", output.job_status.data);
183+
184+
// Wait for the video to be uploaded
185+
let client = AtpServiceClient::new(ReqwestClient::new(VIDEO_SERVICE));
186+
let mut status = output.data.job_status.data;
187+
loop {
188+
status = client
189+
.service
190+
.app
191+
.bsky
192+
.video
193+
.get_job_status(
194+
atrium_api::app::bsky::video::get_job_status::ParametersData {
195+
job_id: status.job_id.clone(),
196+
}
197+
.into(),
198+
)
199+
.await?
200+
.data
201+
.job_status
202+
.data;
203+
println!("{status:?}");
204+
if status.blob.is_some()
205+
|| status.state == "JOB_STATE_CREATED"
206+
|| status.state == "JOB_STATE_FAILED"
207+
{
208+
break;
209+
}
210+
time::sleep(Duration::from_millis(100)).await;
211+
}
212+
let Some(video) = status.blob else {
213+
eprintln!("Failed to get blob: {status:?}");
214+
return Ok(());
215+
};
216+
if let Some(message) = status.message {
217+
println!("{message}");
218+
}
219+
220+
// Post to feed with the video
221+
println!("Video uploaded: {video:?}");
222+
let record = atrium_api::app::bsky::feed::post::RecordData {
223+
created_at: Datetime::now(),
224+
embed: Some(Union::Refs(
225+
atrium_api::app::bsky::feed::post::RecordEmbedRefs::AppBskyEmbedVideoMain(Box::new(
226+
atrium_api::app::bsky::embed::video::MainData {
227+
alt: Some(String::from("alt text")),
228+
aspect_ratio: None,
229+
captions: None,
230+
video,
231+
}
232+
.into(),
233+
)),
234+
)),
235+
entities: None,
236+
facets: None,
237+
labels: None,
238+
langs: None,
239+
reply: None,
240+
tags: None,
241+
text: String::new(),
242+
}
243+
.try_into_unknown()
244+
.expect("failed to convert record");
245+
let output = agent
246+
.api
247+
.com
248+
.atproto
249+
.repo
250+
.create_record(
251+
atrium_api::com::atproto::repo::create_record::InputData {
252+
collection: atrium_api::app::bsky::feed::Post::nsid(),
253+
record,
254+
repo: session.data.did.into(),
255+
rkey: None,
256+
swap_commit: None,
257+
validate: Some(true),
258+
}
259+
.into(),
260+
)
261+
.await?;
262+
println!("{:?}", output.data);
263+
264+
Ok(())
265+
}

0 commit comments

Comments
 (0)