Skip to content

Commit f033e66

Browse files
authored
improve OpenTelemetry attributes and enable tracing visibility (#10)
1 parent 98c2e9f commit f033e66

File tree

3 files changed

+105
-114
lines changed

3 files changed

+105
-114
lines changed

connector/server.go

Lines changed: 74 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ func (s *Server[Configuration, State]) withAuth(handler http.HandlerFunc) http.H
109109
})
110110

111111
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(
112-
attribute.String("status", "failed"),
113-
attribute.String("reason", "unauthorized"),
112+
failureStatusAttribute,
113+
httpStatusAttribute(http.StatusUnauthorized),
114114
))
115115
return
116116
}
@@ -156,31 +156,14 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ
156156
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query", trace.WithSpanKind(trace.SpanKindServer))
157157
defer span.End()
158158

159-
attributes := []attribute.KeyValue{}
160-
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
161-
defer decodeSpan.End()
162159
var body schema.QueryRequest
163-
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
164-
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
165-
Message: "failed to decode json request body",
166-
Details: map[string]any{
167-
"cause": err.Error(),
168-
},
169-
})
170-
171-
attributes := []attribute.KeyValue{
172-
attribute.String("status", "failed"),
173-
attribute.String("reason", "json_decode"),
174-
}
175-
span.SetAttributes(attributes...)
176-
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
160+
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryCounter, &body); err != nil {
177161
return
178162
}
179-
decodeSpan.End()
180163

181164
collectionAttr := attribute.String("collection", body.Collection)
182-
attributes = append(attributes, collectionAttr)
183-
span.SetAttributes(attributes...)
165+
span.SetAttributes(collectionAttr)
166+
184167
execQueryCtx, execQuerySpan := s.telemetry.Tracer.Start(ctx, "Execute Query")
185168
defer execQuerySpan.End()
186169

@@ -189,22 +172,21 @@ func (s *Server[Configuration, State]) Query(w http.ResponseWriter, r *http.Requ
189172
if err != nil {
190173
status := writeError(w, logger, err)
191174
statusAttributes := []attribute.KeyValue{
192-
attribute.String("status", "failed"),
193-
attribute.String("reason", fmt.Sprintf("%d", status)),
175+
failureStatusAttribute,
176+
httpStatusAttribute(status),
194177
}
195-
span.SetAttributes(statusAttributes...)
196-
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
178+
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
179+
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...))
197180
return
198181
}
199182
execQuerySpan.End()
200183

201-
statusAttribute := attribute.String("status", "success")
202-
span.SetAttributes(statusAttribute)
184+
span.SetAttributes(successStatusAttribute)
203185
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
204186
writeJson(w, logger, http.StatusOK, response)
205187
responseSpan.End()
206188

207-
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
189+
s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(collectionAttr, successStatusAttribute))
208190
// record latency for success requests only
209191
s.telemetry.queryLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
210192
}
@@ -216,53 +198,35 @@ func (s *Server[Configuration, State]) QueryExplain(w http.ResponseWriter, r *ht
216198
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query Explain", trace.WithSpanKind(trace.SpanKindServer))
217199
defer span.End()
218200

219-
attributes := []attribute.KeyValue{}
220-
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
221-
defer decodeSpan.End()
222201
var body schema.QueryRequest
223-
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
224-
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
225-
Message: "failed to decode json request body",
226-
Details: map[string]any{
227-
"cause": err.Error(),
228-
},
229-
})
230-
231-
attributes := []attribute.KeyValue{
232-
attribute.String("status", "failed"),
233-
attribute.String("reason", "json_decode"),
234-
}
235-
span.SetAttributes(attributes...)
236-
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
202+
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.queryExplainCounter, &body); err != nil {
237203
return
238204
}
239-
decodeSpan.End()
240205

241206
collectionAttr := attribute.String("collection", body.Collection)
242-
attributes = append(attributes, collectionAttr)
243-
span.SetAttributes(attributes...)
207+
span.SetAttributes(collectionAttr)
208+
244209
execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain")
245210
defer execSpan.End()
246211

247212
response, err := s.connector.QueryExplain(execCtx, s.configuration, s.state, &body)
248213
if err != nil {
249214
status := writeError(w, logger, err)
250215
statusAttributes := []attribute.KeyValue{
251-
attribute.String("status", "failed"),
252-
attribute.String("reason", fmt.Sprintf("%d", status)),
216+
failureStatusAttribute,
217+
httpStatusAttribute(status),
253218
}
254-
span.SetAttributes(attributes...)
255-
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
219+
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
220+
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, collectionAttr)...))
256221
return
257222
}
258223
execSpan.End()
259224

260-
statusAttribute := attribute.String("status", "success")
261-
span.SetAttributes(statusAttribute)
225+
span.SetAttributes(successStatusAttribute)
262226
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
263227
writeJson(w, logger, http.StatusOK, response)
264228
responseSpan.End()
265-
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
229+
s.telemetry.queryExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, collectionAttr))
266230

