Skip to content

Commit 7a58958

Browse files
authored
chore: E2E tests for Serving source (#2460)
1 parent f554623 commit 7a58958

File tree

7 files changed

+319
-3
lines changed

7 files changed

+319
-3
lines changed

Diff for: .github/workflows/ci.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ jobs:
217217
idle-source-e2e,
218218
monovertex-e2e,
219219
builtin-source-e2e,
220+
serving-e2e,
220221
]
221222
include:
222223
- driver: redis

Diff for: test/e2e-api/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func main() {
3939
httpController := NewHttpController()
4040
http.HandleFunc("/http/send-message", httpController.SendMessage)
4141

42+
servingController := NewServingController()
43+
http.HandleFunc("/serving/send-message", servingController.SendMessage)
44+
http.HandleFunc("/serving/fetch-results", servingController.FetchResults)
45+
4246
// initialize NATS handler
4347
natsController := NewNatsController("nats", "testingtoken")
4448
http.HandleFunc("/nats/pump-subject", natsController.PumpSubject)

Diff for: test/e2e-api/serving.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
Copyright 2022 The Numaproj Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"bytes"
21+
"crypto/tls"
22+
"fmt"
23+
"io"
24+
"log"
25+
"net/http"
26+
)
27+
28+
type ServingController struct {
29+
client *http.Client
30+
}
31+
32+
func NewServingController() *ServingController {
33+
return &ServingController{
34+
client: &http.Client{
35+
Transport: &http.Transport{
36+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
37+
},
38+
},
39+
}
40+
}
41+
42+
func (h *ServingController) SendMessage(w http.ResponseWriter, r *http.Request) {
43+
host := r.URL.Query().Get("host")
44+
sync := r.URL.Query().Get("sync")
45+
reqId := r.URL.Query().Get("reqId")
46+
reqBytes, err := io.ReadAll(r.Body)
47+
if err != nil {
48+
log.Println(err)
49+
http.Error(w, err.Error(), http.StatusBadRequest)
50+
return
51+
}
52+
53+
uri := fmt.Sprintf("https://%s:8443/v1/process/sync", host)
54+
if sync == "false" {
55+
uri = fmt.Sprintf("https://%s:8443/v1/process/async", host)
56+
}
57+
58+
postReq, err := http.NewRequest("POST", uri, bytes.NewBuffer(reqBytes))
59+
if err != nil {
60+
log.Println(err)
61+
http.Error(w, err.Error(), http.StatusInternalServerError)
62+
return
63+
}
64+
if reqId != "" {
65+
postReq.Header.Set("X-Numaflow-Id", reqId)
66+
}
67+
68+
resp, err := h.client.Do(postReq)
69+
if err != nil {
70+
log.Println(err)
71+
http.Error(w, err.Error(), http.StatusInternalServerError)
72+
return
73+
}
74+
75+
if resp.StatusCode != http.StatusOK {
76+
log.Printf("[ERROR] Expected %d, Got %d", http.StatusOK, resp.StatusCode)
77+
http.Error(w, fmt.Sprintf("Bad status: %s", resp.Status), http.StatusInternalServerError)
78+
return
79+
}
80+
81+
body, err := io.ReadAll(resp.Body)
82+
if err != nil {
83+
log.Printf("[ERROR] Reading response body from Serving source: %v", err)
84+
http.Error(w, err.Error(), http.StatusInternalServerError)
85+
return
86+
}
87+
_, _ = w.Write(body)
88+
}
89+
90+
func (h *ServingController) FetchResults(w http.ResponseWriter, r *http.Request) {
91+
host := r.URL.Query().Get("host")
92+
reqId := r.URL.Query().Get("reqId")
93+
94+
if reqId == "" {
95+
http.Error(w, "request id can not be empty for fetch API", http.StatusInternalServerError)
96+
return
97+
}
98+
99+
uri := fmt.Sprintf("https://%s:8443/v1/process/fetch?id=%s", host, reqId)
100+
101+
req, err := http.NewRequest("GET", uri, nil)
102+
if err != nil {
103+
log.Println(err)
104+
http.Error(w, err.Error(), http.StatusInternalServerError)
105+
return
106+
}
107+
108+
resp, err := h.client.Do(req)
109+
if err != nil {
110+
log.Println(err)
111+
http.Error(w, err.Error(), http.StatusInternalServerError)
112+
return
113+
}
114+
115+
if resp.StatusCode != http.StatusOK {
116+
log.Printf("[ERROR] Expected %d, Got %d", http.StatusOK, resp.StatusCode)
117+
http.Error(w, fmt.Sprintf("Bad status: %s", resp.Status), http.StatusInternalServerError)
118+
return
119+
}
120+
121+
body, err := io.ReadAll(resp.Body)
122+
if err != nil {
123+
log.Printf("[ERROR] Reading response body from Serving source: %v", err)
124+
http.Error(w, err.Error(), http.StatusInternalServerError)
125+
return
126+
}
127+
_, _ = w.Write(body)
128+
}
129+
130+
// Close closes the http client
131+
func (h *ServingController) Close() {
132+
h.client.CloseIdleConnections()
133+
}

