Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request timeout and timestamp as headers in HTTP target #396

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ target {
# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

# Optional. When enabled, configured client timeout and timestamp of request are attached as HTTP headers.
include_timing_headers = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the best name...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a better one 🤷


# Optional HTTP response rules which are used to match HTTP response code/body and categorize it as either invalid data or target setup error.
# For example, we can have 2 invalid + 1 setup error rules:
response_rules {
Expand Down
2 changes: 2 additions & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
KeyFile: "",
CaFile: "",
SkipVerifyTLS: false,
IncludeTimingHeaders: false,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{},
SetupError: []target.Rule{},
Expand Down Expand Up @@ -118,6 +119,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SkipVerifyTLS: true,
DynamicHeaders: true,
TemplateFile: "myTemplate.file",
IncludeTimingHeaders: true,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{
{
Expand Down
18 changes: 17 additions & 1 deletion pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -61,6 +62,8 @@ type HTTPTargetConfig struct {

TemplateFile string `hcl:"template_file,optional"`
ResponseRules *ResponseRules `hcl:"response_rules,block"`

IncludeTimingHeaders bool `hcl:"include_timing_headers,optional"`
}

// ResponseRules is part of HTTP target configuration. It provides rules how HTTP respones should be handled. Response can be categerized as 'invalid' (bad data), as setup error or (if none of the rules matches) as a transient error.
Expand Down Expand Up @@ -99,6 +102,8 @@ type HTTPTarget struct {
requestTemplate *template.Template
approxTmplSize int
responseRules *ResponseRules

includeTimingHeaders bool
}

func checkURL(str string) error {
Expand Down Expand Up @@ -159,7 +164,8 @@ func newHTTPTarget(
oAuth2RefreshToken string,
oAuth2TokenURL string,
templateFile string,
responseRules *ResponseRules) (*HTTPTarget, error) {
responseRules *ResponseRules,
includeTimingHeaders bool) (*HTTPTarget, error) {
err := checkURL(httpURL)
if err != nil {
return nil, err
Expand Down Expand Up @@ -208,6 +214,8 @@ func newHTTPTarget(
requestTemplate: requestTemplate,
approxTmplSize: approxTmplSize,
responseRules: responseRules,

includeTimingHeaders: includeTimingHeaders,
}, nil
}

Expand Down Expand Up @@ -290,6 +298,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.OAuth2TokenURL,
c.TemplateFile,
c.ResponseRules,
c.IncludeTimingHeaders,
)
}

Expand Down Expand Up @@ -318,6 +327,7 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
Invalid: []Rule{},
SetupError: []Rule{},
},
IncludeTimingHeaders: false,
}

return cfg, nil
Expand Down Expand Up @@ -383,7 +393,13 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}

requestStarted := time.Now().UTC()
if ht.includeTimingHeaders {
request.Header.Add("Request-Timeout", strconv.FormatInt(ht.client.Timeout.Milliseconds(), 10))
request.Header.Add("Request-Timestamp", strconv.FormatInt(requestStarted.UnixMilli(), 10))
}

resp, err := ht.client.Do(request) // Make request
requestFinished := time.Now().UTC()

Expand Down
2 changes: 1 addition & 1 deletion pkg/target/http_oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down
86 changes: 63 additions & 23 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,11 +32,12 @@ import (
"github.com/snowplow/snowbridge/pkg/testutil"
)

func createTestServerWithResponseCode(results *[][]byte, responseCode int, responseBody string) *httptest.Server {
func createTestServerWithResponseCode(results *[][]byte, headers *http.Header, responseCode int, responseBody string) *httptest.Server {
mutex := &sync.Mutex{}
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := io.ReadAll(req.Body)
*headers = req.Header
if err != nil {
panic(err)
}
Expand All @@ -48,7 +50,8 @@ func createTestServerWithResponseCode(results *[][]byte, responseCode int, respo
}

func createTestServer(results *[][]byte) *httptest.Server {
return createTestServerWithResponseCode(results, 200, "")
var headers http.Header
return createTestServerWithResponseCode(results, &headers, 200, "")
}

func TestHTTP_GetHeaders(t *testing.T) {
Expand Down Expand Up @@ -308,20 +311,20 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestHTTP_NewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand All @@ -345,11 +348,12 @@ func TestHTTP_Write_Simple(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
server := createTestServerWithResponseCode(&results, &headers, tt.ResponseCode, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -390,6 +394,9 @@ func TestHTTP_Write_Simple(t *testing.T) {
}

assert.Equal(int64(25), ackOps)

assert.Empty(headers.Get("Request-Timestamp"))
assert.Empty(headers.Get("Request-Timeout"))
})
}
}
Expand All @@ -409,11 +416,12 @@ func TestHTTP_Write_Batched(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, 200, "")
server := createTestServerWithResponseCode(&results, &headers, 200, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +480,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -516,7 +524,7 @@ func TestHTTP_Write_Failure(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -556,9 +564,10 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
var headers http.Header
server := createTestServerWithResponseCode(&results, &headers, tt.ResponseCode, "")
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -593,7 +602,7 @@ func TestHTTP_Write_Oversized(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -636,7 +645,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -685,7 +694,9 @@ func TestHTTP_Write_Invalid(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, 400, "Request is invalid. Invalid value for field 'attribute'")
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 400, "Request is invalid. Invalid value for field 'attribute'")
defer server.Close()

responseRules := ResponseRules{
Expand All @@ -694,7 +705,7 @@ func TestHTTP_Write_Invalid(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -713,7 +724,9 @@ func TestHTTP_Write_Setup(t *testing.T) {
assert := assert.New(t)

var results [][]byte
server := createTestServerWithResponseCode(&results, 401, "Authentication issue. Invalid token")
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 401, "Authentication issue. Invalid token")
defer server.Close()

responseRules := ResponseRules{
Expand All @@ -722,7 +735,7 @@ func TestHTTP_Write_Setup(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -740,6 +753,33 @@ func TestHTTP_Write_Setup(t *testing.T) {
assert.Regexp(".*Got setup error, response status: '401 Unauthorized' with error details: 'Invalid token'", err.Error())
}

func TestHTTP_TimeOrientedHeadersEnabled(t *testing.T) {
assert := assert.New(t)

var results [][]byte
var headers http.Header

server := createTestServerWithResponseCode(&results, &headers, 200, "ok")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), true)
if err != nil {
t.Fatal(err)
}

input := []*models.Message{{Data: []byte(`{ "attribute": "value"}`)}}

beforeRequest := time.Now().UTC().UnixMilli()
target.Write(input)
afterRequest := time.Now().UTC().UnixMilli()

assert.Equal("5000", headers.Get("Request-Timeout"))

requestTimestamp, _ := strconv.ParseInt(headers.Get("Request-Timestamp"), 10, 64)
assert.GreaterOrEqual(requestTimestamp, beforeRequest)
assert.LessOrEqual(requestTimestamp, afterRequest)
}

func TestHTTP_Write_TLS(t *testing.T) {

if testing.Short() {
Expand Down Expand Up @@ -768,7 +808,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -810,7 +850,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err2 != nil {
t.Fatal(err2)
}
Expand Down Expand Up @@ -845,7 +885,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), false)
if err4 != nil {
t.Fatal(err4)
}
Expand Down Expand Up @@ -1000,7 +1040,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) {
defer server.Close()

//dynamicHeaders enabled
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules(), false)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading