Skip to content

Commit 13c79d9

Browse files
authored
refactor: Bump OpenDAL to 0.51 (#17161)
* refactor: Bump OpenDAL to 0.51 Signed-off-by: Xuanwo <[email protected]> * Remove not needed deps Signed-off-by: Xuanwo <[email protected]> * Fix tests Signed-off-by: Xuanwo <[email protected]> * Fix tests Signed-off-by: Xuanwo <[email protected]> * Address tests Signed-off-by: Xuanwo <[email protected]> * Fix part of stage Signed-off-by: Xuanwo <[email protected]> * Fix delta table Signed-off-by: Xuanwo <[email protected]> * Fix size Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent c1e295f commit 13c79d9

File tree

16 files changed

+244
-164
lines changed

16 files changed

+244
-164
lines changed

Cargo.lock

+51-25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,6 @@ ethnum = { version = "1.5.0" }
291291
fallible-streaming-iterator = "0.1"
292292
faststr = "0.2"
293293
feature-set = { version = "0.1.1" }
294-
flagset = "0.4"
295294
flatbuffers = "24" # Must use the same version with arrow-ipc
296295
flate2 = "1"
297296
foreign_vec = "0.1.0"
@@ -363,10 +362,10 @@ num-derive = "0.3.3"
363362
num-traits = "0.2.19"
364363
num_cpus = "1.13.1"
365364
object = "0.36.5"
366-
object_store_opendal = "0.48.1"
365+
object_store_opendal = { git = "https://github.com/apache/opendal", package = "object_store_opendal", rev = "f7f9990" }
367366
once_cell = "1.15.0"
368367
openai_api_rust = "0.1"
369-
opendal = { version = "0.50.1", features = [
368+
opendal = { version = "0.51", git = "https://github.com/apache/opendal", rev = "f7f9990", features = [
370369
"layers-fastrace",
371370
"layers-prometheus-client",
372371
"layers-async-backtrace",

src/common/storage/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ databend-common-meta-app = { workspace = true }
2424
databend-common-metrics = { workspace = true }
2525
databend-common-native = { workspace = true }
2626
databend-enterprise-storage-encryption = { workspace = true }
27-
flagset = { workspace = true }
2827
futures = { workspace = true }
2928
http = { workspace = true }
3029
log = { workspace = true }

src/common/storage/src/metrics.rs

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use opendal::raw::LayeredAccess;
2525
use opendal::raw::OpList;
2626
use opendal::raw::OpRead;
2727
use opendal::raw::OpWrite;
28+
use opendal::raw::RpDelete;
2829
use opendal::raw::RpList;
2930
use opendal::raw::RpRead;
3031
use opendal::raw::RpWrite;
@@ -167,6 +168,8 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
167168
type BlockingWriter = StorageMetricsWrapper<A::BlockingWriter>;
168169
type Lister = A::Lister;
169170
type BlockingLister = A::BlockingLister;
171+
type Deleter = A::Deleter;
172+
type BlockingDeleter = A::BlockingDeleter;
170173

171174
fn inner(&self) -> &Self::Inner {
172175
&self.inner
@@ -193,6 +196,11 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
193196
self.inner.list(path, args).await
194197
}
195198

199+
#[async_backtrace::framed]
200+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
201+
self.inner.delete().await
202+
}
203+
196204
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
197205
self.inner
198206
.blocking_read(path, args)
@@ -208,6 +216,10 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
208216
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
209217
self.inner.blocking_list(path, args)
210218
}
219+
220+
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
221+
self.inner.blocking_delete()
222+
}
211223
}
212224

213225
pub struct StorageMetricsWrapper<R> {

src/common/storage/src/runtime_layer.rs

+34-3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
9393
type BlockingWriter = A::BlockingWriter;
9494
type Lister = A::Lister;
9595
type BlockingLister = A::BlockingLister;
96+
type Deleter = RuntimeIO<A::Deleter>;
97+
type BlockingDeleter = A::BlockingDeleter;
9698

9799
fn inner(&self) -> &Self::Inner {
98100
&self.inner
@@ -139,13 +141,17 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
139141
.expect("join must success")
140142
}
141143

142-
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
144+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
143145
let op = self.inner.clone();
144-
let path = path.to_string();
146+
145147
self.runtime
146-
.spawn(async move { op.delete(&path, args).await })
148+
.spawn(async move { op.delete().await })
147149
.await
148150
.expect("join must success")
151+
.map(|(rp, r)| {
152+
let r = RuntimeIO::new(r, self.runtime.clone());
153+
(rp, r)
154+
})
149155
}
150156

151157
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
@@ -168,6 +174,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
168174
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
169175
self.inner.blocking_list(path, args)
170176
}
177+
178+
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
179+
self.inner.blocking_delete()
180+
}
171181
}
172182

173183
pub struct RuntimeIO<R: 'static> {
@@ -200,3 +210,24 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
200210
res
201211
}
202212
}
213+
214+
impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
215+
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
216+
self.inner.as_mut().unwrap().delete(path, args)
217+
}
218+
219+
async fn flush(&mut self) -> Result<usize> {
220+
let mut r = self.inner.take().expect("reader must be valid");
221+
let runtime = self.runtime.clone();
222+
223+
let (r, res) = runtime
224+
.spawn(async move {
225+
let res = r.flush().await;
226+
(r, res)
227+
})
228+
.await
229+
.expect("join must success");
230+
self.inner = Some(r);
231+
res
232+
}
233+
}

0 commit comments

Comments
 (0)