Skip to content

Commit 0b6e4e1

Browse files
authored
APISIX-38 Allow async plugins (#313)
* APISIX-38 Allow async plugins * Fix E2eE tests * APISIX-38 Document the threading model and async plugins
1 parent f58128c commit 0b6e4e1

File tree

5 files changed

+116
-9
lines changed

5 files changed

+116
-9
lines changed

.github/workflows/runner-e2e.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ jobs:
5959
6060
- name: startup apisix
6161
run: |
62-
docker-compose -f ci/docker-compose.yml up -d
62+
docker compose -f ci/docker-compose.yml up -d
6363
sleep 5
6464
6565
- name: install ginkgo cli

docs/en/latest/the-internal-of-apisix-java-plugin-runner.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,39 @@ The current type takes the following values
7878
* 2 means http_req_call
7979

8080
The binary data generated by the flatbuffer serialization is placed in the data segment.
81+
82+
## Threading model
83+
84+
Apisix plugin runner will run your plugins directly onto the event loop.
85+
86+
While this empower the best performance possible, as a plugin developer you will have the responsibility
87+
never to block threads on the event loop. Doing so would result in catastrophic performance drop.
88+
89+
Hopefully one can write asynchronous plugins easily: just call the `PluginFilterChain` as a callback once you
90+
are done.
91+
92+
For instance:
93+
94+
```java
95+
@Component
96+
public class AsyncResponseFilter implements PluginFilter {
97+
@Override
98+
public String name() {
99+
return "AyncResponseFilter";
100+
}
101+
102+
@Override
103+
public void postFilter(PostRequest request, PostResponse response, PluginFilterChain chain) {
104+
callExternalService()
105+
.thenAccept(body -> {
106+
response.setBody(body);
107+
chain.postFilter(request, response);
108+
});
109+
}
110+
111+
// This simulates calls to an external service
112+
CompletableFuture<String> callExternalService() {
113+
return CompletableFuture.completedFuture("response_body");
114+
}
115+
}
116+
```

runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,20 @@ private void doPostFilter(ChannelHandlerContext ctx) {
207207
postReq.initCtx(conf.getConfig());
208208
postReq.setVars(nginxVars);
209209

210-
PluginFilterChain chain = conf.getChain();
211-
chain.postFilter(postReq, postResp);
210+
PluginFilterChain chain = conf.getChain()
211+
.addFilter(new PluginFilter() {
212+
@Override
213+
public String name() {
214+
return null;
215+
}
212216

213-
ChannelFuture future = ctx.writeAndFlush(postResp);
214-
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
217+
@Override
218+
public void postFilter(PostRequest request, PostResponse response, PluginFilterChain chain) {
219+
ChannelFuture future = ctx.writeAndFlush(postResp);
220+
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
221+
}
222+
});
223+
chain.postFilter(postReq, postResp);
215224
}
216225

217226
private void handleExtraInfo(ChannelHandlerContext ctx, ExtraInfoResponse request) {
@@ -256,12 +265,21 @@ private void doFilter(ChannelHandlerContext ctx) {
256265
currReq.initCtx(currResp, conf.getConfig());
257266
currReq.setVars(nginxVars);
258267

259-
PluginFilterChain chain = conf.getChain();
260-
chain.filter(currReq, currResp);
268+
PluginFilterChain chain = conf.getChain()
269+
.addFilter(new PluginFilter() {
270+
@Override
271+
public String name() {
272+
return "writeFilter";
273+
}
261274

262-
ChannelFuture future = ctx.writeAndFlush(currResp);
263-
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
275+
@Override
276+
public void filter(HttpRequest request, HttpResponse response, PluginFilterChain chain) {
277+
ChannelFuture future = ctx.writeAndFlush(currResp);
278+
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
279+
}
280+
});
264281

282+
chain.filter(currReq, currResp);
265283
}
266284

267285
private void handleHttpReqCall(ChannelHandlerContext ctx, HttpRequest request) {

runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.apisix.plugin.runner.PostRequest;
2323
import org.apache.apisix.plugin.runner.PostResponse;
2424

25+
import java.util.ArrayList;
2526
import java.util.List;
2627

2728
public class PluginFilterChain {
@@ -47,6 +48,12 @@ public List<PluginFilter> getFilters() {
4748
return filters;
4849
}
4950

51+
public PluginFilterChain addFilter(PluginFilter filter) {
52+
ArrayList<PluginFilter> pluginFilters = new ArrayList<>(filters);
53+
pluginFilters.add(filter);
54+
return new PluginFilterChain(pluginFilters);
55+
}
56+
5057
public void filter(HttpRequest request, HttpResponse response) {
5158
if (this.index < filters.size()) {
5259
PluginFilter filter = filters.get(this.index);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.apisix.plugin.runner.filter;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
22+
import org.apache.apisix.plugin.runner.PostRequest;
23+
import org.apache.apisix.plugin.runner.PostResponse;
24+
import org.springframework.stereotype.Component;
25+
26+
@Component
27+
public class AsyncResponseFilter implements PluginFilter {
28+
@Override
29+
public String name() {
30+
return "AsyncResponseFilter";
31+
}
32+
33+
@Override
34+
public void postFilter(PostRequest request, PostResponse response, PluginFilterChain chain) {
35+
callExternalService()
36+
.thenAccept(body -> {
37+
response.setBody(body);
38+
chain.postFilter(request, response);
39+
});
40+
}
41+
42+
// This simulates calls to an external service
43+
CompletableFuture<String> callExternalService() {
44+
return CompletableFuture.completedFuture("response_body");
45+
}
46+
}

0 commit comments

Comments
 (0)