Skip to content

Commit

Permalink
Merge branch 'main' into feat/generator
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Feb 25, 2024
2 parents 30a1b9d + f033e66 commit 6367f35
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 114 deletions.
185 changes: 74 additions & 111 deletions connector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (s *Server[Configuration, State]) withAuth(handler http.HandlerFunc) http.H
})

s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(
attribute.String("status", "failed"),
attribute.String("reason", "unauthorized"),
failureStatusAttribute,
httpStatusAttribute(http.StatusUnauthorized),
))
return
}
Expand Down Expand Up @@ -156,31 +156,14 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

attributes := []attribute.KeyValue{}
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
var body schema.QueryRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})

attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", "json_decode"),
}
span.SetAttributes(attributes...)
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryCounter, &body); err != nil {
return
}
decodeSpan.End()

collectionAttr := attribute.String("collection", body.Collection)
attributes = append(attributes, collectionAttr)
span.SetAttributes(attributes...)
span.SetAttributes(collectionAttr)

execQueryCtx, execQuerySpan := s.telemetry.Tracer.Start(ctx, "Execute Query")
defer execQuerySpan.End()

Expand All @@ -189,22 +172,21 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ
if err != nil {
status := writeError(w, logger, err)
statusAttributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
failureStatusAttribute,
httpStatusAttribute(status),
}
span.SetAttributes(statusAttributes...)
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...))
return
}
execQuerySpan.End()

statusAttribute := attribute.String("status", "success")
span.SetAttributes(statusAttribute)
span.SetAttributes(successStatusAttribute)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()

s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(collectionAttr, successStatusAttribute))
// record latency for success requests only
s.telemetry.queryLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
}
Expand All @@ -216,53 +198,35 @@ func (s *Server[Configuration, State]) QueryExplain(w http.ResponseWriter, r *ht
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query Explain", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

attributes := []attribute.KeyValue{}
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
var body schema.QueryRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})

attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", "json_decode"),
}
span.SetAttributes(attributes...)
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryExplainCounter, &body); err != nil {
return
}
decodeSpan.End()

collectionAttr := attribute.String("collection", body.Collection)
attributes = append(attributes, collectionAttr)
span.SetAttributes(attributes...)
span.SetAttributes(collectionAttr)

execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain")
defer execSpan.End()

response, err := s.connector.QueryExplain(execCtx, s.configuration, s.state, &body)
if err != nil {
status := writeError(w, logger, err)
statusAttributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
failureStatusAttribute,
httpStatusAttribute(status),
}
span.SetAttributes(attributes...)
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...))
return
}
execSpan.End()

statusAttribute := attribute.String("status", "success")
span.SetAttributes(statusAttribute)
span.SetAttributes(successStatusAttribute)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, collectionAttr))

// record latency for success requests only
s.telemetry.queryExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
Expand All @@ -275,60 +239,44 @@ func (s *Server[Configuration, State]) MutationExplain(w http.ResponseWriter, r
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation Explain", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

attributes := []attribute.KeyValue{}
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
var body schema.MutationRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})

attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", "json_decode"),
}
span.SetAttributes(attributes...)
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationExplainCounter, &body); err != nil {
return
}
decodeSpan.End()

var operationNames []string
for _, op := range body.Operations {
operationNames = append(operationNames, op.Name)
}
collectionAttr := attribute.String("operations", strings.Join(operationNames, ","))
attributes = append(attributes, collectionAttr)
span.SetAttributes(attributes...)

operationAttr := attribute.String("operations", strings.Join(operationNames, ","))
span.SetAttributes(operationAttr)

execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain")
defer execSpan.End()

response, err := s.connector.MutationExplain(execCtx, s.configuration, s.state, &body)
if err != nil {
status := writeError(w, logger, err)
statusAttributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
failureStatusAttribute,
httpStatusAttribute(status),
}
span.SetAttributes(attributes...)
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...))
return
}
execSpan.End()

statusAttribute := attribute.String("status", "success")
span.SetAttributes(statusAttribute)
span.SetAttributes(successStatusAttribute)

_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr))

// record latency for success requests only
s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr))
}

