From f033e661e1c5e2674012cc50f0734d7d8e846e0e Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Sun, 25 Feb 2024 23:39:36 +0700 Subject: [PATCH] improve OpenTelemetry attributes and enable tracing visibility (#10) --- connector/server.go | 185 +++++++++++++++++------------------------ connector/telemetry.go | 32 ++++++- connector/types.go | 2 +- 3 files changed, 105 insertions(+), 114 deletions(-) diff --git a/connector/server.go b/connector/server.go index 2a696123..a66fedf9 100644 --- a/connector/server.go +++ b/connector/server.go @@ -109,8 +109,8 @@ func (s *Server[Configuration, State]) withAuth(handler http.HandlerFunc) http.H }) s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes( - attribute.String("status", "failed"), - attribute.String("reason", "unauthorized"), + failureStatusAttribute, + httpStatusAttribute(http.StatusUnauthorized), )) return } @@ -156,31 +156,14 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - attributes := []attribute.KeyValue{} - _, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body") - defer decodeSpan.End() var body schema.QueryRequest - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{ - Message: "failed to decode json request body", - Details: map[string]any{ - "cause": err.Error(), - }, - }) - - attributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", "json_decode"), - } - span.SetAttributes(attributes...) 
- s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryCounter, &body); err != nil { return } - decodeSpan.End() collectionAttr := attribute.String("collection", body.Collection) - attributes = append(attributes, collectionAttr) - span.SetAttributes(attributes...) + span.SetAttributes(collectionAttr) + execQueryCtx, execQuerySpan := s.telemetry.Tracer.Start(ctx, "Execute Query") defer execQuerySpan.End() @@ -189,22 +172,21 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ if err != nil { status := writeError(w, logger, err) statusAttributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", fmt.Sprintf("%d", status)), + failureStatusAttribute, + httpStatusAttribute(status), } - span.SetAttributes(statusAttributes...) - s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...)) + span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...) 
+ s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...)) return } execQuerySpan.End() - statusAttribute := attribute.String("status", "success") - span.SetAttributes(statusAttribute) + span.SetAttributes(successStatusAttribute) _, responseSpan := s.telemetry.Tracer.Start(ctx, "Response") writeJson(w, logger, http.StatusOK, response) responseSpan.End() - s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...)) + s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(collectionAttr, successStatusAttribute)) // record latency for success requests only s.telemetry.queryLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr)) } @@ -216,31 +198,14 @@ func (s *Server[Configuration, State]) QueryExplain(w http.ResponseWriter, r *ht ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query Explain", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - attributes := []attribute.KeyValue{} - _, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body") - defer decodeSpan.End() var body schema.QueryRequest - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{ - Message: "failed to decode json request body", - Details: map[string]any{ - "cause": err.Error(), - }, - }) - - attributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", "json_decode"), - } - span.SetAttributes(attributes...) - s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryExplainCounter, &body); err != nil { return } - decodeSpan.End() collectionAttr := attribute.String("collection", body.Collection) - attributes = append(attributes, collectionAttr) - span.SetAttributes(attributes...) 
+ span.SetAttributes(collectionAttr) + execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain") defer execSpan.End() @@ -248,21 +213,20 @@ func (s *Server[Configuration, State]) QueryExplain(w http.ResponseWriter, r *ht if err != nil { status := writeError(w, logger, err) statusAttributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", fmt.Sprintf("%d", status)), + failureStatusAttribute, + httpStatusAttribute(status), } - span.SetAttributes(attributes...) - s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...)) + span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...) + s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...)) return } execSpan.End() - statusAttribute := attribute.String("status", "success") - span.SetAttributes(statusAttribute) + span.SetAttributes(successStatusAttribute) _, responseSpan := s.telemetry.Tracer.Start(ctx, "Response") writeJson(w, logger, http.StatusOK, response) responseSpan.End() - s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...)) + s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, collectionAttr)) // record latency for success requests only s.telemetry.queryExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr)) @@ -275,35 +239,19 @@ func (s *Server[Configuration, State]) MutationExplain(w http.ResponseWriter, r ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation Explain", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - attributes := []attribute.KeyValue{} - _, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body") - defer decodeSpan.End() var body schema.MutationRequest - if err := 
json.NewDecoder(r.Body).Decode(&body); err != nil { - writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{ - Message: "failed to decode json request body", - Details: map[string]any{ - "cause": err.Error(), - }, - }) - - attributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", "json_decode"), - } - span.SetAttributes(attributes...) - s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationExplainCounter, &body); err != nil { return } - decodeSpan.End() var operationNames []string for _, op := range body.Operations { operationNames = append(operationNames, op.Name) } - collectionAttr := attribute.String("operations", strings.Join(operationNames, ",")) - attributes = append(attributes, collectionAttr) - span.SetAttributes(attributes...) + + operationAttr := attribute.String("operations", strings.Join(operationNames, ",")) + span.SetAttributes(operationAttr) + execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain") defer execSpan.End() @@ -311,24 +259,24 @@ func (s *Server[Configuration, State]) MutationExplain(w http.ResponseWriter, r if err != nil { status := writeError(w, logger, err) statusAttributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", fmt.Sprintf("%d", status)), + failureStatusAttribute, + httpStatusAttribute(status), } - span.SetAttributes(attributes...) - s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...)) + span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...) 
+ s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...)) return } execSpan.End() - statusAttribute := attribute.String("status", "success") - span.SetAttributes(statusAttribute) + span.SetAttributes(successStatusAttribute) + _, responseSpan := s.telemetry.Tracer.Start(ctx, "Response") writeJson(w, logger, http.StatusOK, response) responseSpan.End() - s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...)) + s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr)) // record latency for success requests only - s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr)) + s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr)) } // Mutation implements a handler for the /mutation endpoint, POST method that executes a mutation. @@ -338,52 +286,67 @@ func (s *Server[Configuration, State]) Mutation(w http.ResponseWriter, r *http.R ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - _, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body") - defer decodeSpan.End() var body schema.MutationRequest - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{ - Message: "failed to decode json request body", - Details: map[string]any{ - "cause": err.Error(), - }, - }) - - attributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", "json_decode"), - } - span.SetAttributes(attributes...) 
- s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationCounter, &body); err != nil { return } - decodeSpan.End() + + var operationNames []string + for _, op := range body.Operations { + operationNames = append(operationNames, op.Name) + } + + operationAttr := attribute.String("operations", strings.Join(operationNames, ",")) + span.SetAttributes(operationAttr) execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Mutation") defer execSpan.End() response, err := s.connector.Mutation(execCtx, s.configuration, s.state, &body) if err != nil { status := writeError(w, logger, err) - attributes := []attribute.KeyValue{ - attribute.String("status", "failed"), - attribute.String("reason", fmt.Sprintf("%d", status)), + statusAttributes := []attribute.KeyValue{ + failureStatusAttribute, + httpStatusAttribute(status), } - span.SetAttributes(attributes...) - s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...) 
+ s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...)) return } execSpan.End() - attributes := attribute.String("status", "success") - span.SetAttributes(attributes) + span.SetAttributes(successStatusAttribute) _, responseSpan := s.telemetry.Tracer.Start(ctx, "Response") writeJson(w, logger, http.StatusOK, response) responseSpan.End() - s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes)) + s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr)) // record latency for success requests only - s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds()) + s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr)) +} + +// unmarshalBodyJSON decodes the JSON request body into body inside a "Decode JSON Body" span; on failure it writes a 400 response, tags span and increments the given counter with the failure attributes, and returns the decode error. +func (s *Server[Configuration, State]) unmarshalBodyJSON(w http.ResponseWriter, r *http.Request, ctx context.Context, span trace.Span, counter metric.Int64Counter, body any) error { + _, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body") + defer decodeSpan.End() + if err := json.NewDecoder(r.Body).Decode(body); err != nil { + writeJson(w, GetLogger(r.Context()), http.StatusBadRequest, schema.ErrorResponse{ + Message: "failed to decode json request body", + Details: map[string]any{ + "cause": err.Error(), + }, + }) + + attributes := []attribute.KeyValue{ + failureStatusAttribute, + httpStatusAttribute(http.StatusBadRequest), + } + span.SetAttributes(append(attributes, attribute.String("reason", "json_decode"))...) 
+ counter.Add(r.Context(), 1, metric.WithAttributes(attributes...)) + return err + } + + return nil } func (s *Server[Configuration, State]) buildHandler() *http.ServeMux { diff --git a/connector/telemetry.go b/connector/telemetry.go index 5d46599d..3ab28a57 100644 --- a/connector/telemetry.go +++ b/connector/telemetry.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus/collectors" "github.com/rs/zerolog" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -28,8 +29,14 @@ import ( traceapi "go.opentelemetry.io/otel/trace" ) +var ( + userVisibilityAttribute = traceapi.WithAttributes(attribute.String("internal.visibility", "user")) + successStatusAttribute = attribute.String("status", "success") + failureStatusAttribute = attribute.String("status", "failure") +) + type TelemetryState struct { - Tracer traceapi.Tracer + Tracer *Tracer Meter metricapi.Meter Shutdown func(context.Context) error queryCounter metricapi.Int64Counter @@ -182,7 +189,7 @@ func setupOTelSDK(ctx context.Context, serverOptions *ServerOptions, serviceVers } state := &TelemetryState{ - Tracer: traceProvider.Tracer(serverOptions.ServiceName), + Tracer: &Tracer{traceProvider.Tracer(serverOptions.ServiceName)}, Meter: meterProvider.Meter(serverOptions.ServiceName), Shutdown: shutdownFunc, } @@ -279,3 +286,24 @@ func setupMetrics(telemetry *TelemetryState, metricsPrefix string) error { return err } + +// Tracer is the wrapper of traceapi.Tracer with user visibility on Hasura Console +type Tracer struct { + tracer traceapi.Tracer +} + +// Start creates a span and a context.Context containing the newly-created span +// with `internal.visibility: "user"` so that it shows up in the Hasura Console. 
+func (t *Tracer) Start(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) { + return t.tracer.Start(ctx, spanName, append(opts[:len(opts):len(opts)], userVisibilityAttribute)...) // cap-limit opts so append never writes into the caller's backing array +} + +// StartInternal creates a span and a context.Context containing the newly-created span. +// It won't show up in the Hasura Console. +func (t *Tracer) StartInternal(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) { + return t.tracer.Start(ctx, spanName, opts...) +} + +func httpStatusAttribute(code int) attribute.KeyValue { + return attribute.Int("http_status", code) +} diff --git a/connector/types.go b/connector/types.go index 59103f2b..e146b3e5 100644 --- a/connector/types.go +++ b/connector/types.go @@ -90,7 +90,7 @@ type serveOptions struct { func defaultServeOptions() *serveOptions { return &serveOptions{ logger: log.Level(zerolog.GlobalLevel()), - serviceName: "ndc-go", + serviceName: "hasura-ndc-go", version: "0.1.0", withoutConfig: false, withoutRecovery: false,