Skip to content

Commit bc56227

Browse files
committed
tasks: complete background tasks implementation, add integration test
1 parent c3be619 commit bc56227

File tree

6 files changed

+53
-9
lines changed

6 files changed

+53
-9
lines changed

apps/fx-test-app/src/lib.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use {
1313
sleep,
1414
HttpRequest,
1515
HttpResponse,
16-
io::{http::{fetch, HttpBody}, blob::BlobGetError, kv::{self, KvSetNxPxError}, env},
16+
io::{http::{fetch, HttpBody}, blob::BlobGetError, kv::{self, KvSetNxPxError}, env, tasks},
1717
StatusCode,
1818
utils::{axum::handle_request, migrations::{Migrations, Migration, SqlMigrationError}},
1919
random,
@@ -88,6 +88,8 @@ pub async fn http(mut req: HttpRequest) -> HttpResponse {
8888
.route("/test/kv/distributed-lock", get(kv_distributed_lock))
8989
.route("/test/kv/pubsub/subscribe", get(kv_pubsub_subscribe))
9090
.route("/test/kv/pubsub/publish", post(kv_pubsub_publish))
91+
.route("/test/tasks/background/start", post(kv_tasks_background_start))
92+
.route("/test/tasks/background/status", get(kv_tasks_background_status))
9193
.route("/_fx/cron", get(handle_cron))
9294
.route("/", get(home))
9395
.layer(Extension(Metrics::new())),
@@ -567,6 +569,22 @@ async fn kv_pubsub_publish(Json(req): Json<KvPubsubPublishRequest>) -> &'static
567569
"ok.\n"
568570
}
569571

572+
async fn kv_tasks_background_start() -> &'static str {
573+
tasks::run_in_background(async {
574+
sleep(Duration::from_secs(1)).await;
575+
kv::Kv::new("test-namespace").set("background_task_status", "done").await;
576+
});
577+
578+
"ok.\n"
579+
}
580+
581+
async fn kv_tasks_background_status() -> (StatusCode, &'static str) {
582+
match kv::Kv::new("test-namespace").get("background_task_status").await {
583+
Some(_) => (StatusCode::OK, "done."),
584+
None => (StatusCode::NOT_FOUND, "not done yet.")
585+
}
586+
}
587+
570588
#[derive(Clone)]
571589
struct Metrics {
572590
test_counter: Counter,

fx-runtime/tests/integration_test.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,28 @@ async fn kv_pubsub() {
700700
assert_eq!("result: 129", result.text().await.unwrap());
701701
}
702702

703+
#[tokio::test]
704+
async fn task_background() {
705+
let client = init_fx_server().await;
706+
707+
// before task is started, kv key does not exist
708+
let status = client.get("/test/tasks/background/status").send().await.unwrap();
709+
assert!(status.status().as_u16() == 404);
710+
711+
// start task
712+
let task = client.post("/test/tasks/background/start").send().await.unwrap();
713+
assert!(task.status().is_success());
714+
715+
// after task is started, kv key does not exist immediately
716+
let status = client.get("/test/tasks/background/status").send().await.unwrap();
717+
assert!(status.status().as_u16() == 404);
718+
719+
// but after some time, it should be present
720+
sleep(Duration::from_secs(2)).await;
721+
let status = client.get("/test/tasks/background/status").send().await.unwrap();
722+
assert!(status.status().is_success());
723+
}
724+
703725
pub struct TestClient {
704726
client: reqwest::Client,
705727
base_url: String,

fx-sdk/src/api/tasks.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use {
22
futures::FutureExt,
3-
crate::sys::FunctionResource,
3+
crate::sys::{FunctionResource, add_function_resource, fx_tasks_background_spawn},
44
};
55

66
/// runs a task in background after request processing is finished
77
pub fn run_in_background<F>(future: F) where F: Future<Output = ()> + 'static {
8-
let task_resource = FunctionResource::BackgroundTask(future.boxed_local());
9-
todo!()
8+
let resource_id = add_function_resource(FunctionResource::BackgroundTask(future.boxed_local())).as_u64();
9+
unsafe { fx_tasks_background_spawn(resource_id) };
1010
}

fx-sdk/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use {
3535
};
3636

3737
pub mod io {
38-
pub use crate::api::{blob, http, kv, env};
38+
pub use crate::api::{blob, http, kv, env, tasks};
3939
}
4040

4141
pub mod sys;

fx-sdk/src/sys/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub extern "C" fn _fx_future_poll(future_resource_id: u64) -> i64 {
4747
match &mut *future_resource {
4848
FunctionResource::FunctionResponseFuture(v) => v
4949
.poll_unpin(&mut context)
50-
.map(|v| FunctionResource::from(v)),
50+
.map(|v| Some(FunctionResource::from(v))),
5151
FunctionResource::HttpBody(v) => {
5252
let body = std::mem::replace(&mut v.0, HttpBodyInner::Empty);
5353

@@ -84,15 +84,20 @@ pub extern "C" fn _fx_future_poll(future_resource_id: u64) -> i64 {
8484

8585
v.0 = body;
8686

87-
poll
87+
poll.map(|v| Some(v))
88+
},
89+
FunctionResource::BackgroundTask(v) => match v.poll_unpin(&mut context) {
90+
std::task::Poll::Pending => std::task::Poll::Pending,
91+
std::task::Poll::Ready(_) => std::task::Poll::Ready(None),
8892
},
8993
_other => panic!("resource is not future"),
9094
}
9195
});
9296

9397
(match future_poll_result {
9498
Poll::Pending => FuturePollResult::Pending,
95-
Poll::Ready(resource) => {
99+
Poll::Ready(None) => FuturePollResult::Ready,
100+
Poll::Ready(Some(resource)) => {
96101
replace_function_resource(&future_resource_id, resource);
97102
FuturePollResult::Ready
98103
}

fx-sdk/src/sys/resource.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@ pub fn replace_function_resource_with<F: FnOnce(FunctionResource) -> FunctionRes
279279
})
280280
}
281281

282-
283282
/// returns length of serialized resource
284283
pub fn serialize_function_resource(resource_id: &FunctionResourceId) -> u64 {
285284
FUNCTION_RESOURCES.with_borrow_mut(|resources| {

0 commit comments

Comments
 (0)