Skip to content

Commit 749ca84

Browse files
authored
refactor: make fuse_amend implementation more convenient for on-premise deployments (#17147)
* tweak Cargo.toml
* use native_tls in fuse_amend
* sync Cargo.lock
* add settings
1 parent 58d4f8e commit 749ca84

File tree

11 files changed

+137
-13
lines changed

11 files changed

+137
-13
lines changed

Cargo.lock

Lines changed: 20 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,11 @@ async-compression = { git = "https://github.com/datafuse-extras/async-compressio
234234
"all-algorithms",
235235
] }
236236
async-recursion = "1.1.1"
237-
async-std = "1.12"
238237
async-stream = "0.3.3"
239238
async-trait = { version = "0.1.77" }
240-
aws-config = "1.1.7"
239+
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
241240
aws-sdk-s3 = "1.17.0"
241+
aws-smithy-runtime = { version = "1.6.2", default-features = false }
242242
backoff = "0.4" # FIXME: use backon to replace this.
243243
backon = "1"
244244
backtrace = "0.3.73"
@@ -318,7 +318,9 @@ hostname = "0.3.1"
318318
http = "1"
319319
humantime = "2.1.0"
320320
hyper = "1"
321+
hyper-tls = "0.5.0"
321322
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
323+
hyper_v014 = { package = "hyper", version = "0.14" }
322324
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
323325
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
324326
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }

src/common/grpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ anyerror = { workspace = true }
1818
hickory-resolver = { workspace = true }
1919
hyper = { workspace = true }
2020
hyper-util = { workspace = true }
21+
hyper_v014 = { workspace = true }
2122
jwt-simple = { workspace = true }
2223
log = { workspace = true }
2324
serde = { workspace = true }

src/common/grpc/src/dns_resolver.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use hickory_resolver::TokioAsyncResolver;
3232
use hyper::Uri;
3333
use hyper_util::client::legacy::connect::dns::Name;
3434
use hyper_util::client::legacy::connect::HttpConnector;
35+
use hyper_v014::client::connect::dns::Name as Hyperv014Name;
3536
use log::info;
3637
use serde::Deserialize;
3738
use serde::Serialize;
@@ -109,6 +110,31 @@ impl tower_service::Service<Name> for DNSService {
109110
}
110111
}
111112

113+
// Also resolve the `Name` type of hyper 0.14 (imported as `hyper_v014`), alongside the `Service<Name>` impl for the current hyper.
114+
impl tower_service::Service<Hyperv014Name> for DNSService {
115+
type Response = DNSServiceAddrs;
116+
type Error = ErrorCode;
117+
type Future = DNSServiceFuture;
118+
119+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
120+
Poll::Ready(Ok(()))
121+
}
122+
123+
fn call(&mut self, name: Hyperv014Name) -> Self::Future {
124+
let blocking = runtime::spawn(async move {
125+
let resolver = DNSResolver::instance()?;
126+
match resolver.resolve(name.to_string()).await {
127+
Err(err) => Err(err),
128+
Ok(addrs) => Ok(DNSServiceAddrs {
129+
inner: addrs.into_iter(),
130+
}),
131+
}
132+
});
133+
134+
DNSServiceFuture { inner: blocking }
135+
}
136+
}
137+
112138
pub struct DNSServiceFuture {
113139
inner: JoinHandle<Result<DNSServiceAddrs>>,
114140
}

src/query/ee/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ test = true
1414
[dependencies]
1515
async-backtrace = { workspace = true }
1616
async-trait = { workspace = true }
17+
aws-smithy-runtime = { workspace = true }
1718
base64 = { workspace = true }
1819
chrono = { workspace = true }
1920
chrono-tz = { workspace = true }
@@ -25,6 +26,7 @@ databend-common-config = { workspace = true }
2526
databend-common-exception = { workspace = true }
2627
databend-common-expression = { workspace = true }
2728
databend-common-functions = { workspace = true }
29+
databend-common-grpc = { workspace = true }
2830
databend-common-io = { workspace = true }
2931
databend-common-license = { workspace = true }
3032
databend-common-meta-api = { workspace = true }
@@ -61,13 +63,15 @@ databend-storages-common-table-meta = { workspace = true }
6163
derive-visitor = { workspace = true }
6264
futures = { workspace = true }
6365
futures-util = { workspace = true }
66+
hyper-tls = { workspace = true }
67+
hyper_v014 = { workspace = true }
6468
jwt-simple = { workspace = true }
6569
log = { workspace = true }
6670
opendal = { workspace = true }
6771
tempfile = { workspace = true }
6872

6973
# aws sdk
70-
aws-config = { workspace = true, features = ["behavior-version-latest"] }
74+
aws-config = { workspace = true }
7175
aws-sdk-s3 = { workspace = true }
7276