Diff for: test/fixtures/e2eapi.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ func InvokeE2EAPIPOST(format string, body string, args ...interface{}) string {
7272
if err == nil && resp.StatusCode < 300 {
7373
return true, nil
7474
}
75-
fmt.Printf("Got error %v, response %v, retrying.\n", err, *resp)
75+
body, _ := io.ReadAll(resp.Body)
76+
log.Printf("Got error %v, response_text: %s,response: %+v, retrying.\n", err, body, *resp)
7677
return false, nil
7778
})
7879

@@ -84,16 +85,17 @@ func InvokeE2EAPIPOST(format string, body string, args ...interface{}) string {
8485
_ = Body.Close()
8586
}(resp.Body)
8687

88+
var respBody string
8789
for s := bufio.NewScanner(resp.Body); s.Scan(); {
8890
x := s.Text()
8991
if strings.Contains(x, "ERROR") { // hacky way to return an error from an octet-stream
9092
panic(errors.New(x))
9193
}
9294
log.Printf("> %s\n", x)
93-
body += x
95+
respBody += x
9496
}
9597
if resp.StatusCode >= 300 {
9698
panic(errors.New(resp.Status))
9799
}
98-
return body
100+
return respBody
99101
}

Diff for: test/fixtures/serving.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
Copyright 2022 The Numaproj Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fixtures
18+
19+
func SendServingMessage(host, reqId, payload string, sync bool) string {
20+
return InvokeE2EAPIPOST("/serving/send-message?host=%s&reqId=%s&sync=%t", payload, host, reqId, sync)
21+
}
22+
23+
func FetchServingResult(host, reqId string) string {
24+
return InvokeE2EAPIPOST("/serving/fetch-results?host=%s&reqId=%s", "", host, reqId)
25+
}

