Skip to content

Commit ba72997

Browse files
committed
Add component to abstract async execution
Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations based on Java's `ThreadPoolExecutor` and Vert.X.
1 parent eefbbf7 commit ba72997

File tree

27 files changed

+1334
-0
lines changed

27 files changed

+1334
-0
lines changed

bom/build.gradle.kts

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ dependencies {
3535
api(project(":polaris-version"))
3636
api(project(":polaris-persistence-varint"))
3737

38+
api(project(":polaris-async-api"))
39+
api(project(":polaris-async-java"))
40+
api(project(":polaris-async-vertx"))
41+
3842
api(project(":polaris-config-docs-annotations"))
3943
api(project(":polaris-config-docs-generator"))
4044

gradle/libs.versions.toml

+3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ swagger-annotations = { module = "io.swagger:swagger-annotations", version.ref =
9898
swagger-jaxrs = { module = "io.swagger:swagger-jaxrs", version.ref = "swagger" }
9999
testcontainers-bom = { module = "org.testcontainers:testcontainers-bom", version = "1.20.6" }
100100
threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" }
101+
vertx-core = { module = "io.vertx:vertx-core", version = "4.5.14" }
102+
weld-se-core = { module = "org.jboss.weld.se:weld-se-core", version = "6.0.2.Final" }
103+
weld-junit5 = { module = "org.jboss.weld:weld-junit5", version = "4.0.4.Final" }
101104

102105
[plugins]
103106
jandex = { id = "org.kordamp.gradle.jandex", version = "2.1.0" }

gradle/projects.main.properties

+5
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,8 @@ polaris-persistence-varint=nosql/persistence/varint
4444
polaris-config-docs-annotations=tools/config-docs/annotations
4545
polaris-config-docs-generator=tools/config-docs/generator
4646
polaris-config-docs-site=tools/config-docs/site
47+
48+
# executor abstraction
49+
polaris-async-api=nosql/async/api
50+
polaris-async-java=nosql/async/java
51+
polaris-async-vertx=nosql/async/vertx

nosql/async/README.md

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Async execution API
21+
22+
Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations
23+
based on Java's `ThreadPoolExecutor` and Vert.X.
24+
25+
## Code structure
26+
27+
The code is structured into multiple modules. Consuming code should almost always pull in only the API module.
28+
29+
* `polaris-async-api` provides the necessary Java interfaces and immutable types.
30+
* `polaris-async-java` implementation leveraging `CompletableFuture.delayedExecutor` for delayed/scheduled invocations.
31+
* `polaris-async-vertx` implementation leveraging Vert.X for delayed/scheduled invocations.

nosql/async/api/build.gradle.kts

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins {
21+
alias(libs.plugins.jandex)
22+
id("polaris-server")
23+
}
24+
25+
description = "Polaris async execution API"
26+
27+
dependencies {
28+
compileOnly(libs.jakarta.annotation.api)
29+
compileOnly(libs.jakarta.validation.api)
30+
compileOnly(libs.jakarta.inject.api)
31+
compileOnly(libs.jakarta.enterprise.cdi.api)
32+
33+
compileOnly(libs.smallrye.config.core)
34+
compileOnly(platform(libs.quarkus.bom))
35+
compileOnly("io.quarkus:quarkus-core")
36+
37+
compileOnly(project(":polaris-immutables"))
38+
annotationProcessor(project(":polaris-immutables", configuration = "processor"))
39+
40+
implementation(platform(libs.jackson.bom))
41+
implementation("com.fasterxml.jackson.core:jackson-databind")
42+
43+
testFixturesCompileOnly(platform(libs.jackson.bom))
44+
testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")
45+
46+
testFixturesApi(libs.jakarta.annotation.api)
47+
testFixturesApi(libs.jakarta.validation.api)
48+
testFixturesApi(libs.jakarta.inject.api)
49+
testFixturesApi(libs.jakarta.enterprise.cdi.api)
50+
51+
testFixturesImplementation(libs.weld.se.core)
52+
testFixturesImplementation(libs.weld.junit5)
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.async;
20+
21+
import com.fasterxml.jackson.annotation.JsonFormat;
22+
import com.fasterxml.jackson.annotation.JsonInclude;
23+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
24+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
25+
import io.smallrye.config.ConfigMapping;
26+
import io.smallrye.config.WithDefault;
27+
import java.time.Duration;
28+
import java.util.Optional;
29+
import java.util.OptionalInt;
30+
import org.apache.polaris.immutables.PolarisImmutable;
31+
32+
/** Advanced configuration options to tune async activities. */
33+
@PolarisImmutable
34+
@ConfigMapping(prefix = "polaris.async")
35+
@JsonSerialize(as = ImmutableAsyncConfiguration.class)
36+
@JsonDeserialize(as = ImmutableAsyncConfiguration.class)
37+
public interface AsyncConfiguration {
38+
39+
String DEFAULT_THREAD_KEEP_ALIVE_STRING = "PT1S";
40+
Duration DEFAULT_THREAD_KEEP_ALIVE = Duration.parse(DEFAULT_THREAD_KEEP_ALIVE_STRING);
41+
42+
/** Duration to keep idle threads alive. */
43+
@WithDefault(DEFAULT_THREAD_KEEP_ALIVE_STRING)
44+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
45+
@JsonFormat(shape = JsonFormat.Shape.STRING)
46+
Optional<Duration> threadKeepAlive();
47+
48+
/** Maximum number of threads available for asynchronous execution. Default is unlimited. */
49+
@JsonInclude(JsonInclude.Include.NON_ABSENT)
50+
OptionalInt maxThreads();
51+
52+
static ImmutableAsyncConfiguration.Builder builder() {
53+
return ImmutableAsyncConfiguration.builder();
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.async;
20+
21+
import jakarta.enterprise.context.ApplicationScoped;
22+
import java.time.Duration;
23+
import java.util.concurrent.Callable;
24+
25+
/**
26+
* Abstraction for platform/environment specific scheduler implementations.
27+
*
28+
* <p>Quarkus production systems use Vert.x, tests usually use Java executors.
29+
*
30+
* <p>Implementations, like Java executors or Vert.X, are {@link
31+
* ApplicationScoped @ApplicationScoped}. There's also a CDI decorator to propagate the thread
32+
* context.
33+
*/
34+
public interface AsyncExec {
35+
36+
default <R> Cancelable<R> submit(Callable<R> callable) {
37+
return schedule(callable, Duration.ZERO);
38+
}
39+
40+
/**
41+
* Asynchronously run the given {@linkplain Callable callable} after the provided {@linkplain
42+
* Duration delay}. If the delay is not positive, the function is scheduled for immediate
43+
* execution.
44+
*/
45+
<R> Cancelable<R> schedule(Callable<R> callable, Duration delay);
46+
47+
default Cancelable<Void> schedule(Runnable runnable, Duration delay) {
48+
return schedule(
49+
() -> {
50+
runnable.run();
51+
return null;
52+
},
53+
delay);
54+
}
55+
56+
default Cancelable<Void> schedulePeriodic(Runnable runnable, Duration delay) {
57+
return schedulePeriodic(runnable, delay, delay);
58+
}
59+
60+
Cancelable<Void> schedulePeriodic(Runnable callable, Duration initialDelay, Duration delay);
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.async;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
public interface Cancelable<R> {
24+
/**
25+
* Attempt to cancel the delayed execution of a callable. Already running callables are not
26+
* interrupted. A callable may still be invoked after calling this function, because of side
27+
* effects and race conditions.
28+
*/
29+
void cancel();
30+
31+
CompletionStage<R> completionStage();
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
23+
<!-- File required by Weld (used for testing), not by Quarkus -->
24+
</beans>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.async;
20+
21+
import jakarta.enterprise.context.ApplicationScoped;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
@ApplicationScoped
25+
class AppScopedChecker {
26+
static final AtomicInteger COUNTER = new AtomicInteger();
27+
28+
int getAndIncrement() {
29+
return COUNTER.getAndIncrement();
30+
}
31+
}

0 commit comments

Comments
 (0)