267231
// record latency for success requests only
268232
s.telemetry.queryExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
@@ -275,60 +239,44 @@ func (s *Server[Configuration, State]) MutationExplain(w http.ResponseWriter, r
275239
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation Explain", trace.WithSpanKind(trace.SpanKindServer))
276240
defer span.End()
277241

278-
attributes := []attribute.KeyValue{}
279-
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
280-
defer decodeSpan.End()
281242
var body schema.MutationRequest
282-
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
283-
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
284-
Message: "failed to decode json request body",
285-
Details: map[string]any{
286-
"cause": err.Error(),
287-
},
288-
})
289-
290-
attributes := []attribute.KeyValue{
291-
attribute.String("status", "failed"),
292-
attribute.String("reason", "json_decode"),
293-
}
294-
span.SetAttributes(attributes...)
295-
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
243+
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationExplainCounter, &body); err != nil {
296244
return
297245
}
298-
decodeSpan.End()
299246

300247
var operationNames []string
301248
for _, op := range body.Operations {
302249
operationNames = append(operationNames, op.Name)
303250
}
304-
collectionAttr := attribute.String("operations", strings.Join(operationNames, ","))
305-
attributes = append(attributes, collectionAttr)
306-
span.SetAttributes(attributes...)
251+
252+
operationAttr := attribute.String("operations", strings.Join(operationNames, ","))
253+
span.SetAttributes(operationAttr)
254+
307255
execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Explain")
308256
defer execSpan.End()
309257

310258
response, err := s.connector.MutationExplain(execCtx, s.configuration, s.state, &body)
311259
if err != nil {
312260
status := writeError(w, logger, err)
313261
statusAttributes := []attribute.KeyValue{
314-
attribute.String("status", "failed"),
315-
attribute.String("reason", fmt.Sprintf("%d", status)),
262+
failureStatusAttribute,
263+
httpStatusAttribute(status),
316264
}
317-
span.SetAttributes(attributes...)
318-
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttributes...)...))
265+
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
266+
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...))
319267
return
320268
}
321269
execSpan.End()
322270

323-
statusAttribute := attribute.String("status", "success")
324-
span.SetAttributes(statusAttribute)
271+
span.SetAttributes(successStatusAttribute)
272+
325273
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
326274
writeJson(w, logger, http.StatusOK, response)
327275
responseSpan.End()
328-
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
276+
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr))
329277

330278
// record latency for success requests only
331-
s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(collectionAttr))
279+
s.telemetry.mutationExplainLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr))
332280
}
333281

334282
// Mutation implements a handler for the /mutation endpoint, POST method that executes a mutation.
@@ -338,52 +286,67 @@ func (s *Server[Configuration, State]) Mutation(w http.ResponseWriter, r *http.R
338286
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation", trace.WithSpanKind(trace.SpanKindServer))
339287
defer span.End()
340288

341-
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
342-
defer decodeSpan.End()
343289
var body schema.MutationRequest
344-
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
345-
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
346-
Message: "failed to decode json request body",
347-
Details: map[string]any{
348-
"cause": err.Error(),
349-
},
350-
})
351-
352-
attributes := []attribute.KeyValue{
353-
attribute.String("status", "failed"),
354-
attribute.String("reason", "json_decode"),
355-
}
356-
span.SetAttributes(attributes...)
357-
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
290+
if err := s.unmarshalBodyJSON(w, r, ctx, span, s.telemetry.mutationCounter, &body); err != nil {
358291
return
359292
}
360-
decodeSpan.End()
293+
294+
var operationNames []string
295+
for _, op := range body.Operations {
296+
operationNames = append(operationNames, op.Name)
297+
}
298+
299+
operationAttr := attribute.String("operations", strings.Join(operationNames, ","))
300+
span.SetAttributes(operationAttr)
361301

362302
execCtx, execSpan := s.telemetry.Tracer.Start(ctx, "Execute Mutation")
363303
defer execSpan.End()
364304
response, err := s.connector.Mutation(execCtx, s.configuration, s.state, &body)
365305
if err != nil {
366306
status := writeError(w, logger, err)
367-
attributes := []attribute.KeyValue{
368-
attribute.String("status", "failed"),
369-
attribute.String("reason", fmt.Sprintf("%d", status)),
307+
statusAttributes := []attribute.KeyValue{
308+
failureStatusAttribute,
309+
httpStatusAttribute(status),
370310
}
371-
span.SetAttributes(attributes...)
372-
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
311+
span.SetAttributes(append(statusAttributes, attribute.String("reason", err.Error()))...)
312+
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(append(statusAttributes, operationAttr)...))
373313
return
374314
}
375315
execSpan.End()
376316

377-
attributes := attribute.String("status", "success")
378-
span.SetAttributes(attributes)
317+
span.SetAttributes(successStatusAttribute)
379318
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
380319
writeJson(w, logger, http.StatusOK, response)
381320
responseSpan.End()
382321

