Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/158.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
Fixes a bug in the webhook output where an error when reading the response body was ignored.
```

```release-note:bug
Resolved a potential context leak in the GCS output by ensuring the cancel function is always called.
```
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ linters:
staticcheck:
checks:
- all
- '-QF1008' # https://staticcheck.dev/docs/checks/#QF1008
exclusions:
generated: lax
presets:
Expand Down
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
LICENSE := ASL2-Short
VERSION ?= local

GOIMPORTS := go run golang.org/x/tools/cmd/goimports
GOLANGCI_LINT:= go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint
GOLICENSER := go run github.com/elastic/go-licenser

check-fmt:
@${GOLICENSER} -d -license ${LICENSE}
@${GOIMPORTS} -l -e -local github.com/elastic . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true
@${GOLANGCI_LINT} fmt --diff > /dev/null || (echo "Please run 'make fmt' to fix the formatting issues" 1>&2 && exit 1)

docker:
docker build -t docker.elastic.co/observability/stream:${VERSION} .

fmt:
${GOLICENSER} -license ${LICENSE}
${GOIMPORTS} -l -w -local github.com/elastic .
${GOLANGCI_LINT} fmt ./...

.PHONY: check-fmt docker fmt
lint:
@${GOLANGCI_LINT} run ./...

.PHONY: check-fmt docker fmt lint
284 changes: 246 additions & 38 deletions go.mod

Large diffs are not rendered by default.

1,053 changes: 956 additions & 97 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions internal/cmdutil/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

// Package cmdutil provides utility functions and argument validators
// to facilitate the creation and validation of CLI command arguments
// when using the spf13/cobra library.
package cmdutil

import (
Expand Down Expand Up @@ -42,6 +45,10 @@ func RegularFiles(_ *cobra.Command, args []string) error {
return nil
}

// ExpandGlobPatternsFromArgs expands each argument in args as a glob pattern,
// returning a slice containing all matching file paths. If any pattern is
// invalid, an error is returned. Patterns that do not match any files are
// silently ignored.
func ExpandGlobPatternsFromArgs(args []string) ([]string, error) {
var paths []string
for _, pat := range args {
Expand Down
3 changes: 2 additions & 1 deletion internal/command/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ func newHTTPServerRunner(options *httpserver.Options, logger *zap.Logger) *cobra
},
}

r.cmd.RunE = func(_ *cobra.Command, args []string) error {
r.cmd.RunE = func(_ *cobra.Command, _ []string) error {
r.logger = logger.Sugar().With("address", options.Addr)
return r.Run()
}

return r.cmd
}

// Run executes the http-server command.
func (r *httpServerRunner) Run() error {
r.logger.Debug("mock server running...")
server, err := httpserver.New(r.opts, r.logger)
Expand Down
3 changes: 2 additions & 1 deletion internal/command/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
return r.cmd
}

// Run executes the log command.
func (r *logRunner) Run(args []string) error {
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
out, err := output.Initialize(r.cmd.Context(), r.out, r.logger)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/command/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func newPCAPRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
return r.cmd
}

// Run executes the pcap command.
func (r *pcapRunner) Run(files []string) error {
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
out, err := output.Initialize(r.cmd.Context(), r.out, r.logger)
if err != nil {
return err
}
Expand Down
55 changes: 34 additions & 21 deletions internal/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

// Package command provides the CLI interface and subcommands for the stream
// utility. It defines the root command and all supported subcommands,
// configuring command-line flags, argument validation, and the main entry point
// for invoking stream's features.
package command

import (
Expand Down Expand Up @@ -38,6 +42,8 @@ import (
_ "github.com/elastic/stream/internal/output/webhook"
)

// Execute calls ExecuteContext with a context that is cancelled when
// SIGINT is received.
func Execute() error {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
Expand All @@ -50,6 +56,9 @@ func Execute() error {
return ExecuteContext(ctx)
}

// ExecuteContext executes the stream command. It sets up the command-line
// flags and initializes them with data from environment variables, before
// executing the command.
func ExecuteContext(ctx context.Context) error {
logger, err := log.NewLogger()
if err != nil {
Expand Down Expand Up @@ -97,10 +106,10 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.KafkaOptions.Topic, "kafka-topic", "test", "Kafka topic name")

// GCS output flags.
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Bucket, "gcs-bucket", "testbucket", "GCS Bucket name")
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Object, "gcs-object", "testobject", "GCS Object name")
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.ObjectContentType, "gcs-content-type", "application/json", "The Content type of the object to be uploaded to GCS.")
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.ProjectID, "gcs-projectid", "testproject", "GCS Project name")
rootCmd.PersistentFlags().StringVar(&opts.GCSOptions.Bucket, "gcs-bucket", "testbucket", "GCS Bucket name")
rootCmd.PersistentFlags().StringVar(&opts.GCSOptions.Object, "gcs-object", "testobject", "GCS Object name")
rootCmd.PersistentFlags().StringVar(&opts.GCSOptions.ObjectContentType, "gcs-content-type", "application/json", "The Content type of the object to be uploaded to GCS.")
rootCmd.PersistentFlags().StringVar(&opts.GCSOptions.ProjectID, "gcs-projectid", "testproject", "GCS Project name")

// Lumberjack output flags.
rootCmd.PersistentFlags().BoolVar(&opts.LumberjackOptions.ParseJSON, "lumberjack-parse-json", false, "Parse the input data as JSON and send the structured data as a Lumberjack batch.")
Expand All @@ -126,23 +135,23 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.AddCommand(versionCmd)

// Add common start-up delay logic.
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, _ []string) error {
return multierr.Combine(
waitForStartSignal(&opts, cmd.Context(), logger),
waitForDelay(&opts, cmd.Context(), logger),
waitForStartSignal(cmd.Context(), &opts, logger),
waitForDelay(cmd.Context(), &opts, logger),
)
}

// Automatically set flags based on environment variables.
rootCmd.PersistentFlags().VisitAll(setFlagFromEnv)
rootCmd.PersistentFlags().VisitAll(setFlagFromEnv(logger))
for _, cmd := range rootCmd.Commands() {
cmd.PersistentFlags().VisitAll(setFlagFromEnv)
cmd.PersistentFlags().VisitAll(setFlagFromEnv(logger))
}

return rootCmd.ExecuteContext(ctx)
}

func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error {
func waitForStartSignal(ctx context.Context, opts *output.Options, logger *zap.Logger) error {
if opts.StartSignal == "" {
return nil
}
Expand All @@ -154,30 +163,34 @@ func waitForStartSignal(opts *output.Options, parent context.Context, logger *za

// Wait for the signal or the command context to be done.
logger.Sugar().Infow("Waiting for signal.", "start-signal", opts.StartSignal)
startCtx, _ := osctx.WithSignal(parent, os.Signal(num))
startCtx, _ := osctx.WithSignal(ctx, os.Signal(num))
<-startCtx.Done()
return nil
}

func waitForDelay(opts *output.Options, parent context.Context, logger *zap.Logger) error {
func waitForDelay(ctx context.Context, opts *output.Options, logger *zap.Logger) error {
if opts.Delay <= 0 {
return nil
}

logger.Sugar().Infow("Delaying connection.", "delay", opts.Delay)
if err := timed.Wait(parent, opts.Delay); err != nil {
if err := timed.Wait(ctx, opts.Delay); err != nil {
return fmt.Errorf("delay waiting period was interrupted: %w", err)
}
return nil
}

func setFlagFromEnv(flag *pflag.Flag) {
envVar := strings.ToUpper(flag.Name)
envVar = strings.ReplaceAll(envVar, "-", "_")
envVar = "STREAM_" + envVar

flag.Usage = fmt.Sprintf("%v [env %v]", flag.Usage, envVar)
if value := os.Getenv(envVar); value != "" {
flag.Value.Set(value)
func setFlagFromEnv(l *zap.Logger) func(*pflag.Flag) {
return func(flag *pflag.Flag) {
envVar := strings.ToUpper(flag.Name)
envVar = strings.ReplaceAll(envVar, "-", "_")
envVar = "STREAM_" + envVar

flag.Usage = fmt.Sprintf("%v [env %v]", flag.Usage, envVar)
if value := os.Getenv(envVar); value != "" {
if err := flag.Value.Set(value); err != nil {
l.Error("Failed to set flag from env", zap.String("env", envVar), zap.Error(err))
}
}
}
}
2 changes: 1 addition & 1 deletion internal/command/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var versionCmd = &cobra.Command{
Use: "version",
Short: "Output version",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
Run: func(_ *cobra.Command, _ []string) {
fmt.Printf("stream %s\n", version)
},
}
3 changes: 3 additions & 0 deletions internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type tpl struct {
*template.Template
}

// Unpack implements the go-ucfg StringUnpacker interface for tpl. It parses the
// provided string as a Go template, registering custom helper functions for use
// within the template, and assigns the resulting template to the receiver.
func (t *tpl) Unpack(in string) error {
parsed, err := template.New("").
Option("missingkey=zero").
Expand Down
30 changes: 21 additions & 9 deletions internal/httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

// Package httpserver provides a configurable mock HTTP server for testing and
// development purposes. It allows users to define request matching rules and
// dynamic templated responses via configuration files. Features include support
// for request sequencing, custom headers, authentication, TLS, and advanced
// response templating.
package httpserver

import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"os"
Expand All @@ -24,6 +30,7 @@ import (
"github.com/elastic/stream/internal/output"
)

// Server is an HTTP server for mocking HTTP responses.
type Server struct {
logger *zap.SugaredLogger
opts *Options
Expand All @@ -32,6 +39,7 @@ type Server struct {
ctx context.Context
}

// Options are the options for the HTTP server.
type Options struct {
*output.Options
TLSCertificate string // TLS certificate file path.
Expand All @@ -46,13 +54,14 @@ type Options struct {
ExitOnUnmatchedRule bool // If true it will exit if a request does not match any rule.
}

// New creates a new HTTP server.
func New(opts *Options, logger *zap.SugaredLogger) (*Server, error) {
if opts.Addr == "" {
return nil, errors.New("a listen address is required")
}

if !(opts.TLSCertificate == "" && opts.TLSKey == "") &&
!(opts.TLSCertificate != "" && opts.TLSKey != "") {
if (opts.TLSCertificate != "" || opts.TLSKey != "") &&
(opts.TLSCertificate == "" || opts.TLSKey == "") {
return nil, errors.New("both TLS certificate and key files must be defined")
}

Expand Down Expand Up @@ -121,6 +130,7 @@ func New(opts *Options, logger *zap.SugaredLogger) (*Server, error) {
}, nil
}

// Start starts the HTTP server.
func (o *Server) Start(ctx context.Context) error {
o.ctx = ctx

Expand All @@ -144,6 +154,8 @@ func (o *Server) Start(ctx context.Context) error {
return nil
}

// Close gracefully shuts down the server without interrupting any
// active connections.
func (o *Server) Close() error {
o.logger.Info("shutting down http-server...")

Expand Down Expand Up @@ -231,7 +243,7 @@ func newHandlerFromConfig(config *config, notFoundHandler http.HandlerFunc, logg
route.Queries(key, v)
}
}
route.MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
route.MatcherFunc(func(r *http.Request, _ *mux.RouteMatch) bool {
for key := range exclude {
if r.URL.Query().Has(key) {
return false
Expand All @@ -246,7 +258,7 @@ func newHandlerFromConfig(config *config, notFoundHandler http.HandlerFunc, logg
}
}

route.MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
route.MatcherFunc(func(r *http.Request, _ *mux.RouteMatch) bool {
user, password, _ := r.BasicAuth()
if rule.User != "" && user != rule.User {
return false
Expand All @@ -266,15 +278,15 @@ func newHandlerFromConfig(config *config, notFoundHandler http.HandlerFunc, logg
logger.Errorf("compiling body match regexp: %s", re, err)
}
}
route.MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
route.MatcherFunc(func(r *http.Request, _ *mux.RouteMatch) bool {
if rule.RequestBody == "" {
return true
}
body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if err != nil {
return false
}
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))
r.Body = io.NopCloser(bytes.NewBuffer(body))
if bodyRE != nil {
return bodyRE.Match(body)
}
Expand Down Expand Up @@ -304,7 +316,7 @@ func strRequest(r *http.Request) string {
b.WriteString(fmt.Sprintf("'%s: %s' ", k, v))
}
b.WriteString(", Request Body: ")
body, _ := ioutil.ReadAll(r.Body)
body, _ := io.ReadAll(r.Body)
b.Write(body)
return b.String()
}
Loading
Loading