Skip to content

Commit b341910

Browse files
authored
feat: runtime module (#233)
* temp runtime * POC * fix chrono * fix dep * refine module * refactor to use a deadly simple way * allow dead_code * add license * fix clippy and tests * clean code * undo * add async-std ci test * rm tokio dev-dep * make tokio dev dep * fix sort * rm tokio dev
1 parent c3b5364 commit b341910

File tree

5 files changed

+116
-3
lines changed

5 files changed

+116
-3
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ jobs:
7171

7272
- name: Test
7373
run: cargo test --no-fail-fast --all-targets --all-features --workspace
74-
74+
75+
- name: Async-std Test
76+
run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace
77+
7578
- name: Doc Test
7679
run: cargo test --no-fail-fast --doc --all-features --workspace

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ arrow-select = { version = "52" }
4646
arrow-string = { version = "52" }
4747
async-stream = "0.3.5"
4848
async-trait = "0.1"
49+
async-std = "1.12.0"
4950
aws-config = "1.1.8"
5051
aws-sdk-glue = "1.21.0"
5152
bimap = "0.6"

crates/iceberg/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@ license = { workspace = true }
2929
keywords = ["iceberg"]
3030

3131
[features]
32-
default = ["storage-fs", "storage-s3"]
32+
default = ["storage-fs", "storage-s3", "tokio"]
3333
storage-all = ["storage-fs", "storage-s3"]
3434

3535
storage-fs = ["opendal/services-fs"]
3636
storage-s3 = ["opendal/services-s3"]
3737

38+
async-std = ["dep:async-std"]
39+
tokio = ["dep:tokio"]
40+
3841
[dependencies]
3942
anyhow = { workspace = true }
4043
apache-avro = { workspace = true }
@@ -45,6 +48,7 @@ arrow-ord = { workspace = true }
4548
arrow-schema = { workspace = true }
4649
arrow-select = { workspace = true }
4750
arrow-string = { workspace = true }
51+
async-std = { workspace = true, optional = true, features = ["attributes"] }
4852
async-stream = { workspace = true }
4953
async-trait = { workspace = true }
5054
bimap = { workspace = true }
@@ -71,6 +75,7 @@ serde_derive = { workspace = true }
7175
serde_json = { workspace = true }
7276
serde_repr = { workspace = true }
7377
serde_with = { workspace = true }
78+
tokio = { workspace = true, optional = true }
7479
typed-builder = { workspace = true }
7580
url = { workspace = true }
7681
urlencoding = { workspace = true }
@@ -81,4 +86,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
8186
pretty_assertions = { workspace = true }
8287
tempfile = { workspace = true }
8388
tera = { workspace = true }
84-
tokio = { workspace = true }

crates/iceberg/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,7 @@ pub mod expr;
5050
pub mod transaction;
5151
pub mod transform;
5252

53+
mod runtime;
54+
5355
pub mod arrow;
5456
pub mod writer;

crates/iceberg/src/runtime/mod.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// This module contains the async runtime abstraction for iceberg.
19+
20+
use std::future::Future;
21+
use std::pin::Pin;
22+
use std::task::{Context, Poll};
23+
24+
pub enum JoinHandle<T> {
25+
#[cfg(feature = "tokio")]
26+
Tokio(tokio::task::JoinHandle<T>),
27+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
28+
AsyncStd(async_std::task::JoinHandle<T>),
29+
}
30+
31+
impl<T: Send + 'static> Future for JoinHandle<T> {
32+
type Output = T;
33+
34+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35+
match self.get_mut() {
36+
#[cfg(feature = "tokio")]
37+
JoinHandle::Tokio(handle) => Pin::new(handle)
38+
.poll(cx)
39+
.map(|h| h.expect("tokio spawned task failed")),
40+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
41+
JoinHandle::AsyncStd(handle) => Pin::new(handle).poll(cx),
42+
}
43+
}
44+
}
45+
46+
#[allow(dead_code)]
47+
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
48+
where
49+
F: Future + Send + 'static,
50+
F::Output: Send + 'static,
51+
{
52+
#[cfg(feature = "tokio")]
53+
return JoinHandle::Tokio(tokio::task::spawn(f));
54+
55+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
56+
return JoinHandle::AsyncStd(async_std::task::spawn(f));
57+
}
58+
59+
#[allow(dead_code)]
60+
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
61+
where
62+
F: FnOnce() -> T + Send + 'static,
63+
T: Send + 'static,
64+
{
65+
#[cfg(feature = "tokio")]
66+
return JoinHandle::Tokio(tokio::task::spawn_blocking(f));
67+
68+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
69+
return JoinHandle::AsyncStd(async_std::task::spawn_blocking(f));
70+
}
71+
72+
#[cfg(test)]
73+
mod tests {
74+
use super::*;
75+
76+
#[cfg(feature = "tokio")]
77+
#[tokio::test]
78+
async fn test_tokio_spawn() {
79+
let handle = spawn(async { 1 + 1 });
80+
assert_eq!(handle.await, 2);
81+
}
82+
83+
#[cfg(feature = "tokio")]
84+
#[tokio::test]
85+
async fn test_tokio_spawn_blocking() {
86+
let handle = spawn_blocking(|| 1 + 1);
87+
assert_eq!(handle.await, 2);
88+
}
89+
90+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
91+
#[async_std::test]
92+
async fn test_async_std_spawn() {
93+
let handle = spawn(async { 1 + 1 });
94+
assert_eq!(handle.await, 2);
95+
}
96+
97+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
98+
#[async_std::test]
99+
async fn test_async_std_spawn_blocking() {
100+
let handle = spawn_blocking(|| 1 + 1);
101+
assert_eq!(handle.await, 2);
102+
}
103+
}

0 commit comments

Comments
 (0)