Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add http forwarder #29

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"sync"
"time"

aa_log "github.com/aaronland/go-log/v2"
"github.com/aaronland/go-http-server"
server "github.com/aaronland/go-http-server"
thisisaaronland marked this conversation as resolved.
Show resolved Hide resolved
aa_log "github.com/aaronland/go-log/v2"
"github.com/whosonfirst/go-webhookd/v3"
"github.com/whosonfirst/go-webhookd/v3/config"
"github.com/whosonfirst/go-webhookd/v3/dispatcher"
"github.com/whosonfirst/go-webhookd/v3/receiver"
"github.com/whosonfirst/go-webhookd/v3/transformation"
"github.com/whosonfirst/go-webhookd/v3/webhook"
"github.com/whosonfirst/go-webhookd/v3/webhook"
)

// type WebhookDaemon is a struct that implements a long-running daemon to listen for and process webhooks.
// type WebhookDaemon is a struct that implements a long-running daemon to listen for and process webhooks.
type WebhookDaemon struct {
// server is a `aaronland/go-http-server.Server` instance that handles HTTP requests and responses.
server server.Server
Expand Down Expand Up @@ -198,7 +198,7 @@ func (d *WebhookDaemon) AddWebhook(ctx context.Context, wh webhook.Webhook) erro
_, ok := d.webhooks[endpoint]

if ok {
return fmt.Errorf("endpoint already configured")
return fmt.Errorf("Endpoint already configured")
}

d.webhooks[endpoint] = wh
Expand Down Expand Up @@ -357,8 +357,8 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
aa_log.Debug(logger, "Time to receive: %v", ttr)
aa_log.Debug(logger, "Time to transform: %v", ttt)
aa_log.Debug(logger, "Time to dispatch: %v", ttd)
aa_log.Debug(logger, "Time to process: %v", t2)
aa_log.Debug(logger, "Time to process: %v", t2)

rsp.Header().Set("X-Webhookd-Time-To-Receive", fmt.Sprintf("%v", ttr))
rsp.Header().Set("X-Webhookd-Time-To-Transform", fmt.Sprintf("%v", ttt))
rsp.Header().Set("X-Webhookd-Time-To-Dispatch", fmt.Sprintf("%v", ttd))
Expand All @@ -375,8 +375,6 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
rsp.Write(body)
}
}

return
}

return http.HandlerFunc(handler), nil
Expand All @@ -402,7 +400,7 @@ func (d *WebhookDaemon) StartWithLogger(ctx context.Context, logger *log.Logger)

svr := d.server

aa_log.Info(logger, "webhookd listening for requests on %s\n", svr.Address())
aa_log.Info(logger, "Webhookd listening for requests on %s\n", svr.Address())

err = svr.ListenAndServe(ctx, mux)

Expand Down
5 changes: 3 additions & 2 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package dispatcher
import (
"context"
"fmt"
"github.com/aaronland/go-roster"
"github.com/whosonfirst/go-webhookd/v3"
"net/url"
"sort"

"github.com/aaronland/go-roster"
"github.com/whosonfirst/go-webhookd/v3"
)

// dispatcher is a `aaronland/go-roster.Roster` instance used to maintain a list of registered `webhookd.WebhookDispatcher` initialization functions.
Expand Down
132 changes: 132 additions & 0 deletions dispatcher/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package dispatcher

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"

"github.com/whosonfirst/go-webhookd/v3"
)

func init() {

ctx := context.Background()
err := RegisterDispatcher(ctx, "http", NewHTTPDispatcher)
thisisaaronland marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}

err = RegisterDispatcher(ctx, "https", NewHTTPDispatcher)
if err != nil {
panic(err)
}
}

// GET and POST are the only supported HTTP methods
const GET = "GET"
const POST = "POST"

// HTTPDispatcher implements the `webhookd.WebhookDispatcher` interface for dispatching messages to a `log.Logger` instance `http.get` or `http.post`.
type HTTPDispatcher struct {
webhookd.WebhookDispatcher
// logger is the `log.Logger` instance associated with the dispatcher.
logger *log.Logger
// url to send the message to
url url.URL
// method to use when sending the message
method string
// client to use when sending the message
client HTTPClient
}

// HTTPClient is an interface for `http.Client` to allow for mocking in tests.
type HTTPClient interface {
thisisaaronland marked this conversation as resolved.
Show resolved Hide resolved
Get(url string) (*http.Response, error)
Post(url string, contentType string, body io.Reader) (*http.Response, error)
}

