Skip to content

Commit 6a2c77a

Browse files
authored
s3: generated data for benchmarks (MaterializeInc#5503)
1 parent c2314d2 commit 6a2c77a

File tree

4 files changed

+289
-0
lines changed

4 files changed

+289
-0
lines changed

Cargo.lock

Lines changed: 77 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ members = [
3939
"test/metabase/smoketest",
4040
"test/performance/perf-kinesis",
4141
"test/performance/perf-upsert",
42+
"test/performance/s3-datagen",
4243
"test/smith",
4344
"test/test-util",
4445
]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "s3-datagen"
3+
description = "Generate data in S3 for testing our S3 sources"
4+
version = "0.0.0"
5+
edition = "2018"
6+
publish = false
7+
8+
[dependencies]
9+
anyhow = "1.0.38"
10+
aws-util = { path = "../../../src/aws-util" }
11+
bytefmt = "0.1.7"
12+
futures = "0.3.12"
13+
indicatif = "0.15.0"
14+
ore = { path = "../../../src/ore" }
15+
rusoto_core = { git = "https://github.com/rusoto/rusoto.git" }
16+
rusoto_credential = { git = "https://github.com/rusoto/rusoto.git" }
17+
rusoto_s3 = { git = "https://github.com/rusoto/rusoto.git" }
18+
structopt = "0.3.21"
19+
tokio = { version = "1.0.0", features = ["macros", "net", "rt", "time"] }
20+
tracing = "0.1.22"
21+
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["env-filter", "fmt"] }
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright Materialize, Inc. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
use std::convert::TryInto;
11+
use std::default::Default;
12+
use std::io;
13+
use std::iter;
14+
15+
use anyhow::{anyhow, Context};
16+
use rusoto_core::RusotoError;
17+
use rusoto_s3::{
18+
CreateBucketConfiguration, CreateBucketError, CreateBucketRequest, PutObjectRequest, S3,
19+
};
20+
use structopt::StructOpt;
21+
use tracing::{error, info, Level};
22+
use tracing_subscriber::filter::EnvFilter;
23+
use tracing_subscriber::fmt;
24+
use tracing_subscriber::layer::SubscriberExt;
25+
use tracing_subscriber::util::SubscriberInitExt;
26+
27+
/// Generate meaningless data in S3 to test download speeds
28+
#[derive(StructOpt)]
29+
struct Args {
30+
/// How large to make each line (record) in Bytes
31+
#[structopt(short = "l", long)]
32+
line_bytes: usize,
33+
34+
/// How large to make each object, e.g. `1 KiB`
35+
#[structopt(
36+
short = "s",
37+
long,
38+
parse(try_from_str = parse_object_size)
39+
)]
40+
object_size: usize,
41+
42+
/// How many objects to create
43+
#[structopt(short = "c", long)]
44+
object_count: usize,
45+
46+
/// All objects will be inserted into this prefix
47+
#[structopt(short = "p", long)]
48+
key_prefix: String,
49+
50+
/// All objects will be inserted into this bucket
51+
#[structopt(short = "b", long)]
52+
bucket: String,
53+
54+
/// Which region to operate in
55+
#[structopt(short = "r", long, default_value = "us-east-2")]
56+
region: String,
57+
58+
/// Number of copy operations to run concurrently
59+
#[structopt(long, default_value = "50")]
60+
concurrent_copies: usize,
61+
}
62+
63+
#[tokio::main]
64+
async fn main() {
65+
if let Err(e) = run().await {
66+
error!("{:#}", e);
67+
std::process::exit(1);
68+
}
69+
}
70+
71+
async fn run() -> anyhow::Result<()> {
72+
let args: Args = ore::cli::parse_args();
73+
let env_filter = EnvFilter::try_from_env("MZ_LOG").or_else(|_| EnvFilter::try_new("info"))?;
74+
tracing_subscriber::registry()
75+
.with(env_filter)
76+
.with(fmt::layer().with_writer(io::stderr))
77+
.init();
78+
79+
info!(
80+
"starting up to create {} of data across {} objects in {}/{}",
81+
bytefmt::format((args.object_size * args.object_count).try_into()?),
82+
args.object_count,
83+
args.bucket,
84+
args.key_prefix
85+
);
86+
87+
let line = iter::repeat('A')
88+
.take(args.line_bytes)
89+
.chain(iter::once('\n'))
90+
.collect::<String>();
91+
let mut object_size = 0;
92+
let line_size = line.len();
93+
let object = iter::repeat(line)
94+
.take_while(|_| {
95+
object_size += line_size;
96+
object_size < args.object_size
97+
})
98+
.collect::<String>();
99+
100+
let conn_info =
101+
aws_util::aws::ConnectInfo::new(rusoto_core::Region::default(), None, None, None)?;
102+
103+
let client = aws_util::s3::client(conn_info)
104+
.await
105+
.context("creating s3 client")?;
106+
107+
let first_object_key = format!("{}{:>05}", args.key_prefix, 0);
108+
109+
let progressbar = indicatif::ProgressBar::new(args.object_count as u64);
110+
111+
client
112+
.create_bucket(CreateBucketRequest {
113+
bucket: args.bucket.clone(),
114+
create_bucket_configuration: Some(CreateBucketConfiguration {
115+
location_constraint: Some(args.region.clone()),
116+
}),
117+
..Default::default()
118+
})
119+
.await
120+
.map(|_| info!("created s3 bucket {}", args.bucket))
121+
.or_else(|e| {
122+
if matches!(
123+
e,
124+
RusotoError::Service(CreateBucketError::BucketAlreadyOwnedByYou(_))
125+
) {
126+
tracing::event!(Level::INFO, bucket = %args.bucket, "reusing existing bucket");
127+
Ok(())
128+
} else {
129+
Err(e)
130+
}
131+
})?;
132+
133+
let mut total_created = 0;
134+
client
135+
.put_object(PutObjectRequest {
136+
bucket: args.bucket.clone(),
137+
key: first_object_key.clone(),
138+
body: Some(object.into_bytes().into()),
139+
..Default::default()
140+
})
141+
.await?;
142+
total_created += 1;
143+
progressbar.inc(1);
144+
145+
let copy_source = format!("{}/{}", args.bucket, first_object_key.clone());
146+
147+
let mut copy_reqs = Vec::new();
148+
let pool_size = args.concurrent_copies;
149+
for i in 1..pool_size + 1 {
150+
copy_reqs.push(client.copy_object(rusoto_s3::CopyObjectRequest {
151+
bucket: args.bucket.clone(),
152+
copy_source: copy_source.clone(),
153+
key: format!("{}{:>05}", args.key_prefix, i),
154+
..Default::default()
155+
}));
156+
progressbar.inc(1);
157+
total_created += 1;
158+
}
159+
160+
for i in (pool_size + 1)..(args.object_count - pool_size) {
161+
let (_resp, _, copies) = futures::future::select_all(copy_reqs).await;
162+
copy_reqs = copies;
163+
copy_reqs.push(client.copy_object(rusoto_s3::CopyObjectRequest {
164+
bucket: args.bucket.clone(),
165+
copy_source: copy_source.clone(),
166+
key: format!("{}{:>05}", args.key_prefix, i),
167+
..Default::default()
168+
}));
169+
progressbar.inc(1);
170+
total_created += 1;
171+
}
172+
while !copy_reqs.is_empty() {
173+
let (_resp, _, copies) = futures::future::select_all(copy_reqs).await;
174+
copy_reqs = copies;
175+
total_created += 1;
176+
progressbar.inc(1);
177+
}
178+
drop(progressbar);
179+
180+
info!("created {} objects", total_created);
181+
assert_eq!(total_created, args.object_count);
182+
183+
Ok(())
184+
}
185+
186+
fn parse_object_size(s: &str) -> anyhow::Result<usize> {
187+
bytefmt::parse(s)
188+
.map_err(|e| anyhow!("{}", e))
189+
.and_then(|b| Ok(b.try_into()?))
190+
}

0 commit comments

Comments
 (0)