Skip to content

Commit 1b0aa58

Browse files
committed
Optionally prepending s3 compatible storage's key with a hash of the key.
The point is to workaround S3 rate limiting. Since it is based on keys, our ULID naming scheme can lead to hotspot in the keyspace. This solution has a downside. External scripts listing files will have a their job multiplied. For this reason, the prefix cardinality is configurable. Closes #4824
1 parent e08eb9f commit 1b0aa58

File tree

5 files changed

+110
-18
lines changed

5 files changed

+110
-18
lines changed

quickwit/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ matches = "0.1.9"
152152
md5 = "0.7"
153153
mime_guess = "2.0.4"
154154
mockall = "0.11"
155+
murmurhash32 = "0.3"
155156
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
156157
new_string_template = "1.5.1"
157158
nom = "7.1.3"

quickwit/quickwit-config/src/storage_config.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use itertools::Itertools;
2626
use quickwit_common::get_bool_from_env;
2727
use serde::{Deserialize, Serialize};
2828
use serde_with::{serde_as, EnumMap};
29+
use tracing::warn;
2930

3031
/// Lists the storage backends supported by Quickwit.
3132
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
@@ -93,6 +94,9 @@ impl StorageConfigs {
9394
}
9495

9596
pub fn validate(&self) -> anyhow::Result<()> {
97+
for storage_config in self.0.iter() {
98+
storage_config.validate()?;
99+
}
96100
let backends: Vec<StorageBackend> = self
97101
.0
98102
.iter()
@@ -216,6 +220,16 @@ impl StorageConfig {
216220
_ => None,
217221
}
218222
}
223+
224+
pub fn validate(&self) -> anyhow::Result<()> {
225+
if let StorageConfig::S3(config) = self {
226+
config.validate()
227+
} else {
228+
Ok(())
229+
}
230+
231+
}
232+
219233
}
220234

221235
impl From<AzureStorageConfig> for StorageConfig {
@@ -313,6 +327,9 @@ impl fmt::Debug for AzureStorageConfig {
313327
}
314328
}
315329

