Skip to content

Commit 783d155

Browse files
feat(query): support direct the spill data to a different bucket (#17159)
* add storage.spill bucket * fix * fix * fix * remove strict * vacuum_temporary_files * SpillConfig.storage * test * fix * fix * test * update * fix * fix * update test --------- Co-authored-by: sundyli <[email protected]>
1 parent 52794c1 commit 783d155

File tree

26 files changed

+242
-151
lines changed

26 files changed

+242
-151
lines changed

โ€Ž.github/actions/test_sqllogic_standalone_linux_minio/action.yml

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ runs:
4545
export AWS_EC2_METADATA_DISABLED=true
4646
4747
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
48+
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://spillbucket
4849
4950
- name: Run sqllogic Tests with Standalone mode
5051
if: inputs.storage-format == 'all' || inputs.storage-format == 'parquet'

โ€Žscripts/ci/ci-run-sqllogic-tests-minio.sh

+14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,20 @@ export STORAGE_S3_ACCESS_KEY_ID=minioadmin
1818
export STORAGE_S3_SECRET_ACCESS_KEY=minioadmin
1919
export STORAGE_ALLOW_INSECURE=true
2020

21+
export SPILL_SPILL_LOCAL_DISK_PATH=''
22+
config="[spill.storage]
23+
type = \"s3\"
24+
25+
[spill.storage.s3]
26+
bucket = \"spillbucket\"
27+
root = \"admin\"
28+
endpoint_url = \"http://127.0.0.1:9900\"
29+
access_key_id = \"minioadmin\"
30+
secret_access_key = \"minioadmin\"
31+
allow_insecure = true"
32+
33+
echo "$config" >> ./scripts/ci/deploy/config/databend-query-node-1.toml
34+
2135
echo "Starting standalone DatabendQuery and DatabendMeta"
2236
./scripts/ci/deploy/databend-query-standalone.sh
2337

โ€Žsrc/common/storage/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub use http_client::StorageHttpClient;
4040

4141
mod operator;
4242
pub use operator::build_operator;
43+
pub use operator::check_operator;
4344
pub use operator::init_operator;
4445
pub use operator::DataOperator;
4546

โ€Žsrc/common/storage/src/operator.rs

+75-34
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,9 @@ impl RetryInterceptor for DatabendRetryInterceptor {
407407
#[derive(Clone, Debug)]
408408
pub struct DataOperator {
409409
operator: Operator,
410+
spill_operator: Option<Operator>,
410411
params: StorageParams,
412+
spill_params: Option<StorageParams>,
411413
}
412414

413415
impl DataOperator {
@@ -416,60 +418,69 @@ impl DataOperator {
416418
self.operator.clone()
417419
}
418420

421+
pub fn spill_operator(&self) -> Operator {
422+
match &self.spill_operator {
423+
Some(op) => op.clone(),
424+
None => self.operator.clone(),
425+
}
426+
}
427+
428+
pub fn spill_params(&self) -> Option<&StorageParams> {
429+
self.spill_params.as_ref()
430+
}
431+
419432
pub fn params(&self) -> StorageParams {
420433
self.params.clone()
421434
}
422435

423436
#[async_backtrace::framed]
424-
pub async fn init(conf: &StorageConfig) -> databend_common_exception::Result<()> {
425-
GlobalInstance::set(Self::try_create(&conf.params).await?);
437+
pub async fn init(
438+
conf: &StorageConfig,
439+
spill_params: Option<StorageParams>,
440+
) -> databend_common_exception::Result<()> {
441+
GlobalInstance::set(Self::try_create(conf, spill_params).await?);
426442

427443
Ok(())
428444
}
429445

430446
/// Create a new data operator without check.
431-
pub fn try_new(sp: &StorageParams) -> databend_common_exception::Result<DataOperator> {
432-
let operator = init_operator(sp)?;
447+
pub fn try_new(
448+
conf: &StorageConfig,
449+
spill_params: Option<StorageParams>,
450+
) -> databend_common_exception::Result<DataOperator> {
451+
let operator = init_operator(&conf.params)?;
452+
let spill_operator = spill_params.as_ref().map(init_operator).transpose()?;
433453

434454
Ok(DataOperator {
435455
operator,
436-
params: sp.clone(),
456+
params: conf.params.clone(),
457+
spill_operator,
458+
spill_params,
437459
})
438460
}
439461

440462
#[async_backtrace::framed]
441-
pub async fn try_create(sp: &StorageParams) -> databend_common_exception::Result<DataOperator> {
442-
let sp = sp.clone();
443-
444-
let operator = init_operator(&sp)?;
445-
446-
// OpenDAL will send a real request to underlying storage to check whether it works or not.
447-
// If this check failed, it's highly possible that the users have configured it wrongly.
448-
//
449-
// Make sure the check is called inside GlobalIORuntime to prevent
450-
// IO hang on reuse connection.
451-
let op = operator.clone();
452-
if let Err(cause) = GlobalIORuntime::instance()
453-
.spawn(async move {
454-
let res = op.stat("/").await;
455-
match res {
456-
Ok(_) => Ok(()),
457-
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(()),
458-
Err(e) => Err(e),
459-
}
460-
})
461-
.await
462-
.expect("join must succeed")
463-
{
464-
return Err(ErrorCode::StorageUnavailable(format!(
465-
"current configured storage is not available: config: {:?}, cause: {cause}",
466-
sp
467-
)));
468-
}
463+
pub async fn try_create(
464+
conf: &StorageConfig,
465+
spill_params: Option<StorageParams>,
466+
) -> databend_common_exception::Result<DataOperator> {
467+
let operator = init_operator(&conf.params)?;
468+
check_operator(&operator, &conf.params).await?;
469+
470+
let spill_operator = match &spill_params {
471+
Some(params) => {
472+
let op = init_operator(params)?;
473+
check_operator(&op, params).await?;
474+
Some(op)
475+
}
476+
None => None,
477+
};
469478

470479
Ok(DataOperator {
471480
operator,
472-
params: sp.clone(),
481+
params: conf.params.clone(),
482+
spill_operator,
483+
spill_params,
473484
})
474485
}
475486

@@ -485,3 +496,33 @@ impl DataOperator {
485496
GlobalInstance::get()
486497
}
487498
}
499+
500+
pub async fn check_operator(
501+
operator: &Operator,
502+
params: &StorageParams,
503+
) -> databend_common_exception::Result<()> {
504+
// OpenDAL will send a real request to underlying storage to check whether it works or not.
505+
// If this check failed, it's highly possible that the users have configured it wrongly.
506+
//
507+
// Make sure the check is called inside GlobalIORuntime to prevent
508+
// IO hang on reuse connection.
509+
let op = operator.clone();
510+
511+
GlobalIORuntime::instance()
512+
.spawn(async move {
513+
let res = op.stat("/").await;
514+
match res {
515+
Ok(_) => Ok(()),
516+
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(()),
517+
Err(e) => Err(e),
518+
}
519+
})
520+
.await
521+
.expect("join must succeed")
522+
.map_err(|cause| {
523+
ErrorCode::StorageUnavailable(format!(
524+
"current configured storage is not available: config: {:?}, cause: {cause}",
525+
params
526+
))
527+
})
528+
}

โ€Žsrc/query/config/src/config.rs

+42-37
Original file line numberDiff line numberDiff line change
@@ -2972,6 +2972,10 @@ pub struct SpillConfig {
29722972
#[clap(long, value_name = "VALUE", default_value = "18446744073709551615")]
29732973
/// Allow space in bytes to spill to local disk.
29742974
pub spill_local_disk_max_bytes: u64,
2975+
2976+
// TODO: We need to fix StorageConfig so that it supports environment variables and command line injections.
2977+
#[clap(skip)]
2978+
pub storage: Option<StorageConfig>,
29752979
}
29762980

29772981
impl Default for SpillConfig {
@@ -2981,8 +2985,6 @@ impl Default for SpillConfig {
29812985
}
29822986

29832987
mod cache_config_converters {
2984-
use std::path::PathBuf;
2985-
29862988
use log::warn;
29872989

29882990
use super::*;
@@ -3024,7 +3026,7 @@ mod cache_config_converters {
30243026
storage,
30253027
catalog,
30263028
cache,
3027-
mut spill,
3029+
spill,
30283030
background,
30293031
catalogs: input_catalogs,
30303032
..
@@ -3044,18 +3046,7 @@ mod cache_config_converters {
30443046
catalogs.insert(CATALOG_HIVE.to_string(), catalog);
30453047
}
30463048

3047-
// Trick for cloud, perhaps we should introduce a new configuration for the local writeable root.
3048-
if cache.disk_cache_config.path != inner::DiskCacheConfig::default().path
3049-
&& spill.spill_local_disk_path == inner::SpillConfig::default().path
3050-
{
3051-
spill.spill_local_disk_path = PathBuf::from(&cache.disk_cache_config.path)
3052-
.join("temp/_query_spill")
3053-
.into_os_string()
3054-
.into_string()
3055-
.map_err(|s| {
3056-
ErrorCode::Internal(format!("failed to convert os string to string: {s:?}"))
3057-
})?
3058-
};
3049+
let spill = convert_local_spill_config(spill, &cache.disk_cache_config)?;
30593050

30603051
Ok(InnerConfig {
30613052
subcommand,
@@ -3066,7 +3057,7 @@ mod cache_config_converters {
30663057
storage: storage.try_into()?,
30673058
catalogs,
30683059
cache: cache.try_into()?,
3069-
spill: spill.try_into()?,
3060+
spill,
30703061
background: background.try_into()?,
30713062
})
30723063
}
@@ -3129,37 +3120,51 @@ mod cache_config_converters {
31293120
}
31303121
}
31313122

3132-
impl TryFrom<SpillConfig> for inner::SpillConfig {
3133-
type Error = ErrorCode;
3123+
fn convert_local_spill_config(
3124+
spill: SpillConfig,
3125+
cache: &DiskCacheConfig,
3126+
) -> Result<inner::SpillConfig> {
3127+
// Trick for cloud, perhaps we should introduce a new configuration for the local writeable root.
3128+
let local_writeable_root = if cache.path != DiskCacheConfig::default().path
3129+
&& spill.spill_local_disk_path.is_empty()
3130+
{
3131+
Some(cache.path.clone())
3132+
} else {
3133+
None
3134+
};
31343135

3135-
fn try_from(value: SpillConfig) -> std::result::Result<Self, Self::Error> {
3136-
let SpillConfig {
3137-
spill_local_disk_path,
3138-
spill_local_disk_reserved_space_percentage: reserved,
3139-
spill_local_disk_max_bytes,
3140-
} = value;
3141-
if !reserved.is_normal()
3142-
|| reserved.is_sign_negative()
3143-
|| reserved > OrderedFloat(100.0)
3144-
{
3145-
Err(ErrorCode::InvalidArgument(format!(
3146-
"invalid spill_local_disk_reserved_space_percentage: {reserved}"
3147-
)))?;
3148-
}
3149-
Ok(Self {
3150-
path: spill_local_disk_path,
3151-
reserved_disk_ratio: reserved / 100.0,
3152-
global_bytes_limit: spill_local_disk_max_bytes,
3136+
let storage_params = spill
3137+
.storage
3138+
.map(|storage| {
3139+
let storage: InnerStorageConfig = storage.try_into()?;
3140+
Ok::<_, ErrorCode>(storage.params)
31533141
})
3154-
}
3142+
.transpose()?;
3143+
3144+
Ok(inner::SpillConfig {
3145+
local_writeable_root,
3146+
path: spill.spill_local_disk_path,
3147+
reserved_disk_ratio: spill.spill_local_disk_reserved_space_percentage / 100.0,
3148+
global_bytes_limit: spill.spill_local_disk_max_bytes,
3149+
storage_params,
3150+
})
31553151
}
31563152

31573153
impl From<inner::SpillConfig> for SpillConfig {
31583154
fn from(value: inner::SpillConfig) -> Self {
3155+
let storage = value.storage_params.map(|params| {
3156+
InnerStorageConfig {
3157+
params,
3158+
..Default::default()
3159+
}
3160+
.into()
3161+
});
3162+
31593163
Self {
31603164
spill_local_disk_path: value.path,
31613165
spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0,
31623166
spill_local_disk_max_bytes: value.global_bytes_limit,
3167+
storage,
31633168
}
31643169
}
31653170
}

โ€Žsrc/query/config/src/inner.rs

+37-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::fmt;
1717
use std::fmt::Debug;
1818
use std::fmt::Display;
1919
use std::fmt::Formatter;
20+
use std::path::PathBuf;
2021
use std::str::FromStr;
2122
use std::time::Duration;
2223

@@ -28,6 +29,7 @@ use databend_common_exception::Result;
2829
use databend_common_grpc::RpcClientConf;
2930
use databend_common_grpc::RpcClientTlsConfig;
3031
use databend_common_meta_app::principal::UserSettingValue;
32+
use databend_common_meta_app::storage::StorageParams;
3133
use databend_common_meta_app::tenant::Tenant;
3234
use databend_common_meta_app::tenant::TenantQuota;
3335
use databend_common_storage::StorageConfig;
@@ -717,22 +719,55 @@ impl Default for CacheConfig {
717719

718720
#[derive(Clone, Debug, PartialEq, Eq)]
719721
pub struct SpillConfig {
720-
/// Path of spill to local disk. disable if it's empty.
721-
pub path: String,
722+
pub(crate) local_writeable_root: Option<String>,
723+
pub(crate) path: String,
722724

723725
/// Ratio of the reserve of the disk space.
724726
pub reserved_disk_ratio: OrderedFloat<f64>,
725727

726728
/// Allow bytes use of disk space.
727729
pub global_bytes_limit: u64,
730+
731+
pub storage_params: Option<StorageParams>,
732+
}
733+
734+
impl SpillConfig {
735+
/// Path of spill to local disk.
736+
pub fn local_path(&self) -> Option<PathBuf> {
737+
if self.global_bytes_limit == 0 {
738+
return None;
739+
}
740+
741+
if !self.path.is_empty() {
742+
return Some(self.path.clone().into());
743+
}
744+
745+
if let Some(root) = &self.local_writeable_root {
746+
return Some(PathBuf::from(root).join("temp/_query_spill"));
747+
}
748+
749+
None
750+
}
751+
752+
pub fn new_for_test(path: String, reserved_disk_ratio: f64, global_bytes_limit: u64) -> Self {
753+
Self {
754+
local_writeable_root: None,
755+
path,
756+
reserved_disk_ratio: OrderedFloat(reserved_disk_ratio),
757+
global_bytes_limit,
758+
storage_params: None,
759+
}
760+
}
728761
}
729762

730763
impl Default for SpillConfig {
731764
fn default() -> Self {
732765
Self {
766+
local_writeable_root: None,
733767
path: "".to_string(),
734768
reserved_disk_ratio: OrderedFloat(0.3),
735769
global_bytes_limit: u64::MAX,
770+
storage_params: None,
736771
}
737772
}
738773
}

0 commit comments

Comments
ย (0)