Skip to content

feat: Add structured logging #502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cmd/gateway/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"log/slog"

envstruct "code.cloudfoundry.org/go-envstruct"
"code.cloudfoundry.org/log-cache/internal/config"
"code.cloudfoundry.org/log-cache/internal/tls"
Expand All @@ -18,7 +20,6 @@ type Config struct {

TLS tls.TLS
MetricsServer config.MetricsServer
UseRFC339 bool `env:"USE_RFC339"`
}

// LoadConfig creates Config object from environment variables
Expand All @@ -39,7 +40,7 @@ func LoadConfig() (*Config, error) {

err := envstruct.WriteReport(&c)
if err != nil {
return nil, err
slog.Error("Failed to write report", "error", err)
}

return &c, nil
Expand Down
32 changes: 10 additions & 22 deletions src/cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"fmt"
"log"
"log/slog"
"os"
"time"

Expand All @@ -15,35 +15,24 @@ import (
_ "net/http/pprof"

. "code.cloudfoundry.org/log-cache/internal/gateway"
"code.cloudfoundry.org/log-cache/internal/plumbing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

func init() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, nil)))
}

func main() {
var metricsLoggr *log.Logger
var gatewayLoggr *log.Logger
slog.Info("Starting Log Cache Gateway...")
defer slog.Info("Log Cache Gateway stopped.")

cfg, err := LoadConfig()
if err != nil {
log.Fatalf("invalid configuration: %s", err)
}

if cfg.UseRFC339 {
metricsLoggr = log.New(new(plumbing.LogWriter), "[METRICS] ", 0)
gatewayLoggr = log.New(new(plumbing.LogWriter), "[GATEWAY] ", 0)
log.SetOutput(new(plumbing.LogWriter))
log.SetFlags(0)
} else {
metricsLoggr = log.New(os.Stderr, "[METRICS] ", log.LstdFlags)
gatewayLoggr = log.New(os.Stderr, "[GATEWAY] ", log.LstdFlags)
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
panic(err)
}

log.Print("Starting Log Cache Gateway...")
defer log.Print("Closing Log Cache Gateway.")

metricServerOption := metrics.WithTLSServer(
int(cfg.MetricsServer.Port),
cfg.MetricsServer.CertFile,
Expand All @@ -54,7 +43,7 @@ func main() {
metricServerOption = metrics.WithPublicServer(int(cfg.MetricsServer.Port))
}
m := metrics.NewRegistry(
metricsLoggr,
slog.NewLogLogger(slog.Default().Handler(), slog.LevelInfo),
metricServerOption,
)
if cfg.MetricsServer.DebugMetrics {
Expand All @@ -64,11 +53,10 @@ func main() {
Handler: http.DefaultServeMux,
ReadHeaderTimeout: 2 * time.Second,
}
go func() { log.Println("PPROF SERVER STOPPED " + pprofServer.ListenAndServe().Error()) }()
go func() { slog.Info("pprof server stopped", "error", pprofServer.ListenAndServe().Error()) }()
}

gatewayOptions := []GatewayOption{
WithGatewayLogger(gatewayLoggr),
WithGatewayVersion(cfg.Version),
WithGatewayBlock(),
}
Expand Down
35 changes: 12 additions & 23 deletions src/internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gateway
import (
"fmt"
"io"
"log"
"log/slog"
"net"
"net/http"
"time"
Expand All @@ -21,8 +21,6 @@ import (

// Gateway provides a RESTful API into LogCache's gRPC API.
type Gateway struct {
log *log.Logger

logCacheAddr string
logCacheVersion string
uptimeFn func() int64
Expand All @@ -40,7 +38,6 @@ type Gateway struct {
// invoked before using the Gateway.
func NewGateway(logCacheAddr, gatewayAddr string, opts ...GatewayOption) *Gateway {
g := &Gateway{
log: log.New(io.Discard, "", 0),
logCacheAddr: logCacheAddr,
gatewayAddr: gatewayAddr,
uptimeFn: uptimeInSeconds,
Expand All @@ -56,14 +53,6 @@ func NewGateway(logCacheAddr, gatewayAddr string, opts ...GatewayOption) *Gatewa
// GatewayOption configures a Gateway.
type GatewayOption func(*Gateway)

// WithGatewayLogger returns a GatewayOption that configures the logger for
// the Gateway. It defaults to no logging.
func WithGatewayLogger(l *log.Logger) GatewayOption {
return func(g *Gateway) {
g.log = l
}
}

// WithGatewayBlock returns a GatewayOption that determines if Start launches
// a go-routine or not. It defaults to launching a go-routine. If this is set,
// start will block on serving the HTTP endpoint.
Expand Down Expand Up @@ -107,12 +96,12 @@ func WithGatewayTLSServer(certPath, keyPath string) GatewayOption {
// Start starts the gateway to start receiving and forwarding requests. It
// does not block unless WithGatewayBlock was set.
func (g *Gateway) Start() {
slog.Info("Starting server", "address", g.gatewayAddr)
lis, err := net.Listen("tcp", g.gatewayAddr)
if err != nil {
g.log.Fatalf("failed to listen on addr %s: %s", g.gatewayAddr, err)
panic(err)
}
g.lis = lis
g.log.Printf("listening on %s...", lis.Addr().String())

if g.blockOnStart {
g.listenAndServe()
Expand All @@ -138,7 +127,7 @@ func (g *Gateway) listenAndServe() {

conn, err := grpc.NewClient(g.logCacheAddr, g.logCacheDialOpts...)
if err != nil {
g.log.Fatalf("failed to dial Log Cache: %s", err)
panic(err)
}

err = logcache_v1.RegisterEgressHandlerClient(
Expand All @@ -147,7 +136,7 @@ func (g *Gateway) listenAndServe() {
logcache_v1.NewEgressClient(conn),
)
if err != nil {
g.log.Fatalf("failed to register LogCache handler: %s", err)
panic(err)
}

err = logcache_v1.RegisterPromQLQuerierHandlerClient(
Expand All @@ -156,7 +145,7 @@ func (g *Gateway) listenAndServe() {
logcache_v1.NewPromQLQuerierClient(conn),
)
if err != nil {
g.log.Fatalf("failed to register PromQLQuerier handler: %s", err)
panic(err)
}

topLevelMux := http.NewServeMux()
Expand All @@ -169,19 +158,19 @@ func (g *Gateway) listenAndServe() {
}
if g.certPath != "" || g.keyPath != "" {
if err := server.ServeTLS(g.lis, g.certPath, g.keyPath); err != nil {
g.log.Fatalf("failed to serve HTTPS endpoint: %s", err)
panic(err)
}
} else {
if err := server.Serve(g.lis); err != nil {
g.log.Fatalf("failed to serve HTTP endpoint: %s", err)
panic(err)
}
}
}

func (g *Gateway) handleInfoEndpoint(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn())))
if err != nil {
g.log.Println("Cannot send result for the info endpoint")
slog.Error("Failed to send result for the info endpoint", "error", err)
}
}

Expand Down Expand Up @@ -222,16 +211,16 @@ func (g *Gateway) httpErrorHandler(

buf, merr := marshaler.Marshal(body)
if merr != nil {
g.log.Printf("Failed to marshal error message %q: %v", body, merr)
slog.Error("Failed to marshal error message", "error", merr)
w.WriteHeader(http.StatusInternalServerError)
if _, err := io.WriteString(w, fallback); err != nil {
g.log.Printf("Failed to write response: %v", err)
slog.Error("Failed to write response", "error", err)
}
return
}

w.WriteHeader(runtime.HTTPStatusFromCode(status.Code(err)))
if _, err := w.Write(buf); err != nil {
g.log.Printf("Failed to write response: %v", err)
slog.Error("Failed to write response", "error", err)
}
}
4 changes: 4 additions & 0 deletions src/internal/gateway/gateway_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package gateway_test

import (
"log/slog"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"testing"
)

func TestGateway(t *testing.T) {
slog.SetDefault(slog.New(slog.NewTextHandler(GinkgoWriter, nil)))

RegisterFailHandler(Fail)
RunSpecs(t, "Gateway Suite")
}
Loading