Skip to content

Commit

Permalink
Faro Exporter (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
t00mas authored Jan 24, 2025
1 parent cd96b6c commit 63d6201
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 173 deletions.
32 changes: 25 additions & 7 deletions pkg/exporter/faro/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,41 @@ package faroexporter // import "github.com/grafana/faro/pkg/exporter/faro"
import (
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
)

// FaroExporterConfig contains the configuration options for the faro exporter
type FaroExporterConfig struct {
Endpoint string `mapstructure:"endpoint"`
}

// Config contains the main configuration options for the faro exporter
type Config struct {
FaroExporter FaroExporterConfig `mapstructure:"faroexporter"`
confighttp.ClientConfig `mapstructure:",squash"`
exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
FaroEndpoint string `mapstructure:"faro_endpoint"`
}

func (c *Config) Validate() error {
var errs error
if c.FaroExporter.Endpoint == "" {
if c.Endpoint == "" {
errs = multierr.Append(errs, errors.New("endpoint is required"))
}
return errs
}

func (c *Config) Unmarshal(component *confmap.Conf) error {
if component == nil {
return nil
}

if err := component.Unmarshal(c); err != nil {
return err
}

return nil
}

var _ component.Config = (*Config)(nil)
var _ confmap.Unmarshaler = (*Config)(nil)
129 changes: 108 additions & 21 deletions pkg/exporter/faro/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,140 @@
package faroexporter // import "github.com/grafana/faro/pkg/exporter/faro"

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"runtime"
"strconv"
"time"

httpHelper "github.com/grafana/faro/pkg/exporter/faro/internal/httphelper"
faro "github.com/grafana/faro/pkg/go"
farotranslator "github.com/grafana/faro/pkg/translator/faro"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type faroExporter struct {
config *Config
translator farotranslator.Translator
client FaroClient
logger *zap.Logger
config *Config
client *http.Client
logger *zap.Logger
settings component.TelemetrySettings
userAgent string
}

func newFaroExporter(config *Config, params exporter.Settings) *faroExporter {
faroExporter := &faroExporter{
config: config,
translator: &farotranslator.FaroTranslator{},
logger: params.Logger,
const (
headerRetryAfter = "Retry-After"
maxHTTPResponseReadBytes = 64 * 1024
jsonContentType = "application/json"
)

func newExporter(cfg component.Config, set exporter.Settings) (*faroExporter, error) {
oCfg := cfg.(*Config)

if oCfg.Endpoint != "" {
_, err := url.Parse(oCfg.Endpoint)
if err != nil {
return nil, errors.New("endpoint must be a valid URL")
}
}
return faroExporter

userAgent := fmt.Sprintf("%s/%s (%s/%s)",
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)

return &faroExporter{
config: oCfg,
logger: set.Logger,
userAgent: userAgent,
settings: set.TelemetrySettings,
}, nil
}

func (fe *faroExporter) start(_ context.Context, host component.Host) error {
func (fe *faroExporter) start(ctx context.Context, host component.Host) error {
client, err := fe.config.ClientConfig.ToClient(ctx, host, fe.settings)
if err != nil {
return err
}
fe.client = client
return nil
}

func (fe *faroExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
func (fe *faroExporter) export(ctx context.Context, fp []*faro.Payload) error {
fe.logger.Debug("Preparing to make HTTP request", zap.String("endpoint", fe.config.Endpoint))
request, err := json.Marshal(fp)
if err != nil {
return consumererror.NewPermanent(err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fe.config.Endpoint, bytes.NewReader(request))
if err != nil {
return consumererror.NewPermanent(err)
}
req.Header.Set("Content-Type", jsonContentType)
req.Header.Set("User-Agent", fe.userAgent)

resp, err := fe.client.Do(req)
if err != nil {
return fmt.Errorf("failed to make an HTTP request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusAccepted {
return nil
}

var errString string
var formattedErr error
if resp.StatusCode != 0 {
errString = fmt.Sprintf(
"error exporting items, request to %s responded with HTTP Status Code %d, Message=%s",
fe.config.Endpoint, resp.StatusCode, resp.Body)
} else {
errString = fmt.Sprintf(
"error exporting items, request to %s responded with HTTP Status Code %d",
fe.config.Endpoint, resp.StatusCode)
}
formattedErr = httpHelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err()

if httpHelper.IsRetryableStatusCode(resp.StatusCode) {
retryAfter := 0
isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable
if val := resp.Header.Get(headerRetryAfter); isThrottleError && val != "" {
if seconds, err := strconv.Atoi(val); err == nil {
retryAfter = seconds
}
}
return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(retryAfter)*time.Second)
}

return consumererror.NewPermanent(formattedErr)
}

func (fe *faroExporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
flogs, err := fe.translator.TranslateLogs(ctx, logs)
func (fe *faroExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
fp, err := farotranslator.TranslateFromTraces(ctx, td)
if err != nil {
return err
return fmt.Errorf("failed to translate traces to faro payload: %w", err)
}
return fe.client.SendLogs(flogs)
return fe.export(ctx, fp)
}

func (fe *faroExporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
ftraces, err := fe.translator.TranslateTraces(ctx, traces)
func (fe *faroExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
fp, err := farotranslator.TranslateFromLogs(ctx, ld)
if err != nil {
return err
return fmt.Errorf("failed to translate logs to faro payload: %w", err)
}
return fe.client.SendTraces(ftraces)
return fe.export(ctx, fp)
}

func (fe *faroExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
87 changes: 87 additions & 0 deletions pkg/exporter/faro/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package faroexporter // import "github.com/grafana/faro/pkg/exporter/faro"

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestAcceptedResponsesAndFormats(t *testing.T) {
tests := []struct {
name string
responseStatus int
responseBody string
err func(srv *httptest.Server) error
isPermErr bool
headers map[string]string
}{
{
name: "202",
responseStatus: http.StatusAccepted,
responseBody: "",
isPermErr: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
srv := createBackend("/faro", func(writer http.ResponseWriter, _ *http.Request) {
for k, v := range test.headers {
writer.Header().Add(k, v)
}
writer.WriteHeader(test.responseStatus)
writer.Write([]byte(test.responseBody))
})
defer srv.Close()

cfg := &Config{
ClientConfig: confighttp.ClientConfig{
Endpoint: srv.URL + "/faro",
},
}

expT, err := createTraces(context.Background(), exportertest.NewNopSettings(), cfg)
require.NoError(t, err)

err = expT.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, expT.Shutdown(context.Background()))
})

traces := ptrace.NewTraces()
err = expT.ConsumeTraces(context.Background(), traces)
require.NoError(t, err)

expL, err := createLogs(context.Background(), exportertest.NewNopSettings(), cfg)
require.NoError(t, err)

err = expL.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, expL.Shutdown(context.Background()))
})

logs := plog.NewLogs()
err = expL.ConsumeLogs(context.Background(), logs)
require.NoError(t, err)
})
}
}

func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(endpoint, handler)
srv := httptest.NewServer(mux)
fmt.Printf("Server URL: %s\n", srv.URL)
return srv
}
53 changes: 42 additions & 11 deletions pkg/exporter/faro/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package faroexporter // import "github.com/grafana/faro/pkg/exporter/faro"

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"

Expand All @@ -17,27 +21,54 @@ func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
exporter.WithLogs(createLogsExporter, metadata.LogsStability),
exporter.WithTraces(createTraces, metadata.TracesStability),
exporter.WithLogs(createLogs, metadata.LogsStability),
)
}