7377
[dev-dependencies]

src/query/ee/src/fail_safe/handler.rs

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::env;
1516
use std::sync::Arc;
1617
use std::time::Duration;
1718

@@ -22,13 +23,16 @@ use aws_sdk_s3::config::Credentials;
2223
use aws_sdk_s3::config::Region;
2324
use aws_sdk_s3::config::SharedCredentialsProvider;
2425
use aws_sdk_s3::Client;
26+
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
2527
use databend_common_base::base::tokio::io::AsyncReadExt;
2628
use databend_common_base::base::GlobalInstance;
2729
use databend_common_catalog::table::Table;
30+
use databend_common_catalog::table_context::TableContext;
2831
use databend_common_config::GlobalConfig;
2932
use databend_common_exception::ErrorCode;
3033
use databend_common_exception::Result;
3134
use databend_common_expression::TableSchema;
35+
use databend_common_grpc::DNSService;
3236
use databend_common_meta_app::schema::TableInfo;
3337
use databend_common_meta_app::storage::StorageParams;
3438
use databend_common_storages_fuse::io::MetaReaders;
@@ -38,6 +42,7 @@ use databend_enterprise_fail_safe::FailSafeHandlerWrapper;
3842
use databend_storages_common_cache::LoadParams;
3943
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
4044
use databend_storages_common_table_meta::meta::Location;
45+
use hyper_tls::HttpsConnector;
4146
use log::info;
4247
use log::warn;
4348
use opendal::ErrorKind;
@@ -49,7 +54,11 @@ impl RealFailSafeHandler {}
4954

