Skip to content

Commit

Permalink
Add request timeout and timestamp as headers in HTTP target
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jan 29, 2025
1 parent 4bbe884 commit 859a301
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 25 deletions.
4 changes: 4 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ target {
# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

# Optional. When enabled, configured timeout and timestamp (moment in time when the request is sent) or request are attached as HTTP headers.
include_timing_headers = true

# Optional HTTP response rules which are used to match HTTP response code/body and categorize it as either invalid data or target setup error.
# For example, we can have 2 invalid + 1 setup error rules:
response_rules {
Expand All @@ -88,5 +91,6 @@ target {
http_codes = [401, 403]
}
}

}
}
2 changes: 2 additions & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
KeyFile: "",
CaFile: "",
SkipVerifyTLS: false,
IncludeTimingHeaders: false,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{},
SetupError: []target.Rule{},
Expand Down Expand Up @@ -118,6 +119,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SkipVerifyTLS: true,
DynamicHeaders: true,
TemplateFile: "myTemplate.file",
IncludeTimingHeaders: true,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{
{
Expand Down
18 changes: 17 additions & 1 deletion pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -61,6 +62,8 @@ type HTTPTargetConfig struct {

TemplateFile string `hcl:"template_file,optional"`
ResponseRules *ResponseRules `hcl:"response_rules,block"`

IncludeTimingHeaders bool `hcl:"include_timing_headers,optional"`
}

// ResponseRules is part of HTTP target configuration. It provides rules how HTTP respones should be handled. Response can be categerized as 'invalid' (bad data), as setup error or (if none of the rules matches) as a transient error.
Expand Down Expand Up @@ -99,6 +102,8 @@ type HTTPTarget struct {
requestTemplate *template.Template
approxTmplSize int
responseRules *ResponseRules

includeTimingHeaders bool
}

func checkURL(str string) error {
Expand Down Expand Up @@ -159,7 +164,8 @@ func newHTTPTarget(
oAuth2RefreshToken string,
oAuth2TokenURL string,
templateFile string,
responseRules *ResponseRules) (*HTTPTarget, error) {
responseRules *ResponseRules,
includeTimingHeaders bool) (*HTTPTarget, error) {
err := checkURL(httpURL)
if err != nil {
return nil, err
Expand Down Expand Up @@ -208,6 +214,8 @@ func newHTTPTarget(
requestTemplate: requestTemplate,
approxTmplSize: approxTmplSize,
responseRules: responseRules,

includeTimingHeaders: includeTimingHeaders,
}, nil
}

Expand Down Expand Up @@ -290,6 +298,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.OAuth2TokenURL,
c.TemplateFile,
c.ResponseRules,
c.IncludeTimingHeaders,
)
}

Expand Down Expand Up @@ -318,6 +327,7 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
Invalid: []Rule{},
SetupError: []Rule{},
},
IncludeTimingHeaders: false,
}

return cfg, nil
Expand Down Expand Up @@ -383,7 +393,13 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}

requestStarted := time.Now().UTC()
if ht.includeTimingHeaders {
request.Header.Add("Request-Timeout", strconv.FormatInt(ht.client.Timeout.Milliseconds(), 10))
request.Header.Add("Request-Timestamp", strconv.FormatInt(requestStarted.UnixMilli(), 10))
}

resp, err := ht.client.Do(request) // Make request
requestFinished := time.Now().UTC()

Expand Down
2 changes: 1 addition & 1 deletion pkg/target/http_oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down
86 changes: 63 additions & 23 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,11 +32,12 @@ import (
"github.com/snowplow/snowbridge/pkg/testutil"
)

func createTestServerWithResponseCode(results *[][]byte, responseCode int, responseBody string) *httptest.Server {
func createTestServerWithResponseCode(results *[][]byte, headers *http.Header, responseCode int, responseBody string) *httptest.Server {
mutex := &sync.Mutex{}
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := io.ReadAll(req.Body)
*headers = req.Header
if err != nil {
panic(err)
}
Expand All @@ -48,7 +50,8 @@ func createTestServerWithResponseCode(results *[][]byte, responseCode int, respo
}

func createTestServer(results *[][]byte) *httptest.Server {
return createTestServerWithResponseCode(results, 200, "")
var headers http.Header
return createTestServerWithResponseCode(results, &headers, 200, "")
}

func TestHTTP_GetHeaders(t *testing.T) {
Expand Down Expand Up @@ -308,20 +311,20 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestHTTP_NewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand All @@ -345,11 +348,12 @@ func TestHTTP_Write_Simple(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
server := createTestServerWithResponseCode(&results, &headers, tt.ResponseCode, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -390,6 +394,9 @@ func TestHTTP_Write_Simple(t *testing.T) {
}

assert.Equal(int64(25), ackOps)

assert.Empty(headers.Get("Request-Timestamp"))
assert.Empty(headers.Get("Request-Timeout"))
})
}
}
Expand All @@ -409,11 +416,12 @@ func TestHTTP_Write_Batched(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, 200, "")
server := createTestServerWithResponseCode(&results, &headers, 200, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +480,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -516,7 +524,7 @@ func TestHTTP_Write_Failure(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -556,9 +564,10 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
var headers http.Header
server := createTestServerWithResponseCode(&results, &headers, tt.ResponseCode, "")
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -593,7 +602,7 @@ func TestHTTP_Write_Oversized(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -636,7 +645,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -685,7 +694,9 @@ func TestHTTP_Write_Invalid(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, 400, "Request is invalid. Invalid value for field 'attribute'")
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 400, "Request is invalid. Invalid value for field 'attribute'")
defer server.Close()

responseRules := ResponseRules{
Expand All @@ -694,7 +705,7 @@ func TestHTTP_Write_Invalid(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -713,7 +724,9 @@ func TestHTTP_Write_Setup(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, 401, "Authentication issue. Invalid token")
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 401, "Authentication issue. Invalid token")
defer server.Close()

responseRules := ResponseRules{
Expand All @@ -722,7 +735,7 @@ func TestHTTP_Write_Setup(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -740,6 +753,33 @@ func TestHTTP_Write_Setup(t *testing.T) {
assert.Regexp(".*Got setup error, response status: '401 Unauthorized' with error details: 'Invalid token'", err.Error())
}

func TestHTTP_TimeOrientedHeadersEnabled(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 200, "ok")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), true)
if err != nil {
t.Fatal(err)
}

input := []*models.Message{{Data: []byte(`{ "attribute": "value"}`)}}

beforeRequest := time.Now().UTC().UnixMilli()
target.Write(input)
afterRequest := time.Now().UTC().UnixMilli()

assert.Equal("5000", headers.Get("Request-Timeout"))

requestTimestamp, _ := strconv.ParseInt(headers.Get("Request-Timestamp"), 10, 64)
assert.GreaterOrEqual(requestTimestamp, beforeRequest)
assert.LessOrEqual(requestTimestamp, afterRequest)
}

func TestHTTP_Write_TLS(t *testing.T) {

if testing.Short() {
Expand Down Expand Up @@ -768,7 +808,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -810,7 +850,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err2 != nil {
t.Fatal(err2)
}
Expand Down Expand Up @@ -845,7 +885,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err4 != nil {
t.Fatal(err4)
}
Expand Down Expand Up @@ -1000,7 +1040,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) {
defer server.Close()

//dynamicHeaders enabled
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 859a301

Please sign in to comment.