383-
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes))
322+
s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(successStatusAttribute, operationAttr))
384323

385324
// record latency for success requests only
386-
s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds())
325+
s.telemetry.mutationLatencyHistogram.Record(r.Context(), time.Since(startTime).Seconds(), metric.WithAttributes(operationAttr))
326+
}
327+
328+
// the common unmarshal json body method
329+
func (s *Server[Configuration, State]) unmarshalBodyJSON(w http.ResponseWriter, r *http.Request, ctx context.Context, span trace.Span, counter metric.Int64Counter, body any) error {
330+
_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
331+
defer decodeSpan.End()
332+
if err := json.NewDecoder(r.Body).Decode(body); err != nil {
333+
writeJson(w, GetLogger(r.Context()), http.StatusBadRequest, schema.ErrorResponse{
334+
Message: "failed to decode json request body",
335+
Details: map[string]any{
336+
"cause": err.Error(),
337+
},
338+
})
339+
340+
attributes := []attribute.KeyValue{
341+
failureStatusAttribute,
342+
httpStatusAttribute(http.StatusBadRequest),
343+
}
344+
span.SetAttributes(append(attributes, attribute.String("status", "json_decode"))...)
345+
s.telemetry.mutationExplainCounter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
346+
return err
347+
}
348+
349+
return nil
387350
}
388351

389352
func (s *Server[Configuration, State]) buildHandler() *http.ServeMux {

connector/telemetry.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/client_golang/prometheus/collectors"
1414
"github.com/rs/zerolog"
1515
"go.opentelemetry.io/otel"
16+
"go.opentelemetry.io/otel/attribute"
1617
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
1718
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
1819
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
@@ -28,8 +29,14 @@ import (
2829
traceapi "go.opentelemetry.io/otel/trace"
2930
)
3031

32+
// Shared OpenTelemetry options and attributes reused by the tracer wrapper and the request handlers.
var (
33+
// userVisibilityAttribute is a span start option that tags a span with `internal.visibility: "user"`.
userVisibilityAttribute = traceapi.WithAttributes(attribute.String("internal.visibility", "user"))
34+
// successStatusAttribute is the `status: "success"` attribute.
successStatusAttribute = attribute.String("status", "success")
35+
// failureStatusAttribute is the `status: "failure"` attribute.
failureStatusAttribute = attribute.String("status", "failure")
36+
)
37+
3138
type TelemetryState struct {
32-
Tracer traceapi.Tracer
39+
Tracer *Tracer
3340
Meter metricapi.Meter
3441
Shutdown func(context.Context) error
3542
queryCounter metricapi.Int64Counter
@@ -182,7 +189,7 @@ func setupOTelSDK(ctx context.Context, serverOptions *ServerOptions, serviceVers
182189
}
183190

184191
state := &TelemetryState{
185-
Tracer: traceProvider.Tracer(serverOptions.ServiceName),
192+
Tracer: &Tracer{traceProvider.Tracer(serverOptions.ServiceName)},
186193
Meter: meterProvider.Meter(serverOptions.ServiceName),
187194
Shutdown: shutdownFunc,
188195
}
@@ -279,3 +286,24 @@ func setupMetrics(telemetry *TelemetryState, metricsPrefix string) error {
279286

280287
return err
281288
}
289+
290+
// Tracer wraps a traceapi.Tracer. Its Start method adds the user-visibility
// attribute so spans show up on the Hasura Console, while StartInternal
// forwards to the wrapped tracer without that attribute.
291+
type Tracer struct {
292+
// tracer is the wrapped tracer that actually creates the spans.
tracer traceapi.Tracer
293+
}
294+
295+
// Start creates a span and a context.Context containing the newly-created span
296+
// with `internal.visibility: "user"` so that it shows up in the Hasura Console.
297+
func (t *Tracer) Start(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) {
298+
return t.tracer.Start(ctx, spanName, append(opts, userVisibilityAttribute)...)
299+
}
300+
301+
// StartInternal creates a span and a context.Context containing the newly-created span.
301+
// The options are forwarded to the wrapped tracer unchanged, i.e. without the
// user-visibility attribute, so the span won't show up in the Hasura Console.
303+
func (t *Tracer) StartInternal(ctx context.Context, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) {
304+
return t.tracer.Start(ctx, spanName, opts...)
305+
}
306+
307+
// httpStatusAttribute returns the integer attribute `http_status` holding the given HTTP status code.
func httpStatusAttribute(code int) attribute.KeyValue {
308+
return attribute.Int("http_status", code)
309+
}

connector/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ type serveOptions struct {
9090
func defaultServeOptions() *serveOptions {
9191
return &serveOptions{
9292
logger: log.Level(zerolog.GlobalLevel()),
93-
serviceName: "ndc-go",
93+
serviceName: "hasura-ndc-go",
9494
version: "0.1.0",
9595
withoutConfig: false,
9696
withoutRecovery: false,

0 commit comments

Comments
 (0)