Skip to content

Commit ae2d1ff

Browse files
committed
Kotlin promise coroutines:
commit 973112c Author: Tiago de Freitas Lima <[email protected]> Date: Mon May 23 15:15:56 2022 -0300 DefaultPromise starts to take a Executor in order to run CompletableFuture operations; KtorHTTPClient relies on CoroutineDispatcher from client commit 6b024b9 Author: Tiago de Freitas Lima <[email protected]> Date: Mon May 23 10:17:44 2022 -0300 (wip)
1 parent f69cef2 commit ae2d1ff

File tree

3 files changed

+35
-16
lines changed

3 files changed

+35
-16
lines changed

core/src/main/java/com/github/ljtfreitas/julian/DefaultPromise.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionException;
2727
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.Executor;
2829
import java.util.function.BiConsumer;
2930
import java.util.function.BiFunction;
3031
import java.util.function.Consumer;
@@ -34,19 +35,25 @@
3435
class DefaultPromise<T> implements Promise<T> {
3536

3637
private final CompletableFuture<T> future;
38+
private final Executor executor;
3739

3840
DefaultPromise(CompletableFuture<T> future) {
41+
this(future, future.defaultExecutor());
42+
}
43+
44+
DefaultPromise(CompletableFuture<T> future, Executor executor) {
3945
this.future = future;
46+
this.executor = executor;
4047
}
4148

4249
@Override
4350
public Promise<T> onSuccess(Consumer<? super T> fn) {
44-
return new DefaultPromise<>(future.whenCompleteAsync((r, e) -> { if (e == null) fn.accept(r); }));
51+
return new DefaultPromise<>(future.whenCompleteAsync((r, e) -> { if (e == null) fn.accept(r); }, executor), executor);
4552
}
4653

4754
@Override
4855
public <R> Promise<R> then(Function<? super T, R> fn) {
49-
return new DefaultPromise<>(future.thenApplyAsync(fn));
56+
return new DefaultPromise<>(future.thenApplyAsync(fn, executor), executor);
5057
}
5158

5259
@Override
@@ -56,23 +63,23 @@ public <R> R fold(Function<? super T, R> success, Function<? super Exception, R>
5663
return failure.apply((Exception) e);
5764
else
5865
return success.apply(r);
59-
})).join().unsafe();
66+
}, executor), executor).join().unsafe();
6067

6168
}
6269

6370
@Override
6471
public <R> Promise<R> bind(Function<? super T, Promise<R>> fn) {
65-
return new DefaultPromise<>(future.thenComposeAsync(t -> fn.apply(t).future()));
72+
return new DefaultPromise<>(future.thenComposeAsync(t -> fn.apply(t).future(), executor), executor);
6673
}
6774

6875
@Override
6976
public <T2, R> Promise<R> zip(Promise<T2> other, BiFunction<? super T, ? super T2, R> fn) {
70-
return new DefaultPromise<>(future.thenCombineAsync(other.future(), fn));
77+
return new DefaultPromise<>(future.thenCombineAsync(other.future(), fn, executor), executor);
7178
}
7279

7380
@Override
7481
public Promise<T> recover(Function<? super Exception, T> fn) {
75-
return new DefaultPromise<>(future.exceptionally(t -> fn.apply((Exception) t)));
82+
return new DefaultPromise<>(future.exceptionally(t -> fn.apply((Exception) t)), executor);
7683
}
7784

7885
@Override
@@ -83,7 +90,7 @@ public <Err extends Exception> Promise<T> recover(Class<? extends Err> expected,
8390
return fn.apply(expected.cast(cause));
8491
else
8592
return r;
86-
}));
93+
}, executor), executor);
8794
}
8895

8996
@Override
@@ -94,7 +101,7 @@ public Promise<T> recover(Predicate<? super Exception> p, Function<? super Excep
94101
return fn.apply(cause);
95102
else
96103
return r;
97-
}));
104+
}, executor), executor);
98105
}
99106

100107
@Override
@@ -105,7 +112,7 @@ public <Err extends Exception> Promise<T> failure(Function<? super Exception, Er
105112
throw failure(fn.apply(cause));
106113
else
107114
return r;
108-
}));
115+
}, executor), executor);
109116
}
110117

111118
@Override
@@ -114,7 +121,7 @@ public Promise<T> onFailure(Consumer<? super Exception> fn) {
114121
Exception cause = deep(e);
115122
if (cause != null)
116123
fn.accept(cause);
117-
}));
124+
}, executor), executor);
118125
}
119126

