Skip to content

Commit 83a4313

Browse files
Fix: request counter race for concurrent requests matching the same interaction (#46)
For each interaction, we keep track of the number of times it has been called. That counter can be used in the modifiers to change the response returned to the client. For example it is possible to change the return status on the 2nd invocation of a specific interaction. The code has a bug when concurrent requests happen for the same interaction. The change modifies the way the concurrency is managed just for the request counter. As part of this change, we have added a new test that can be used to prove that the original code has the issue and the proposed version fixes it. Co-authored-by: Andrea Rosa <[email protected]> Co-authored-by: Shreya Garge <[email protected]>
1 parent 7388a82 commit 83a4313

File tree

6 files changed

+81
-17
lines changed

6 files changed

+81
-17
lines changed

internal/app/concurrent_proxy_stage_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"net/http"
78
"strings"
89
"sync"
@@ -152,6 +153,34 @@ func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() {
152153
s.assert.NoError(err)
153154
}
154155

156+
func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent_with_attempt_based_modifier() {
157+
err := s.pact.Verify(func() (err error) {
158+
attempt := 2
159+
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", http.StatusConflict), &attempt)
160+
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.body.name", "Form3", &attempt)
161+
162+
wg := sync.WaitGroup{}
163+
164+
wg.Add(1)
165+
go func() {
166+
defer wg.Done()
167+
s.makeUserRequest()
168+
}()
169+
170+
wg.Add(1)
171+
go func() {
172+
defer wg.Done()
173+
s.makeUserRequest()
174+
}()
175+
176+
wg.Wait()
177+
178+
return nil
179+
})
180+
181+
s.assert.NoError(err)
182+
}
183+
155184
func (s *ConcurrentProxyStage) makeUserRequest() {
156185
u := fmt.Sprintf("http://localhost:%s/users", proxyURL.Port())
157186
req, err := http.NewRequest("POST", u, strings.NewReader(`{"name":"jim"}`))
@@ -208,6 +237,25 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat
208237
return s
209238
}
210239

240+
func (s *ConcurrentProxyStage) the_second_user_response_should_have_the_right_status_code_and_body() *ConcurrentProxyStage {
241+
statuses := make(map[int]int)
242+
bodies := make(map[string]int)
243+
for _, res := range s.userResponses {
244+
statuses[res.StatusCode] += 1
245+
bd, err := io.ReadAll(res.Body)
246+
s.assert.NoError(err)
247+
res.Body.Close()
248+
bodies[strings.ReplaceAll(strings.TrimSpace(string(bd)), "\"", "")] += 1
249+
}
250+
s.assert.Len(statuses, 2)
251+
s.assert.Len(bodies, 2)
252+
s.assert.Equal(1, statuses[http.StatusConflict])
253+
s.assert.Equal(1, statuses[http.StatusOK])
254+
s.assert.Equal(1, bodies["{name:any}"])
255+
s.assert.Equal(1, bodies["{name:Form3}"])
256+
return s
257+
}
258+
211259
func (s *ConcurrentProxyStage) all_the_address_responses_should_have_the_right_status_code() *ConcurrentProxyStage {
212260
expectedLen := s.concurrentAddressRequestsPerSecond * int(s.concurrentAddressRequestsDuration/time.Second)
213261
s.assert.Len(s.addressResponses, expectedLen, "number of address responses is not as expected")

internal/app/concurrent_proxy_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi
2424
all_the_address_responses_should_have_the_right_status_code()
2525
}
2626

27+
func TestConcurrentRequestsForSameModifierBasedOnAttempt(t *testing.T) {
28+
given, when, then := NewConcurrentProxyStage(t)
29+
given.
30+
a_pact_that_allows_any_names()
31+
when.
32+
the_concurrent_requests_are_sent_with_attempt_based_modifier()
33+
then.
34+
the_second_user_response_should_have_the_right_status_code_and_body()
35+
}
36+
2737
func TestConcurrentRequestsWaitForAllPacts(t *testing.T) {
2838
given, when, then := NewConcurrentProxyStage(t)
2939

internal/app/pactproxy/interaction.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ func (i *Interaction) EvaluateConstraints(request requestDocument, interactions
375375
return result, violations
376376
}
377377

378-
func (i *Interaction) StoreRequest(request requestDocument) {
378+
func (i *Interaction) StoreRequest(request requestDocument) int {
379379
i.mu.Lock()
380380
defer i.mu.Unlock()
381381
i.LastRequest = request
@@ -384,6 +384,7 @@ func (i *Interaction) StoreRequest(request requestDocument) {
384384
if i.recordHistory {
385385
i.RequestHistory = append(i.RequestHistory, request)
386386
}
387+
return i.RequestCount
387388
}
388389

389390
func (i *Interaction) HasRequests(count int) bool {

internal/app/pactproxy/modifier.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier {
4141
return result
4242
}
4343

44-
func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
44+
func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) {
4545
for _, m := range ims.Modifiers() {
46-
requestCount := ims.interaction.getRequestCount()
4746
if m.Path == "$.bytes.body" {
4847
if v, ok := m.Value.(string); ok && m.Attempt == nil || *m.Attempt == requestCount {
4948
var err error
@@ -66,9 +65,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
6665
return b, nil
6766
}
6867

69-
func (ims *interactionModifiers) modifyStatusCode() (bool, int) {
68+
func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) {
7069
for _, m := range ims.Modifiers() {
71-
requestCount := ims.interaction.getRequestCount()
7270
if m.Path == "$.status" {
7371
if m.Attempt == nil || *m.Attempt == requestCount {
7472
code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value))

internal/app/pactproxy/proxy.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ func (a *api) interactionsWaitHandler(c echo.Context) error {
251251
return c.NoContent(http.StatusOK)
252252
}
253253

254+
type matchedInteraction struct {
255+
interaction *Interaction
256+
attemptCount int
257+
}
258+
254259
func (a *api) indexHandler(c echo.Context) error {
255260
req := c.Request()
256261
log.Infof("proxying %s %s %+v", req.Method, req.URL.Path, req.Header)
@@ -299,12 +304,14 @@ func (a *api) indexHandler(c echo.Context) error {
299304
request["headers"] = h
300305

301306
unmatched := make(map[string][]string)
302-
matched := make([]*Interaction, 0)
307+
matched := make([]matchedInteraction, 0)
303308
for _, interaction := range allInteractions {
304309
ok, info := interaction.EvaluateConstraints(request, a.interactions)
305310
if ok {
306-
interaction.StoreRequest(request)
307-
matched = append(matched, interaction)
311+
matched = append(matched, matchedInteraction{
312+
interaction: interaction,
313+
attemptCount: interaction.StoreRequest(request),
314+
})
308315
} else {
309316
unmatched[interaction.Description] = info
310317
}
@@ -319,7 +326,7 @@ func (a *api) indexHandler(c echo.Context) error {
319326
}
320327

321328
a.notify.Notify()
322-
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req)
329+
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), matchedInteractions: matched}, req)
323330
return nil
324331
}
325332

internal/app/pactproxy/response_modification_writer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
)
88

99
type ResponseModificationWriter struct {
10-
res http.ResponseWriter
11-
interactions []*Interaction
12-
originalResponse []byte
13-
statusCode int
10+
res http.ResponseWriter
11+
matchedInteractions []matchedInteraction
12+
originalResponse []byte
13+
statusCode int
1414
}
1515

1616
func (m *ResponseModificationWriter) Header() http.Header {
@@ -29,8 +29,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {
2929
}
3030

3131
var modifiedBody []byte
32-
for _, i := range m.interactions {
33-
modifiedBody, err = i.modifiers.modifyBody(m.originalResponse)
32+
for _, i := range m.matchedInteractions {
33+
modifiedBody, err = i.interaction.modifiers.modifyBody(m.originalResponse, i.attemptCount)
3434
if err != nil {
3535
return 0, err
3636
}
@@ -51,8 +51,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {
5151

5252
func (m *ResponseModificationWriter) WriteHeader(statusCode int) {
5353
m.statusCode = statusCode
54-
for _, i := range m.interactions {
55-
ok, code := i.modifiers.modifyStatusCode()
54+
for _, i := range m.matchedInteractions {
55+
ok, code := i.interaction.modifiers.modifyStatusCode(i.attemptCount)
5656
if ok {
5757
m.statusCode = code
5858
break

0 commit comments

Comments
 (0)