5055
#[async_trait::async_trait]
5156
impl FailSafeHandler for RealFailSafeHandler {
52-
async fn recover_table_data(&self, table_info: TableInfo) -> Result<()> {
57+
async fn recover_table_data(
58+
&self,
59+
ctx: Arc<dyn TableContext>,
60+
table_info: TableInfo,
61+
) -> Result<()> {
5362
let storage_params = match &table_info.meta.storage_params {
5463
// External or attached table.
5564
Some(sp) => sp.clone(),
@@ -62,7 +71,7 @@ impl FailSafeHandler for RealFailSafeHandler {
6271

6372
let fuse_table = FuseTable::do_create(table_info)?;
6473

65-
let amender = Amender::try_new(storage_params).await?;
74+
let amender = Amender::try_new(ctx, storage_params).await?;
6675

6776
amender.recover_snapshot(fuse_table).await?;
6877

@@ -86,7 +95,7 @@ struct Amender {
8695
}
8796

8897
impl Amender {
89-
async fn try_new(storage_param: StorageParams) -> Result<Self> {
98+
async fn try_new(ctx: Arc<dyn TableContext>, storage_param: StorageParams) -> Result<Self> {
9099
// TODO
91100
// - replace client with opendal operator
92101
// - supports other storage types
@@ -116,18 +125,53 @@ impl Amender {
116125
.connect_timeout(Duration::from_secs(3))
117126
.build();
118127

128+
let tls_connector = {
129+
let mut builder = hyper_tls::native_tls::TlsConnector::builder();
130+
let allow_invalid_cert = ctx
131+
.get_settings()
132+
.get_premise_deploy_danger_amend_accept_invalid_cert()?;
133+
if allow_invalid_cert {
134+
info!("allows invalid cert, and accepts invalid hostnames in cert validation");
135+
builder.danger_accept_invalid_certs(true);
136+
builder.danger_accept_invalid_hostnames(true);
137+
};
138+
builder
139+
.build()
140+
.unwrap_or_else(|e| panic!("error while creating TLS connector: {}", e))
141+
};
142+
143+
let mut http = hyper_v014::client::HttpConnector::new_with_resolver(DNSService {});
144+
// do not enforce the `http` scheme: lets `https` URIs through while still allowing plain http connections
145+
http.enforce_http(false);
146+
147+
let connect_timeout = env::var("_DATABEND_INTERNAL_CONNECT_TIMEOUT")
148+
.ok()
149+
.and_then(|v| v.parse::<u64>().ok())
150+
.unwrap_or(30);
151+
http.set_connect_timeout(Some(Duration::from_secs(connect_timeout)));
152+
153+
let conn = HttpsConnector::from((http, tls_connector.into()));
154+
119155
let config = aws_config::from_env()
120156
.region(region_provider)
121157
.endpoint_url(s3_config.endpoint_url)
122158
.credentials_provider(SharedCredentialsProvider::new(base_credentials))
123159
.retry_config(retry_config)
124160
.timeout_config(timeout_config)
161+
.http_client(HyperClientBuilder::new().build(conn))
125162
.load()
126163
.await;
127164

165+
let force_path_style = ctx
166+
.get_settings()
167+
.get_premise_deploy_amend_force_path_style()?;
168+
let sdk_config = aws_sdk_s3::config::Builder::from(&config)
169+
.force_path_style(force_path_style)
170+
.build();
171+
128172
let root = s3_config.root;
129173
let bucket = s3_config.bucket;
130-
let client = Client::new(&config);
174+
let client = Client::from_conf(sdk_config);
131175

132176
Ok(Self {
133177
client,
@@ -286,7 +330,7 @@ impl Amender {
286330
.send()
287331
.await
288332
.map_err(|e| {
289-
ErrorCode::StorageOther(format!("failed to list object versions. {}", e))
333+
ErrorCode::StorageOther(format!("failed to list object versions. {:?}", e))
290334
})?;
291335

292336
// find the latest version

src/query/ee_features/fail_safe/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ test = true
1414
async-backtrace = { workspace = true }
1515
async-trait = { workspace = true }
1616
databend-common-base = { workspace = true }
17+
databend-common-catalog = { workspace = true }
1718
databend-common-exception = { workspace = true }
1819
databend-common-meta-app = { workspace = true }
1920

src/query/ee_features/fail_safe/src/handler.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_base::base::GlobalInstance;
18+
use databend_common_catalog::table_context::TableContext;
1819
use databend_common_exception::Result;
1920
use databend_common_meta_app::schema::TableInfo;
2021

2122
#[async_trait::async_trait]
2223
pub trait FailSafeHandler: Sync + Send {
23-
async fn recover_table_data(&self, table_info: TableInfo) -> Result<()>;
24+
async fn recover_table_data(
25+
&self,
26+
ctx: Arc<dyn TableContext>,
27+
table_info: TableInfo,
28+
) -> Result<()>;
2429
}
2530

2631
pub struct FailSafeHandlerWrapper {
@@ -33,8 +38,8 @@ impl FailSafeHandlerWrapper {
3338
}
3439

3540
#[async_backtrace::framed]
36-
pub async fn recover(&self, table_info: TableInfo) -> Result<()> {
37-
self.handler.recover_table_data(table_info).await
41+
pub async fn recover(&self, ctx: Arc<dyn TableContext>, table_info: TableInfo) -> Result<()> {
42+
self.handler.recover_table_data(ctx, table_info).await
3843
}
3944
}
4045

src/query/settings/src/settings_default.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,20 @@ impl DefaultSettings {
11511151
scope: SettingScope::Both,
11521152
range: Some(SettingRange::Numeric(0..=u64::MAX)),
11531153
}),
1154+
("premise_deploy_danger_amend_accept_invalid_cert", DefaultSettingValue {
1155+
value: UserSettingValue::UInt64(0),
1156+
desc: "Setting this to a non-zero value will allow `fuse_amend` to accept invalid TLS certificates. For diagnostic purposes only, Be very cautious before setting this to a non-zero value. If you're unsure, leave it unchanged.",
1157+
mode: SettingMode::Both,
1158+
scope: SettingScope::Both,
1159+
range: Some(SettingRange::Numeric(0..=1)),
1160+
}),
1161+
("premise_deploy_amend_force_path_style", DefaultSettingValue {
1162+
value: UserSettingValue::UInt64(1),
1163+
desc: "Setting this to a non-zero value will let `fuse_amend` use path style uri while accessing s3-compatible storage service.",
1164+
mode: SettingMode::Both,
1165+
scope: SettingScope::Both,
1166+
range: Some(SettingRange::Numeric(0..=1)),
1167+
}),
11541168
]);
11551169

11561170
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,4 +848,12 @@ impl Settings {
848848
let v = self.try_get_u64("stream_consume_batch_size_hint")?;
849849
Ok(if v == 0 { None } else { Some(v) })
850850
}
851+
852+
pub fn get_premise_deploy_danger_amend_accept_invalid_cert(&self) -> Result<bool> {
853+
Ok(self.try_get_u64("premise_deploy_danger_amend_accept_invalid_cert")? != 0)
854+
}
855+
856+
pub fn get_premise_deploy_amend_force_path_style(&self) -> Result<bool> {
857+
Ok(self.try_get_u64("premise_deploy_amend_force_path_style")? != 0)
858+
}
851859
}

src/query/storages/fuse/src/table_functions/fuse_amend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl SimpleTableFunc for FuseAmendTable {
8989
})?;
9090

9191
self.fail_safe_handler
92-
.recover(tbl.table_info.clone())
92+
.recover(ctx.clone(), tbl.table_info.clone())
9393
.await?;
9494

9595
let col: Vec<String> = vec!["Ok".to_owned()];

0 commit comments

Comments
 (0)