// Mutation implements a handler for the /mutation endpoint, POST method that executes a mutation.
Expand All @@ -338,52 +286,67 @@ func (s *Server[Configuration, State]) Mutation(w http.ResponseWriter, r *http.R
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
var body schema.MutationRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})

attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", "json_decode"),
}
span.SetAttributes(attributes...)
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationCounter, &body); err != nil {
return
}
decodeSpan.End()

var operationNames []string
for _, op := range body.Operations {
operationNames = append(operationNames, op.Name)
}

operationAttr := attribute.String("operations", strings.Join(operationNames, ","))
span.SetAttributes(operationAttr)

execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Mutation")
defer execSpan.End()
response, err := s.connector.Mutation(execCtx, s.configuration, s.state, &body)
if err != nil {
status := writeError(w, logger, err)
attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
statusAttributes := []attribute.KeyValue{
failureStatusAttribute,
httpStatusAttribute(status),
}
span.SetAttributes(attributes...)
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...))
return
}
execSpan.End()

attributes := attribute.String("status", "success")
span.SetAttributes(attributes)
span.SetAttributes(successStatusAttribute)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()

s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes))
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr))

// record latency for success requests only
s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds())
s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr))
}

// the common unmarshal json body method
func (s *Server[Configuration, State]) unmarshalBodyJSON(w http.ResponseWriter, r *http.Request, ctx context.Context, span trace.Span, counter metric.Int64Counter, body any) error {
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
if err := json.NewDecoder(r.Body).Decode(body); err != nil {
writeJson(w, GetLogger(r.Context()), http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})

attributes := []attribute.KeyValue{
failureStatusAttribute,
httpStatusAttribute(http.StatusBadRequest),
}
span.SetAttributes(append(attributes, attribute.String("status", "json_decode"))...)
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
return err
}

return nil
}

func (s *Server[Configuration, State]) buildHandler() *http.ServeMux {
Expand Down
32 changes: 30 additions & 2 deletions connector/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -28,8 +29,14 @@ import (
traceapi "go.opentelemetry.io/otel/trace"
)

var (
userVisibilityAttribute = traceapi.WithAttributes(attribute.String("internal.visibility", "user"))
successStatusAttribute = attribute.String("status", "success")
failureStatusAttribute = attribute.String("status", "failure")
)

type TelemetryState struct {
Tracer traceapi.Tracer
Tracer *Tracer
Meter metricapi.Meter
Shutdown func(context.Context) error
queryCounter metricapi.Int64Counter
Expand Down Expand Up @@ -182,7 +189,7 @@ func setupOTelSDK(ctx context.Context, serverOptions *ServerOptions, serviceVers
}

state := &TelemetryState{
Tracer: traceProvider.Tracer(serverOptions.ServiceName),
Tracer: &Tracer{traceProvider.Tracer(serverOptions.ServiceName)},
Meter: meterProvider.Meter(serverOptions.ServiceName),
Shutdown: shutdownFunc,
}
Expand Down Expand Up @@ -279,3 +286,24 @@ func setupMetrics(telemetry *TelemetryState, metricsPrefix string) error {

return err
}

// Tracer is the wrapper of traceapi.Tracer with user visibility on Hasura Console
type Tracer struct {
tracer traceapi.Tracer
}

// Start creates a span and a context.Context containing the newly-created span
// with `internal.visibility: "user"` so that it shows up in the Hasura Console.
func (t *Tracer) Start(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) {
return t.tracer.Start(ctx, spanName, append(opts, userVisibilityAttribute)...)
}

// StartInternal creates a span and a context.Context containing the newly-created span.
// It won't show up in the Hasura Console
func (t *Tracer) StartInternal(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) {
return t.tracer.Start(ctx, spanName, opts...)
}

func httpStatusAttribute(code int) attribute.KeyValue {
return attribute.Int("http_status", code)
}
2 changes: 1 addition & 1 deletion connector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type serveOptions struct {
func defaultServeOptions() *serveOptions {
return &serveOptions{
logger: log.Level(zerolog.GlobalLevel()),
serviceName: "ndc-go",
serviceName: "hasura-ndc-go",
version: "0.1.0",
withoutConfig: false,
withoutRecovery: false,
Expand Down

0 comments on commit 6367f35

Please sign in to comment.