Skip to content

Commit 7da8bdf

Browse files
authored
Modify shared_queue example: use define multiple plugins in singleton. (tetratelabs#191)
Signed-off-by: Takeshi Yoneda <[email protected]>
1 parent ad0b327 commit 7da8bdf

File tree

9 files changed

+267
-31
lines changed

9 files changed

+267
-31
lines changed

e2e/e2e_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ func Test_shared_queue(t *testing.T) {
243243
defer kill()
244244
require.Eventually(t, func() bool {
245245
res, err := http.Get("http://localhost:18000")
246+
if err != nil || res.StatusCode != http.StatusOK {
247+
return false
248+
}
249+
defer res.Body.Close()
250+
251+
res, err = http.Get("http://localhost:18001")
246252
if err != nil {
247253
return false
248254
}
@@ -252,7 +258,9 @@ func Test_shared_queue(t *testing.T) {
252258
require.Eventually(t, func() bool {
253259
return checkMessage(stdErr.String(), []string{
254260
`enqueued data: {"key": ":method","value": "GET"}`,
255-
`dequeued data: {"key": ":method","value": "GET"}`,
261+
`dequeued data from http_request_headers`,
262+
`dequeued data from http_response_headers`,
263+
`dequeued data from tcp_data_hashes`,
256264
}, nil)
257265
}, 5*time.Second, time.Millisecond, stdErr.String())
258266
}

examples/dispatch_call_on_tick/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type pluginContext struct {
4444
}
4545

4646
// Override types.DefaultPluginContext.
47-
func (ctx *pluginContext) OnPluginStart(vmConfigurationSize int) types.OnPluginStartStatus {
47+
func (ctx *pluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
4848
if err := proxywasm.SetTickPeriodMilliSeconds(tickMilliseconds); err != nil {
4949
proxywasm.LogCriticalf("failed to set tick period: %v", err)
5050
return types.OnPluginStartStatusFailed

examples/foreign_call_on_tick/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type pluginContext struct {
4747
}
4848

4949
// Override types.DefaultPluginContext.
50-
func (ctx *pluginContext) OnPluginStart(vmConfigurationSize int) types.OnPluginStartStatus {
50+
func (ctx *pluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
5151
if err := proxywasm.SetTickPeriodMilliSeconds(tickMilliseconds); err != nil {
5252
proxywasm.LogCriticalf("failed to set tick period: %v", err)
5353
return types.OnPluginStartStatusFailed

examples/helloworld/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type helloWorld struct {
4747
}
4848

4949
// Override types.DefaultPluginContext.
50-
func (ctx *helloWorld) OnPluginStart(vmConfigurationSize int) types.OnPluginStartStatus {
50+
func (ctx *helloWorld) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
5151
rand.Seed(time.Now().UnixNano())
5252

5353
proxywasm.LogInfo("proxy_on_vm_start from Go!")

examples/shared_queue/README.md

+40-11
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,57 @@ There are two Wasm VMs are configured (See `envoy.yaml` for detail):
77
1. The one with `vm_id="receiver"` and the binary of `receiver/main.go`.
88
2. Another one with `vm_id="sender"` and the binary of `sender/main.go`.
99

10-
`receiver` VM runs as a singleton [Wasm Service](https://www.envoyproxy.io/docs/envoy/latest/configuration/other_features/wasm_service.html) which runs in the main thread, and registered a shared queue named `http_headers` by calling the `RegisterSharedQueue` host call.
10+
`receiver` VM runs as a singleton [Wasm Service](https://www.envoyproxy.io/docs/envoy/latest/configuration/other_features/wasm_service.html) which runs in the main thread, and there are **three** plugin configurations are given. These configuration values are `http_response_headers`, `http_request_headers` and `tcp_data_hashes`.
11+
Each of these corresponding PluginContext registers a shared queue whose name equals that configuration respectively.
1112

12-
`sender` VM runs in the http filter chain on worker threads, and enqueue request headers to the shared queue resolved by the `ResolveSharedQueue` with the args of `vm_id=receiver` and `name=http_headers`.
13+
`sender` VM runs in a http filter and a network filter chain on worker threads, and
14+
- enqueue request headers to the shared queue resolved by the `ResolveSharedQueue` with the args of (`vm_id=receiver`,`name=http_request_headers`) and (`vm_id=receiver`,`name=http_response_headers`).
15+
- enqueue hash values of tcp data frames to the shared queue resolved by the `ResolveSharedQueue` with the args of (`vm_id=receiver`,`name=tcp_data_hashes`).
1316

1417
See [this talk](https://www.youtube.com/watch?v=XdWmm_mtVXI&t=1171s) for detail.
1518

1619

1720
```bash
21+
wasm log receiver: queue "http_request_headers" registered as queueID=1 by contextID=1
22+
wasm log receiver: queue "http_request_headers" registered as queueID=1 by contextID=1
23+
wasm log receiver: queue "http_response_headers" registered as queueID=2 by contextID=2
24+
wasm log receiver: queue "tcp_data_hashes" registered as queueID=3 by contextID=3
25+
all clusters initialized. initializing init manager
26+
all dependencies initialized. starting workers
27+
wasm log sender: contextID=1 is configured for http
28+
wasm log sender: contextID=2 is configured for tcp
29+
wasm log sender: contextID=1 is configured for http
30+
wasm log sender: contextID=2 is configured for tcp
31+
32+
....
33+
34+
# curl localhost:18000
35+
1836
wasm log sender: enqueued data: {"key": ":authority","value": "localhost:18000"}
19-
wasm log receiver: dequeued data: {"key": ":authority","value": "localhost:18000"}
2037
wasm log sender: enqueued data: {"key": ":path","value": "/"}
21-
wasm log receiver: dequeued data: {"key": ":path","value": "/"}
2238
wasm log sender: enqueued data: {"key": ":method","value": "GET"}
23-
wasm log receiver: dequeued data: {"key": ":method","value": "GET"}
2439
wasm log sender: enqueued data: {"key": ":scheme","value": "http"}
25-
wasm log receiver: dequeued data: {"key": ":scheme","value": "http"}
2640
wasm log sender: enqueued data: {"key": "user-agent","value": "curl/7.68.0"}
27-
wasm log receiver: dequeued data: {"key": "user-agent","value": "curl/7.68.0"}
2841
wasm log sender: enqueued data: {"key": "accept","value": "*/*"}
29-
wasm log receiver: dequeued data: {"key": "accept","value": "*/*"}
3042
wasm log sender: enqueued data: {"key": "x-forwarded-proto","value": "http"}
31-
wasm log receiver: dequeued data: {"key": "x-forwarded-proto","value": "http"}
32-
wasm log sender: enqueued data: {"key": "x-request-id","value": "73a13840-6ca2-4f9f-a639-91b014c8d485"}
33-
wasm log receiver: dequeued data: {"key": "x-request-id","value": "73a13840-6ca2-4f9f-a639-91b014c8d485"}
43+
wasm log sender: enqueued data: {"key": "x-request-id","value": "57d77551-02e8-455c-bf86-45a0d7308a0e"}
44+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": ":authority","value": "localhost:18000"}
45+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": ":path","value": "/"}
46+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": ":method","value": "GET"}
47+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": ":scheme","value": "http"}
48+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": "user-agent","value": "curl/7.68.0"}
49+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": "accept","value": "*/*"}
50+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": "x-forwarded-proto","value": "http"}
51+
wasm log receiver: (contextID=1) dequeued data from http_request_headers(queueID=1): {"key": "x-request-id","value": "57d77551-02e8-455c-bf86-45a0d7308a0e"}
52+
wasm log sender: (contextID=3) enqueued data: {"key": ":status","value": "200"}
53+
wasm log receiver: (contextID=2) dequeued data from http_response_headers(queueID=2): {"key": ":status","value": "200"}
54+
wasm log sender: (contextID=3) enqueued data: {"key": "content-length","value": "13"}
55+
wasm log receiver: (contextID=2) dequeued data from http_response_headers(queueID=2): {"key": "content-length","value": "13"}
56+
wasm log sender: (contextID=3) enqueued data: {"key": "content-type","value": "text/plain"}
57+
wasm log receiver: (contextID=2) dequeued data from http_response_headers(queueID=2): {"key": "content-type","value": "text/plain"}
58+
59+
# curl localhost:18001
60+
61+
wasm log sender: (contextID=4) enqueued data: 7d1a184bc958cdb9f1fee6591a3f2ae2
62+
wasm log receiver: (contextID=3) dequeued data from tcp_data_hashes(queueID=3): 7d1a184bc958cdb9f1fee6591a3f2ae2
3463
```

examples/shared_queue/envoy.yaml

+87-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,45 @@ bootstrap_extensions:
44
"@type": type.googleapis.com/envoy.extensions.wasm.v3.WasmService
55
singleton: true
66
config:
7+
# Used as a queue name
8+
configuration:
9+
"@type": type.googleapis.com/google.protobuf.StringValue
10+
value: "http_request_headers"
11+
# Use the same vm_config as below, so we can reuse the same VM for multiple queues.
12+
vm_config:
13+
vm_id: "receiver"
14+
runtime: "envoy.wasm.runtime.v8"
15+
code:
16+
local:
17+
filename: "./examples/shared_queue/receiver/main.go.wasm"
18+
19+
- name: envoy.bootstrap.wasm
20+
typed_config:
21+
"@type": type.googleapis.com/envoy.extensions.wasm.v3.WasmService
22+
singleton: true
23+
config:
24+
# Used as a queue name
25+
configuration:
26+
"@type": type.googleapis.com/google.protobuf.StringValue
27+
value: "http_response_headers"
28+
# Use the same vm_config as above, so we can reuse the same VM for multiple queues.
29+
vm_config:
30+
vm_id: "receiver"
31+
runtime: "envoy.wasm.runtime.v8"
32+
code:
33+
local:
34+
filename: "./examples/shared_queue/receiver/main.go.wasm"
35+
36+
- name: envoy.bootstrap.wasm
37+
typed_config:
38+
"@type": type.googleapis.com/envoy.extensions.wasm.v3.WasmService
39+
singleton: true
40+
config:
41+
configuration:
42+
# Used as a queue name
43+
"@type": type.googleapis.com/google.protobuf.StringValue
44+
value: "tcp_data_hashes"
45+
# Use the same vm_config as above, so we can reuse the same VM for multiple queues.
746
vm_config:
847
vm_id: "receiver"
948
runtime: "envoy.wasm.runtime.v8"
@@ -13,7 +52,7 @@ bootstrap_extensions:
1352

1453
static_resources:
1554
listeners:
16-
- name: main
55+
- name: http
1756
address:
1857
socket_address:
1958
address: 0.0.0.0
@@ -43,13 +82,60 @@ static_resources:
4382
typed_config:
4483
"@type": type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
4584
config:
85+
# Instruct that the PluginContext should behave as a Http filter.
86+
configuration:
87+
"@type": type.googleapis.com/google.protobuf.StringValue
88+
value: "http"
89+
# Use the same vm_config as below, so we can reuse the same VM for multiple queues.
4690
vm_config:
4791
vm_id: "sender"
4892
runtime: "envoy.wasm.runtime.v8"
4993
code:
5094
local:
5195
filename: "./examples/shared_queue/sender/main.go.wasm"
5296
- name: envoy.filters.http.router
97+
- name: tcp
98+
address:
99+
socket_address:
100+
address: 0.0.0.0
101+
port_value: 18001
102+
filter_chains:
103+
- filters:
104+
- name: envoy.filters.network.wasm
105+
typed_config:
106+
"@type": type.googleapis.com/envoy.extensions.filters.network.wasm.v3.Wasm
107+
config:
108+
# Instruct that the PluginContext should behave as a Tcp filter.
109+
configuration:
110+
"@type": type.googleapis.com/google.protobuf.StringValue
111+
value: "tcp"
112+
# Use the same vm_config as above, so we can reuse the same VM for multiple queues.
113+
vm_config:
114+
vm_id: "sender"
115+
runtime: "envoy.wasm.runtime.v8"
116+
code:
117+
local:
118+
filename: "./examples/shared_queue/sender/main.go.wasm"
119+
120+
- name: envoy.tcp_proxy
121+
typed_config:
122+
"@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
123+
stat_prefix: ingress
124+
cluster: admin
125+
clusters:
126+
- name: admin
127+
connect_timeout: 5000s
128+
type: strict_dns
129+
lb_policy: round_robin
130+
load_assignment:
131+
cluster_name: admin
132+
endpoints:
133+
- lb_endpoints:
134+
- endpoint:
135+
address:
136+
socket_address:
137+
address: 0.0.0.0
138+
port_value: 8001
53139

54140
admin:
55141
access_log_path: "/dev/null"

examples/shared_queue/receiver/main.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
package main
1616

1717
import (
18+
"fmt"
19+
1820
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm"
1921
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types"
2022
)
2123

22-
const queueName = "http_headers"
23-
2424
func main() {
2525
proxywasm.SetVMContext(&vmContext{})
2626
}
@@ -32,23 +32,34 @@ type vmContext struct {
3232
}
3333

3434
// Override types.DefaultVMContext.
35-
func (*vmContext) NewPluginContext(uint32) types.PluginContext {
36-
return &receiverPluginContext{}
35+
func (*vmContext) NewPluginContext(contextID uint32) types.PluginContext {
36+
return &receiverPluginContext{contextID: contextID}
3737
}
3838

3939
type receiverPluginContext struct {
4040
// Embed the default plugin context here,
4141
// so that we don't need to reimplement all the methods.
42+
contextID uint32
4243
types.DefaultPluginContext
44+
queueName string
4345
}
4446

4547
// Override types.DefaultPluginContext.
46-
func (ctx *receiverPluginContext) OnPluginStart(vmConfigurationSize int) types.OnPluginStartStatus {
47-
queueID, err := proxywasm.RegisterSharedQueue(queueName)
48+
func (ctx *receiverPluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
49+
// Get Plugin configuration.
50+
config, err := proxywasm.GetPluginConfiguration(pluginConfigurationSize)
51+
if err != nil {
52+
panic(fmt.Sprintf("failed to get plugin config: %v", err))
53+
}
54+
55+
// Treat the config as the queue name for receiving.
56+
ctx.queueName = string(config)
57+
58+
queueID, err := proxywasm.RegisterSharedQueue(ctx.queueName)
4859
if err != nil {
4960
panic("failed register queue")
5061
}
51-
proxywasm.LogInfof("queue \"%s\" registered as id=%d", queueName, queueID)
62+
proxywasm.LogInfof("queue \"%s\" registered as queueID=%d by contextID=%d", ctx.queueName, queueID, ctx.contextID)
5263
return types.OnPluginStartStatusOK
5364
}
5465

@@ -59,7 +70,7 @@ func (ctx *receiverPluginContext) OnQueueReady(queueID uint32) {
5970
case types.ErrorStatusEmpty:
6071
return
6172
case nil:
62-
proxywasm.LogInfof("dequeued data: %s", string(data))
73+
proxywasm.LogInfof("(contextID=%d) dequeued data from %s(queueID=%d): %s", ctx.contextID, ctx.queueName, queueID, string(data))
6374
default:
6475
proxywasm.LogCriticalf("error retrieving data from queue %d: %v", queueID, err)
6576
}

0 commit comments

Comments
 (0)