Diff for: test/serving-e2e/serving_test.go

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
//go:build test
2+
3+
/*
4+
Copyright 2022 The Numaproj Authors.
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package serving_e2e
17+
18+
import (
19+
"encoding/json"
20+
"fmt"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
"github.com/stretchr/testify/suite"
27+
28+
"github.com/numaproj/numaflow/test/fixtures"
29+
)
30+
31+
//go:generate kubectl -n numaflow-system set env deploy/numaflow-controller NUMAFLOW_EXECUTE_RUST_BINARY=true
32+
//go:generate kubectl delete -f testdata/serving-pipeline-cat.yaml -n numaflow-system --ignore-not-found=true
33+
//go:generate kubectl -n numaflow-system wait --for=condition=ready pod -l app.kubernetes.io/name=controller-manager --timeout 30s
34+
35+
const Namespace = "numaflow-system"
36+
37+
type ServingSuite struct {
38+
fixtures.E2ESuite
39+
}
40+
41+
// {"message":"Successfully published message","id":"0195859e-b05f-78d3-b0b6-6946a77f842e","code":200,"timestamp":"2025-03-11T14:32:05.455629106Z"}
42+
type asyncAPIResponse struct {
43+
Message string `json:"message"`
44+
Id string `json:"id"`
45+
Code int `json:"code"`
46+
Timestamp time.Time `json:"timestamp"`
47+
}
48+
49+
func (resp *asyncAPIResponse) validate(expReqId string) error {
50+
if resp.Message != "Successfully published message" {
51+
return fmt.Errorf("message field = %q, expected='Successfully published message'", resp.Message)
52+
}
53+
if resp.Id != expReqId {
54+
return fmt.Errorf("value of id field should be %q, current value: %q", expReqId, resp.Id)
55+
}
56+
var defaultTime time.Time
57+
if resp.Timestamp == defaultTime {
58+
return fmt.Errorf("invalid value for timestamp field: %q", resp.Timestamp)
59+
}
60+
if resp.Code != 200 {
61+
return fmt.Errorf("expected value of 'code' field is 200, got %d", resp.Code)
62+
}
63+
return nil
64+
}
65+
66+
func (ss *ServingSuite) TestServingSource() {
67+
w := ss.Given().Pipeline("@testdata/serving-pipeline-cat.yaml").
68+
When().
69+
CreatePipelineAndWait()
70+
defer w.DeletePipelineAndWait()
71+
72+
w.Expect().VertexPodsRunning()
73+
74+
pipelineName := "serving-source"
75+
serviceName := "serving-source-serving-in"
76+
// Check Service
77+
cmd := fmt.Sprintf("kubectl -n %s get svc -lnumaflow.numaproj.io/pipeline-name=%s,numaflow.numaproj.io/vertex-name=%s | grep -v CLUSTER-IP | grep -v headless", Namespace, pipelineName, "serving-in")
78+
w.Exec("sh", []string{"-c", cmd}, fixtures.OutputRegexp(serviceName))
79+
80+
// Send a request using sync API
81+
syncResp := fixtures.SendServingMessage(serviceName, "", "test data", true)
82+
assert.Equal(ss.T(), "test data", syncResp)
83+
84+
// Send a request using async API
85+
const reqId = "req-12345"
86+
asyncRespText := fixtures.SendServingMessage(serviceName, reqId, "test data", false)
87+
var asyncResp asyncAPIResponse
88+
err := json.Unmarshal([]byte(asyncRespText), &asyncResp)
89+
require.NoError(ss.T(), err)
90+
require.NoError(ss.T(), asyncResp.validate(reqId))
91+
92+
// Use fetch API to retrieve the results of the above async request
93+
attempt := 0
94+
var asyncResult string
95+
for {
96+
if attempt > 5 {
97+
ss.T().Fatalf("Processing for async request is still in-progress")
98+
break
99+
}
100+
asyncResult = fixtures.FetchServingResult(serviceName, reqId)
101+
if asyncResult != `{"status":"in-progress"}` {
102+
break
103+
}
104+
attempt++
105+
time.Sleep(2 * time.Second)
106+
}
107+
assert.Equal(ss.T(), "test data", asyncResult)
108+
}
109+
110+
func TestServingSuite(t *testing.T) {
111+
suite.Run(t, new(ServingSuite))
112+
}

Diff for: test/serving-e2e/testdata/serving-pipeline-cat.yaml

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: serving-source
5+
spec:
6+
vertices:
7+
- name: serving-in
8+
servingStoreName: default
9+
scale:
10+
min: 1
11+
max: 1
12+
source:
13+
serving:
14+
service: true
15+
msgIDHeaderKey: "X-Numaflow-Id"
16+
17+
- name: cat
18+
scale:
19+
min: 1
20+
max: 1
21+
udf:
22+
container:
23+
image: quay.io/numaio/numaflow-go/map-forward-message:stable
24+
25+
- name: serve-sink
26+
servingStoreName: default
27+
scale:
28+
min: 1
29+
max: 1
30+
sink:
31+
udsink:
32+
container:
33+
image: quay.io/numaio/numaflow-go/sink-serve:stable
34+
35+
edges:
36+
- from: serving-in
37+
to: cat
38+
- from: cat
39+
to: serve-sink

0 commit comments

Comments
 (0)