// NewHTTPDispatcher returns a new `HTTPDispatcher` instance configured by 'uri' in the form of:
//
// http://
// https://
//
// Messages are dispatched to the default `log.Default()` instance along with the uri parsed.
func NewHTTPDispatcher(ctx context.Context, uri string) (webhookd.WebhookDispatcher, error) {
logger := log.Default()

parsed, err := url.Parse(uri)

if err != nil {
return nil, fmt.Errorf("Failed to parse URI, %w", err)
}

return NewHTTPDispatcherWithLogger(ctx, logger, *parsed, &http.Client{})
}

// NewHTTPDispatcher returns a new `HTTPDispatcher` instance that dispatches messages to `http.Get` or `http.Post`.
func NewHTTPDispatcherWithLogger(ctx context.Context, logger *log.Logger, url url.URL, client HTTPClient) (webhookd.WebhookDispatcher, error) {
bradstimpson marked this conversation as resolved.
Show resolved Hide resolved
display := fmt.Sprintf("%s://%s%s", url.Scheme, url.Host, url.Path)
if len(url.Query()) > 0 {
display += fmt.Sprintf("?%s", url.RawQuery)
}
logger.Print("Parsed dispatcher URL: ", display)
bradstimpson marked this conversation as resolved.
Show resolved Hide resolved

method := url.Query().Get("method")
if method != GET {
method = POST
}

d := HTTPDispatcher{
logger: logger,
url: url,
method: method,
client: client,
}

return &d, nil
}

// Dispatch sends 'body' to the `log.Logger` and `http.Get`/`http.Post` that 'd' has been instantiated with.
func (d *HTTPDispatcher) Dispatch(ctx context.Context, body []byte) *webhookd.WebhookError {
var resp *http.Response
var err error

if d.method == GET {
d.logger.Println("Dispatching GET:", d.url.String(), "not forwarding body: ", string(body))
resp, err = d.client.Get(d.url.String())
} else {
d.logger.Println("Dispatching POST:", d.url.String(), "forwarding body: ", string(body))
resp, err = d.client.Post(d.url.String(), "application/json", bytes.NewBuffer(body))
}

// if we get a nil response the destination is unreachable
if resp == nil {
code := http.StatusRequestTimeout
message := "Timeout likely destination unreachable"
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

// if we get any other status code than 200
if resp.StatusCode != http.StatusOK {
code := resp.StatusCode
message := fmt.Sprintf("Failed to dispatch message: %s", resp.Status)
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

defer resp.Body.Close()
bradstimpson marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
code := http.StatusInternalServerError
message := err.Error()
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

return nil
}
71 changes: 71 additions & 0 deletions dispatcher/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package dispatcher

import (
"bytes"
"context"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"testing"
)

type MockHTTPClient struct {
Resp *http.Response
Error error
}

func (m *MockHTTPClient) Get(url string) (*http.Response, error) {
return m.Resp, m.Error
}

func (m *MockHTTPClient) Post(url string, contentType string, body io.Reader) (*http.Response, error) {
return m.Resp, m.Error
}

func TestNewHTTPDispatcherWithLogger(t *testing.T) {

ctx := context.Background()

var buf bytes.Buffer

logger := log.New(&buf, "testing ", log.Lshortfile)

parsed, err := url.Parse("http://testing?method=GET")
if err != nil {
t.Fatalf("Failed to parse url, %v", err)
}

// Create a mock response
mockResponse := &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("Mock response body")),
}

// Create a mock HTTP client with the desired behavior
mockClient := &MockHTTPClient{
Resp: mockResponse,
Error: nil,
}

d, err := NewHTTPDispatcherWithLogger(ctx, logger, *parsed, mockClient)

if err != nil {
t.Fatalf("Failed to create new http dispatcher with logger, %v", err)
}

err2 := d.Dispatch(ctx, []byte("hello world"))

if err2 != nil {
t.Fatalf("Failed to dispatch message, %v", err2)
}

expected := "testing http.go:76: Parsed dispatcher URL: http://testing?method=GET\ntesting http.go:99: Dispatching GET: http://testing?method=GET not forwarding body: hello world"
output := strings.TrimSpace(buf.String())

if output != expected {
t.Fatalf("Unexpected output from custom writer: '%s'", output)
}
}
3 changes: 2 additions & 1 deletion dispatcher/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package dispatcher

import (
"context"
"github.com/whosonfirst/go-webhookd/v3"
"log"

"github.com/whosonfirst/go-webhookd/v3"
)

func init() {
Expand Down
1 change: 1 addition & 0 deletions staticcheck.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
checks = ["all", "-ST1005", "-ST1003", "-ST1020","-ST1021"]