120127
private RuntimeException failure(Exception e) {
@@ -141,6 +148,6 @@ public CompletableFuture<T> future() {
141148
public Promise<T> subscribe(Subscriber<? super T> subscriber) {
142149
BiConsumer<T, Throwable> handle = (r, e) -> { if (e == null) subscriber.success(r); else subscriber.failure((Exception) e); };
143150
BiConsumer<T, Throwable> done = (r, e) -> subscriber.done();
144-
return new DefaultPromise<>(future.whenCompleteAsync(handle).whenCompleteAsync(done));
151+
return new DefaultPromise<>(future.whenCompleteAsync(handle, executor).whenCompleteAsync(done, executor), executor);
145152
}
146153
}

core/src/main/java/com/github/ljtfreitas/julian/Promise.java

+4
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ static <T> Promise<T> pending(CompletableFuture<T> future) {
8383
return new DefaultPromise<>(future);
8484
}
8585

86+
static <T> Promise<T> pending(CompletableFuture<T> future, Executor executor) {
87+
return new DefaultPromise<>(future, executor);
88+
}
89+
8690
static <T> Promise<T> pending(Supplier<T> fn) {
8791
return new DefaultPromise<>(supplyAsync(fn));
8892
}

http-client-ktor/src/main/kotlin/com/github/ljtfreitas/julian/k/http/client/ktor/KtorHTTPClient.kt

+13-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import com.github.ljtfreitas.julian.http.MediaType
3434
import com.github.ljtfreitas.julian.http.client.HTTPClient
3535
import com.github.ljtfreitas.julian.http.client.HTTPClientRequest
3636
import com.github.ljtfreitas.julian.http.client.HTTPClientResponse
37-
import com.github.ljtfreitas.julian.k.http.client.ktor.KtorHTTPClient.Companion.invoke
3837
import io.ktor.client.HttpClient
3938
import io.ktor.client.HttpClientConfig
4039
import io.ktor.client.call.receive
@@ -51,12 +50,17 @@ import io.ktor.http.HttpMethod
5150
import io.ktor.http.content.OutgoingContent
5251
import io.ktor.util.toMap
5352
import io.ktor.utils.io.ByteWriteChannel
53+
import kotlinx.coroutines.CoroutineDispatcher
5454
import kotlinx.coroutines.DelicateCoroutinesApi
5555
import kotlinx.coroutines.GlobalScope
56+
import kotlinx.coroutines.asExecutor
5657
import kotlinx.coroutines.future.future
5758
import kotlinx.coroutines.jdk9.asFlow
5859
import java.io.Closeable
5960
import java.util.Optional
61+
import java.util.concurrent.Executor
62+
import kotlin.coroutines.ContinuationInterceptor
63+
import kotlin.coroutines.CoroutineContext
6064

6165
@DelicateCoroutinesApi
6266
class KtorHTTPClient private constructor(private val client: HttpClient): HTTPClient, Closeable {
@@ -67,7 +71,7 @@ class KtorHTTPClient private constructor(private val client: HttpClient): HTTPCl
6771

6872
operator fun invoke(
6973
block: HttpClientConfig<*>.() -> Unit = {}
70-
) = KtorHTTPClient(HttpClient() {
74+
) = KtorHTTPClient(HttpClient {
7175
block()
7276
expectSuccess = false
7377
})
@@ -90,7 +94,7 @@ class KtorHTTPClient private constructor(private val client: HttpClient): HTTPCl
9094
}
9195

9296
override fun request(request: HTTPRequestDefinition) = HTTPClientRequest {
93-
Promise.pending(GlobalScope.future {
97+
Promise.pending(GlobalScope.future(client.coroutineContext) {
9498
val response: HttpResponse = client.request(request.path().toURL()) {
9599
method = HttpMethod.parse(request.method().name)
96100

@@ -104,7 +108,8 @@ class KtorHTTPClient private constructor(private val client: HttpClient): HTTPCl
104108
}
105109

106110
return@future response.asHTTPClientResponse()
107-
})
111+
112+
}, client.coroutineContext.javaExecutor())
108113
}
109114

110115
private fun HTTPHeaders.contentType() = select(HTTPHeader.CONTENT_TYPE).map { MediaType.valueOf(it.value()) }
@@ -135,4 +140,7 @@ class KtorHTTPClient private constructor(private val client: HttpClient): HTTPCl
135140
}
136141

137142
override fun close() = client.close()
138-
}
143+
144+
private fun CoroutineContext.javaExecutor(): Executor = (get(ContinuationInterceptor) as CoroutineDispatcher).asExecutor()
145+
}
146+

0 commit comments

Comments
 (0)