func createDefaultConfig() component.Config {
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Timeout = 30 * time.Second
clientConfig.Compression = configcompression.TypeGzip
clientConfig.WriteBufferSize = 512 * 1024

return &Config{
FaroExporter: FaroExporterConfig{
Endpoint: "",
},
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
ClientConfig: clientConfig,
}
}

func createLogsExporter(ctx context.Context, params exporter.Settings, config component.Config) (exporter.Logs, error) {
faroExporter := newFaroExporter(config.(*Config), params)
func createTraces(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Traces, error) {
oce, err := newExporter(cfg, set)
if err != nil {
return nil, err
}
oCfg := cfg.(*Config)

return exporterhelper.NewLogs(ctx, params, config, faroExporter.ConsumeLogs, exporterhelper.WithStart(faroExporter.start))
return exporterhelper.NewTraces(ctx, set, cfg,
oce.ConsumeTraces,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(oce.Capabilities()),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: oCfg.Timeout}),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig),
)
}

func createTracesExporter(ctx context.Context, params exporter.Settings, config component.Config) (exporter.Traces, error) {
faroExporter := newFaroExporter(config.(*Config), params)
func createLogs(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) {
oce, err := newExporter(cfg, set)
if err != nil {
return nil, err
}
oCfg := cfg.(*Config)

return exporterhelper.NewTraces(ctx, params, config, faroExporter.ConsumeTraces, exporterhelper.WithStart(faroExporter.start))
return exporterhelper.NewLogs(ctx, set, cfg,
oce.ConsumeLogs,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(oce.Capabilities()),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: oCfg.Timeout}),
exporterhelper.WithRetry(oCfg.RetryConfig),
exporterhelper.WithQueue(oCfg.QueueConfig),
)
}
Loading

0 comments on commit 63d6201

Please sign in to comment.