330+
331+
const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3);
332+
316333
#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
317334
#[serde(deny_unknown_fields)]
318335
pub struct S3StorageConfig {
@@ -334,9 +351,26 @@ pub struct S3StorageConfig {
334351
pub disable_multi_object_delete: bool,
335352
#[serde(default)]
336353
pub disable_multipart_upload: bool,
354+
#[serde(default)]
355+
#[serde(skip_serializing_if = "lower_than_2")]
356+
pub hash_prefix_cardinality: usize,
357+
}
358+
359+
fn lower_than_2(n: &usize) -> bool {
360+
*n < 2
337361
}
338362

339363
impl S3StorageConfig {
364+
fn validate(&self) -> anyhow::Result<()> {
365+
if self.hash_prefix_cardinality == 1 {
366+
warn!("A hash prefix of 1 will be ignored.");
367+
}
368+
if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY {
369+
anyhow::bail!("hash_prefix_cardinality can take values of at most {MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}", self.hash_prefix_cardinality);
370+
}
371+
Ok(())
372+
}
373+
340374
fn apply_flavor(&mut self) {
341375
match self.flavor {
342376
Some(StorageBackendFlavor::DigitalOcean) => {
@@ -383,7 +417,7 @@ impl S3StorageConfig {
383417
}
384418

385419
impl fmt::Debug for S3StorageConfig {
386-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
420+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
387421
f.debug_struct("S3StorageConfig")
388422
.field("access_key_id", &self.access_key_id)
389423
.field(

quickwit/quickwit-storage/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
2626
pin-project = { workspace = true }
2727
rand = { workspace = true }
2828
regex = { workspace = true }
29+
murmurhash32 = { workspace = true }
2930
serde = { workspace = true }
3031
serde_json = { workspace = true }
3132
tantivy = { workspace = true }

quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs

+72-17
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage {
8686
s3_client: S3Client,
8787
uri: Uri,
8888
bucket: String,
89-
prefix: PathBuf,
89+
prefix: String,
9090
multipart_policy: MultiPartPolicy,
9191
retry_params: RetryParams,
9292
disable_multi_object_delete: bool,
9393
disable_multipart_upload: bool,
94+
// If 0, we don't have any prefix
95+
hash_prefix_cardinality: usize,
9496
}
9597

9698
impl fmt::Debug for S3CompatibleObjectStorage {
@@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage {
99101
.debug_struct("S3CompatibleObjectStorage")
100102
.field("bucket", &self.bucket)
101103
.field("prefix", &self.prefix)
104+
.field("hash_prefix_cardinality", &self.hash_prefix_cardinality)
102105
.finish()
103106
}
104107
}
@@ -181,19 +184,20 @@ impl S3CompatibleObjectStorage {
181184
s3_client,
182185
uri: uri.clone(),
183186
bucket,
184-
prefix,
187+
prefix: prefix.to_string_lossy().to_string(),
185188
multipart_policy: MultiPartPolicy::default(),
186189
retry_params,
187190
disable_multi_object_delete,
188191
disable_multipart_upload,
192+
hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality,
189193
})
190194
}
191195

192196
/// Sets a specific for all buckets.
193197
///
194198
/// This method overrides any existing prefix. (It does NOT
195199
/// append the argument to any existing prefix.)
196-
pub fn with_prefix(self, prefix: PathBuf) -> Self {
200+
pub fn with_prefix(self, prefix: String) -> Self {
197201
Self {
198202
s3_client: self.s3_client,
199203
uri: self.uri,
@@ -203,6 +207,7 @@ impl S3CompatibleObjectStorage {
203207
retry_params: self.retry_params,
204208
disable_multi_object_delete: self.disable_multi_object_delete,
205209
disable_multipart_upload: self.disable_multipart_upload,
210+
hash_prefix_cardinality: self.hash_prefix_cardinality,
206211
}
207212
}
208213

@@ -262,12 +267,47 @@ async fn compute_md5<T: AsyncRead + std::marker::Unpin>(mut read: T) -> io::Resu
262267
}
263268
}
264269
}
270+
const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef";
271+
const UNINITIALIZED_HASH_PREFIX: &str = "00000000";
272+
273+
fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String {
274+
let mut key = String::with_capacity(UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len());
275+
if hash_prefix_cardinality > 1 {
276+
key.push_str(UNINITIALIZED_HASH_PREFIX);
277+
key.push('/');
278+
}
279+
key.push_str(prefix);
280+
if key.as_bytes().last().copied() != Some(b'/') {
281+
key.push('/');
282+
}
283+
key.push_str(relative_path);
284+
// We then set up the prefix.
285+
if hash_prefix_cardinality > 1 {
286+
let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..];
287+
let mut prefix_hash: usize =
288+
murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality;
289+
unsafe {
290+
let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()];
291+
for prefix_byte in prefix_buf.iter_mut() {
292+
let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize];
293+
*prefix_byte = hex;
294+
if prefix_hash < 16 {
295+
break;
296+
}
297+
prefix_hash /= 16;
298+
}
299+
}
300+
}
301+
key
302+
}
265303

266304
impl S3CompatibleObjectStorage {
267305
fn key(&self, relative_path: &Path) -> String {
268-
// FIXME: This may not work on Windows.
269-
let key_path = self.prefix.join(relative_path);
270-
key_path.to_string_lossy().to_string()
306+
build_key(
307+
&self.prefix,
308+
relative_path.to_string_lossy().as_ref(),
309+
self.hash_prefix_cardinality,
310+
)
271311
}
272312

273313
fn relative_path(&self, key: &str) -> PathBuf {
@@ -945,13 +985,13 @@ mod tests {
945985
let s3_client = S3Client::new(&sdk_config);
946986
let uri = Uri::for_test("s3://bucket/indexes");
947987
let bucket = "bucket".to_string();
948-
let prefix = PathBuf::new();
949988

950989
let mut s3_storage = S3CompatibleObjectStorage {
951990
s3_client,
952991
uri,
953992
bucket,
954-
prefix,
993+
prefix: String::new(),
994+
hash_prefix_cardinality: 0,
955995
multipart_policy: MultiPartPolicy::default(),
956996
retry_params: RetryParams::for_test(),
957997
disable_multi_object_delete: false,
@@ -962,7 +1002,7 @@ mod tests {
9621002
PathBuf::from("indexes/foo")
9631003
);
9641004

965-
s3_storage.prefix = PathBuf::from("indexes");
1005+
s3_storage.prefix = "indexes".to_string();
9661006

9671007
assert_eq!(
9681008
s3_storage.relative_path("indexes/foo"),
@@ -1000,13 +1040,13 @@ mod tests {
10001040
let s3_client = S3Client::from_conf(config);
10011041
let uri = Uri::for_test("s3://bucket/indexes");
10021042
let bucket = "bucket".to_string();
1003-
let prefix = PathBuf::new();
10041043

10051044
let s3_storage = S3CompatibleObjectStorage {
10061045
s3_client,
10071046
uri,
10081047
bucket,
1009-
prefix,
1048+
prefix: String::new(),
1049+
hash_prefix_cardinality: 0,
10101050
multipart_policy: MultiPartPolicy::default(),
10111051
retry_params: RetryParams::for_test(),
10121052
disable_multi_object_delete: true,
@@ -1041,13 +1081,13 @@ mod tests {
10411081
let s3_client = S3Client::from_conf(config);
10421082
let uri = Uri::for_test("s3://bucket/indexes");
10431083
let bucket = "bucket".to_string();
1044-
let prefix = PathBuf::new();
10451084

10461085
let s3_storage = S3CompatibleObjectStorage {
10471086
s3_client,
10481087
uri,
10491088
bucket,
1050-
prefix,
1089+
prefix: String::new(),
1090+
hash_prefix_cardinality: 0,
10511091
multipart_policy: MultiPartPolicy::default(),
10521092
retry_params: RetryParams::for_test(),
10531093
disable_multi_object_delete: false,
@@ -1123,13 +1163,13 @@ mod tests {
11231163
let s3_client = S3Client::from_conf(config);
11241164
let uri = Uri::for_test("s3://bucket/indexes");
11251165
let bucket = "bucket".to_string();
1126-
let prefix = PathBuf::new();
11271166

11281167
let s3_storage = S3CompatibleObjectStorage {
11291168
s3_client,
11301169
uri,
11311170
bucket,
1132-
prefix,
1171+
prefix: String::new(),
1172+
hash_prefix_cardinality: 0,
11331173
multipart_policy: MultiPartPolicy::default(),
11341174
retry_params: RetryParams::for_test(),
11351175
disable_multi_object_delete: false,
@@ -1216,13 +1256,13 @@ mod tests {
12161256
let s3_client = S3Client::from_conf(config);
12171257
let uri = Uri::for_test("s3://bucket/indexes");
12181258
let bucket = "bucket".to_string();
1219-
let prefix = PathBuf::new();
12201259

12211260
let s3_storage = S3CompatibleObjectStorage {
12221261
s3_client,
12231262
uri,
12241263
bucket,
1225-
prefix,
1264+
prefix: String::new(),
1265+
hash_prefix_cardinality: 0,
12261266
multipart_policy: MultiPartPolicy::default(),
12271267
retry_params: RetryParams::for_test(),
12281268
disable_multi_object_delete: false,
@@ -1233,4 +1273,19 @@ mod tests {
12331273
.await
12341274
.unwrap();
12351275
}
1276+
1277+
#[test]
1278+
fn test_build_key() {
1279+
assert_eq!(build_key("hello", "coucou", 0), "hello/coucou");
1280+
assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou");
1281+
assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou");
1282+
assert_eq!(build_key("hello", "coucou", 1), "hello/coucou");
1283+
assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou");
1284+
assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou");
1285+
assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou");
1286+
assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou");
1287+
assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou");
1288+
assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou");
1289+
assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou");
1290+
}
12361291
}

0 commit comments

Comments
 (0)