Skip to content

Commit 2d04683

Browse files
committed
messages
1 parent c4edaa7 commit 2d04683

File tree

2 files changed

+72
-43
lines changed

2 files changed

+72
-43
lines changed

src/datatrove/pipeline/dedup/fast_mh3/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
indicatif = "0.17.7"
8+
tokio-retry = "0.3"
89
# AWS SDK
910
aws-config = { version = "1.1.1", features = ["behavior-version-latest"] }
1011
aws-sdk-s3 = "1.1.1"

src/datatrove/pipeline/dedup/fast_mh3/src/main.rs

Lines changed: 71 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,24 @@ use clap::Parser;
99
use indicatif::{ProgressBar, ProgressStyle};
1010
use tokio::task;
1111
use std::sync::{Arc, Mutex};
12+
use tokio_retry::Retry;
13+
use tokio_retry::strategy::{ExponentialBackoff, jitter};
14+
use std::time::Duration;
15+
16+
async fn with_retry<F, Fut, T>(f: F) -> Result<T>
17+
where
18+
F: Fn() -> Fut,
19+
Fut: std::future::Future<Output = Result<T>>,
20+
{
21+
let retry_strategy = ExponentialBackoff::from_millis(1000)
22+
.max_delay(Duration::from_secs(30))
23+
.map(jitter)
24+
.take(3);
25+
26+
Retry::spawn(retry_strategy, || async {
27+
f().await
28+
}).await
29+
}
1230

1331
#[derive(Parser, Debug)]
1432
#[command(version, about, long_about = None)]
@@ -116,13 +134,15 @@ impl S3StreamWriter {
116134
key: &str,
117135
buffer_threshold: usize,
118136
) -> Result<Self> {
119-
let create_multipart_upload_output = client
120-
.create_multipart_upload()
121-
.bucket(bucket)
122-
.key(key)
123-
.send()
124-
.await
125-
.context("Failed to create multipart upload")?;
137+
let create_multipart_upload_output = with_retry(|| async {
138+
client
139+
.create_multipart_upload()
140+
.bucket(bucket)
141+
.key(key)
142+
.send()
143+
.await
144+
.context("Failed to create multipart upload")
145+
}).await?;
126146

127147
Ok(Self {
128148
client: client.clone(),
@@ -151,18 +171,20 @@ impl S3StreamWriter {
151171
return Ok(());
152172
}
153173

154-
let part_body = ByteStream::from(self.buffer.clone());
155-
let upload_part_output = self
156-
.client
157-
.upload_part()
158-
.bucket(&self.bucket)
159-
.key(&self.key)
160-
.upload_id(&self.upload_id)
161-
.part_number(self.part_number)
162-
.body(part_body)
163-
.send()
164-
.await
165-
.context("Failed to upload part")?;
174+
let buffer_clone = self.buffer.clone();
175+
let upload_part_output = with_retry(|| async {
176+
let part_body = ByteStream::from(buffer_clone.clone());
177+
self.client
178+
.upload_part()
179+
.bucket(&self.bucket)
180+
.key(&self.key)
181+
.upload_id(&self.upload_id)
182+
.part_number(self.part_number)
183+
.body(part_body)
184+
.send()
185+
.await
186+
.context("Failed to upload part")
187+
}).await?;
166188

167189
let completed_part = CompletedPart::builder()
168190
.e_tag(upload_part_output.e_tag().unwrap_or_default())
@@ -180,31 +202,35 @@ impl S3StreamWriter {
180202
self.flush().await?;
181203

182204
let completed_multipart_upload = CompletedMultipartUpload::builder()
183-
.set_parts(Some(self.completed_parts))
205+
.set_parts(Some(self.completed_parts.clone()))
184206
.build();
185207

186-
self.client
187-
.complete_multipart_upload()
188-
.bucket(&self.bucket)
189-
.key(&self.key)
190-
.upload_id(&self.upload_id)
191-
.multipart_upload(completed_multipart_upload)
192-
.send()
193-
.await
194-
.context("Failed to complete multipart upload")?;
208+
with_retry(|| async {
209+
self.client
210+
.complete_multipart_upload()
211+
.bucket(&self.bucket)
212+
.key(&self.key)
213+
.upload_id(&self.upload_id)
214+
.multipart_upload(completed_multipart_upload.clone())
215+
.send()
216+
.await
217+
.context("Failed to complete multipart upload")
218+
}).await?;
195219

196220
Ok(())
197221
}
198222
}
199223

200224
async fn list_s3_files(client: &Client, s3_path: &S3Path, total_files: usize) -> Result<Vec<String>> {
201-
let resp = client
202-
.list_objects_v2()
203-
.bucket(&s3_path.bucket)
204-
.prefix(&s3_path.prefix)
205-
.send()
206-
.await
207-
.context("Failed to list S3 objects")?;
225+
let resp = with_retry(|| async {
226+
client
227+
.list_objects_v2()
228+
.bucket(&s3_path.bucket)
229+
.prefix(&s3_path.prefix)
230+
.send()
231+
.await
232+
.context("Failed to list S3 objects")
233+
}).await?;
208234

209235
let files: Vec<String> = resp
210236
.contents()
@@ -229,13 +255,15 @@ async fn list_s3_files(client: &Client, s3_path: &S3Path, total_files: usize) ->
229255
async fn download_and_parse_file(client: &Client, file_path: &str) -> Result<Vec<(u32, u32, u32, u32)>> {
230256
let s3_path = S3Path::from_path(file_path)?;
231257

232-
let resp = client
233-
.get_object()
234-
.bucket(&s3_path.bucket)
235-
.key(&s3_path.prefix)
236-
.send()
237-
.await
238-
.context("Failed to download S3 object")?;
258+
let resp = with_retry(|| async {
259+
client
260+
.get_object()
261+
.bucket(&s3_path.bucket)
262+
.key(&s3_path.prefix)
263+
.send()
264+
.await
265+
.context("Failed to download S3 object")
266+
}).await?;
239267

240268
let body = resp.body.collect().await?.into_bytes();
241269
let mut reader = Cursor::new(body);

0 commit comments

Comments
 (0)