diff --git a/docker-compose.yaml b/docker-compose.yaml index d7b04ebac..f07c40a9b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -85,3 +85,16 @@ services: retries: 10 ports: - "5432:5432" + + jaeger: + image: jaegertracing/all-in-one:latest + environment: + COLLECTOR_OTLP_ENABLED: "true" + ports: + - "16686:16686" # Web UI + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + - "14250:14250" # Model/collector gRPC + profiles: + - tracing + restart: always \ No newline at end of file diff --git a/docs/Configuring.md b/docs/Configuring.md index 3810e4245..adaae537f 100644 --- a/docs/Configuring.md +++ b/docs/Configuring.md @@ -334,3 +334,27 @@ server: Admins can manage the authorization policy directly in the YAML configuration file. For detailed configuration options, refer to the [Casbin documentation](https://casbin.org/docs/en/syntax-for-models). +## Tracing Configuration + +The tracing configuration controls OpenTelemetry tracing behavior. + +Root level key `trace` + +| Field | Description | Default | Environment Variable | +|-----------------|------------------------------------------------------------| ---------|---------------------| +| `enabled` | Enable OpenTelemetry tracing | `false` | OPENTDF_TRACE_ENABLED | +| `folder` | Directory where trace logs will be stored | `traces` | OPENTDF_TRACE_FOLDER | +| `exportToJaeger`| Export traces to Jaeger instead of local file | `false` | OPENTDF_TRACE_EXPORT_TO_JAEGER | + +Example: +```yaml +trace: + enabled: true + folder: "traces" # Traces will be written to traces/traces.log + exportToJaeger: false # Set to true to export to Jaeger at localhost:4317 +``` + +When enabled, traces are either: +- Written to a local file with automatic log rotation (when `exportToJaeger: false`) +- Exported to a Jaeger instance at localhost:4317 (when `exportToJaeger: true`) + diff --git a/docs/Contributing.md b/docs/Contributing.md index 378146dc7..9afb89971 100644 --- a/docs/Contributing.md +++ 
b/docs/Contributing.md @@ -25,6 +25,30 @@ For end-users/consumers, see [here](./Consuming.md). Note: support was added to provision a set of fixture data into the database. Run `go run github.com/opentdf/platform/service provision fixtures -h` for more information. +## Running with Tracing + +To enable distributed tracing with Jaeger: + +1. Start the development stack with the tracing profile: + ```bash + docker compose --profile tracing up + ``` + This will start Jaeger alongside the other services. + +2. Configure tracing in your `opentdf.yaml`: + ```yaml + trace: + enabled: true + exportToJaeger: true # This will export traces to Jaeger instead of local files + ``` + +3. Access the Jaeger UI at http://localhost:16686 to view traces. + - Search for traces by service name "opentdf-service" + - View detailed spans and timing information + - Analyze request flows across services + +Note: When `exportToJaeger` is false, traces are written to a rotating log file in the configured `folder` (`traces/traces.log` by default) instead of being sent to Jaeger. 
+ ## Advice for Code Contributors * Make sure to run our linters with `make lint` diff --git a/opentdf-dev.yaml b/opentdf-dev.yaml index f93d7d630..b881e6cd6 100644 --- a/opentdf-dev.yaml +++ b/opentdf-dev.yaml @@ -2,6 +2,10 @@ logger: level: debug type: text output: stdout +trace: + enabled: true + folder: "traces" + exportToJaeger: false # DB and Server configurations are defaulted for local development # db: # host: localhost diff --git a/service/authorization/authorization.go b/service/authorization/authorization.go index 0df0cc492..5d44d5de7 100644 --- a/service/authorization/authorization.go +++ b/service/authorization/authorization.go @@ -29,6 +29,9 @@ import ( "github.com/opentdf/platform/service/pkg/db" "github.com/opentdf/platform/service/pkg/serviceregistry" "github.com/opentdf/platform/service/policies" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -42,6 +45,7 @@ type AuthorizationService struct { //nolint:revive // AuthorizationService is a config Config logger *logger.Logger eval rego.PreparedEvalQuery + trace.Tracer } type Config struct { @@ -137,6 +141,7 @@ func NewRegistration() *serviceregistry.Service[authorizationconnect.Authorizati } as.config = *authZCfg + as.Tracer = srp.Tracer return as, nil }, @@ -151,7 +156,15 @@ func (as AuthorizationService) IsReady(ctx context.Context) error { } func (as *AuthorizationService) GetDecisionsByToken(ctx context.Context, req *connect.Request[authorization.GetDecisionsByTokenRequest]) (*connect.Response[authorization.GetDecisionsByTokenResponse], error) { + // Extract trace context from the incoming request + propagator := otel.GetTextMapPropagator() + ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header())) + + ctx, span := as.Tracer.Start(ctx, "GetDecisionsByToken") + defer span.End() + decisionsRequests := []*authorization.DecisionRequest{} + // for each token 
decision request for _, tdr := range req.Msg.GetDecisionRequests() { ecResp, err := as.sdk.EntityResoution.CreateEntityChainFromJwt(ctx, &entityresolution.CreateEntityChainFromJwtRequest{Tokens: tdr.GetTokens()}) @@ -184,6 +197,9 @@ func (as *AuthorizationService) GetDecisionsByToken(ctx context.Context, req *co func (as *AuthorizationService) GetDecisions(ctx context.Context, req *connect.Request[authorization.GetDecisionsRequest]) (*connect.Response[authorization.GetDecisionsResponse], error) { as.logger.DebugContext(ctx, "getting decisions") + ctx, span := as.Tracer.Start(ctx, "GetDecisions") + defer span.End() + // Temporary canned echo response with permit decision for all requested decision/entity/ra combos rsp := &authorization.GetDecisionsResponse{ DecisionResponses: make([]*authorization.DecisionResponse, 0), @@ -494,6 +510,9 @@ func makeScopeMap(scope *authorization.ResourceAttribute) map[string]bool { func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *connect.Request[authorization.GetEntitlementsRequest]) (*connect.Response[authorization.GetEntitlementsResponse], error) { as.logger.DebugContext(ctx, "getting entitlements") + ctx, span := as.Tracer.Start(ctx, "GetEntitlements") + defer span.End() + var nextOffset int32 attrsList := make([]*policy.Attribute, 0) subjectMappingsList := make([]*policy.SubjectMapping, 0) diff --git a/service/authorization/authorization_test.go b/service/authorization/authorization_test.go index 34ce530fd..1d66fe88d 100644 --- a/service/authorization/authorization_test.go +++ b/service/authorization/authorization_test.go @@ -8,6 +8,8 @@ import ( "strings" "testing" + "go.opentelemetry.io/otel/trace/noop" + "connectrpc.com/connect" "github.com/open-policy-agent/opa/rego" "github.com/opentdf/platform/protocol/go/authorization" @@ -282,11 +284,14 @@ func Test_GetDecisionsAllOf_Pass(t *testing.T) { } as := AuthorizationService{ - logger: logger, sdk: &otdf.SDK{ - SubjectMapping: &mySubjectMappingClient{}, - 
Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, + logger: logger, + sdk: &otdf.SDK{ + SubjectMapping: &mySubjectMappingClient{}, + Attributes: &myAttributesClient{}, + EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } resp, err := as.GetDecisions(ctxb, &req) @@ -451,7 +456,8 @@ func Test_GetDecisions_AllOf_Fail(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } resp, err := as.GetDecisions(ctxb, &req) @@ -550,7 +556,8 @@ func Test_GetDecisionsAllOfWithEnvironmental_Pass(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } resp, err := as.GetDecisions(ctxb, &req) @@ -646,7 +653,8 @@ func Test_GetDecisionsAllOfWithEnvironmental_Fail(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } resp, err := as.GetDecisions(ctxb, &req) @@ -720,7 +728,8 @@ func Test_GetEntitlementsSimple(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } req := connect.Request[authorization.GetEntitlementsRequest]{ @@ -793,7 +802,8 @@ func Test_GetEntitlementsFqnCasing(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } req := connect.Request[authorization.GetEntitlementsRequest]{ @@ -872,7 +882,8 @@ func 
Test_GetEntitlements_HandlesPagination(t *testing.T) { Attributes: &paginatedMockAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } req := connect.Request[authorization.GetEntitlementsRequest]{ @@ -963,7 +974,8 @@ func Test_GetEntitlementsWithComprehensiveHierarchy(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } withHierarchy := true @@ -1204,7 +1216,8 @@ func Test_GetDecisions_RA_FQN_Edge_Cases(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } ///////////// TEST1: Only empty string ///////////// @@ -1411,7 +1424,8 @@ func Test_GetDecisionsAllOf_Pass_EC_RA_Length_Mismatch(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } resp, err := as.GetDecisions(ctxb, &req) @@ -1689,7 +1703,8 @@ func Test_GetDecisions_Empty_EC_RA(t *testing.T) { SubjectMapping: &mySubjectMappingClient{}, Attributes: &myAttributesClient{}, EntityResoution: &myERSClient{}, }, - eval: prepared, + eval: prepared, + Tracer: noop.NewTracerProvider().Tracer(""), } ///////////// Test Cases ///////////////////// diff --git a/service/cmd/migrate.go b/service/cmd/migrate.go index e336fa2e6..a7a7e2bef 100644 --- a/service/cmd/migrate.go +++ b/service/cmd/migrate.go @@ -125,7 +125,7 @@ func migrateDBClient(cmd *cobra.Command, opts ...db.OptsFunc) (*db.Client, error if err != nil { panic(fmt.Errorf("could not load config: %w", err)) } - return db.New(context.Background(), conf.DB, conf.Logger, opts...) 
+ return db.New(context.Background(), conf.DB, conf.Logger, nil, opts...) } func init() { diff --git a/service/cmd/policy.go b/service/cmd/policy.go index 58a7daa22..5c7bc0895 100644 --- a/service/cmd/policy.go +++ b/service/cmd/policy.go @@ -76,7 +76,7 @@ func policyDBClient(conf *config.Config) (policydb.PolicyDBClient, error) { if !strings.HasSuffix(conf.DB.Schema, "_policy") { conf.DB.Schema += "_policy" } - dbClient, err := db.New(context.Background(), conf.DB, conf.Logger, db.WithMigrations(policy.Migrations)) + dbClient, err := db.New(context.Background(), conf.DB, conf.Logger, nil, db.WithMigrations(policy.Migrations)) if err != nil { //nolint:wrapcheck // we want to return the error as is. the start command will wrap it return policydb.PolicyDBClient{}, err diff --git a/service/cmd/provisionFixtures.go b/service/cmd/provisionFixtures.go index 38542ebd4..a412fe8f8 100644 --- a/service/cmd/provisionFixtures.go +++ b/service/cmd/provisionFixtures.go @@ -43,7 +43,7 @@ You can clear/recycle your database with 'docker compose down' and 'docker compo panic(fmt.Errorf("could not load config: %w", err)) } - dbClient, err := db.New(context.Background(), cfg.DB, cfg.Logger) + dbClient, err := db.New(context.Background(), cfg.DB, cfg.Logger, nil) if err != nil { panic(fmt.Errorf("issue creating database client: %w", err)) } diff --git a/service/entityresolution/claims/claims_entity_resolution.go b/service/entityresolution/claims/claims_entity_resolution.go index 090d2b676..75f8059e2 100644 --- a/service/entityresolution/claims/claims_entity_resolution.go +++ b/service/entityresolution/claims/claims_entity_resolution.go @@ -12,6 +12,7 @@ import ( auth "github.com/opentdf/platform/service/authorization" "github.com/opentdf/platform/service/logger" "github.com/opentdf/platform/service/pkg/serviceregistry" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/anypb" 
"google.golang.org/protobuf/types/known/structpb" @@ -20,6 +21,7 @@ import ( type ClaimsEntityResolutionService struct { entityresolution.UnimplementedEntityResolutionServiceServer logger *logger.Logger + trace.Tracer } func RegisterClaimsERS(_ serviceregistry.ServiceConfig, logger *logger.Logger) (ClaimsEntityResolutionService, serviceregistry.HandlerServer) { @@ -33,6 +35,9 @@ func (s ClaimsEntityResolutionService) ResolveEntities(ctx context.Context, req } func (s ClaimsEntityResolutionService) CreateEntityChainFromJwt(ctx context.Context, req *connect.Request[entityresolution.CreateEntityChainFromJwtRequest]) (*connect.Response[entityresolution.CreateEntityChainFromJwtResponse], error) { + ctx, span := s.Tracer.Start(ctx, "CreateEntityChainFromJwt") + defer span.End() + resp, err := CreateEntityChainFromJwt(ctx, req.Msg, s.logger) return connect.NewResponse(&resp), err } diff --git a/service/entityresolution/entityresolution.go b/service/entityresolution/entityresolution.go index cd09205c0..e8271149f 100644 --- a/service/entityresolution/entityresolution.go +++ b/service/entityresolution/entityresolution.go @@ -7,6 +7,7 @@ import ( claims "github.com/opentdf/platform/service/entityresolution/claims" keycloak "github.com/opentdf/platform/service/entityresolution/keycloak" "github.com/opentdf/platform/service/pkg/serviceregistry" + "go.opentelemetry.io/otel/trace" ) type ERSConfig struct { @@ -20,6 +21,7 @@ const ( type EntityResolution struct { entityresolutionconnect.EntityResolutionServiceHandler + trace.Tracer } func NewRegistration() *serviceregistry.Service[entityresolutionconnect.EntityResolutionServiceHandler] { @@ -37,12 +39,15 @@ func NewRegistration() *serviceregistry.Service[entityresolutionconnect.EntityRe } if inputConfig.Mode == ClaimsMode { claimsSVC, claimsHandler := claims.RegisterClaimsERS(srp.Config, srp.Logger) + claimsSVC.Tracer = srp.Tracer return EntityResolution{EntityResolutionServiceHandler: claimsSVC}, claimsHandler } // Default to 
keycloak ERS kcSVC, kcHandler := keycloak.RegisterKeycloakERS(srp.Config, srp.Logger) - return EntityResolution{EntityResolutionServiceHandler: kcSVC}, kcHandler + kcSVC.Tracer = srp.Tracer + + return EntityResolution{EntityResolutionServiceHandler: kcSVC, Tracer: srp.Tracer}, kcHandler }, }, } diff --git a/service/entityresolution/keycloak/keycloak_entity_resolution.go b/service/entityresolution/keycloak/keycloak_entity_resolution.go index f991ef2ed..5f680b84b 100644 --- a/service/entityresolution/keycloak/keycloak_entity_resolution.go +++ b/service/entityresolution/keycloak/keycloak_entity_resolution.go @@ -17,6 +17,7 @@ import ( auth "github.com/opentdf/platform/service/authorization" "github.com/opentdf/platform/service/logger" "github.com/opentdf/platform/service/pkg/serviceregistry" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" @@ -39,6 +40,7 @@ type KeycloakEntityResolutionService struct { entityresolution.UnimplementedEntityResolutionServiceServer idpConfig KeycloakConfig logger *logger.Logger + trace.Tracer } type KeycloakConfig struct { @@ -62,11 +64,17 @@ func RegisterKeycloakERS(config serviceregistry.ServiceConfig, logger *logger.Lo } func (s KeycloakEntityResolutionService) ResolveEntities(ctx context.Context, req *connect.Request[entityresolution.ResolveEntitiesRequest]) (*connect.Response[entityresolution.ResolveEntitiesResponse], error) { + ctx, span := s.Tracer.Start(ctx, "ResolveEntities") + defer span.End() + resp, err := EntityResolution(ctx, req.Msg, s.idpConfig, s.logger) return connect.NewResponse(&resp), err } func (s KeycloakEntityResolutionService) CreateEntityChainFromJwt(ctx context.Context, req *connect.Request[entityresolution.CreateEntityChainFromJwtRequest]) (*connect.Response[entityresolution.CreateEntityChainFromJwtResponse], error) { + ctx, span := s.Tracer.Start(ctx, "CreateEntityChainFromJwt") + defer span.End() 
+ resp, err := CreateEntityChainFromJwt(ctx, req.Msg, s.idpConfig, s.logger) return connect.NewResponse(&resp), err } diff --git a/service/go.mod b/service/go.mod index b1e0a819a..703177b03 100644 --- a/service/go.mod +++ b/service/go.mod @@ -33,6 +33,8 @@ require ( github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.34.0 go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0 go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 @@ -74,9 +76,8 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 // indirect go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/oauth2 v0.22.0 // indirect ) diff --git a/service/go.sum b/service/go.sum index 2f6fa55ce..78570a84e 100644 --- a/service/go.sum +++ b/service/go.sum @@ -421,6 +421,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= 
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/service/internal/fixtures/db.go b/service/internal/fixtures/db.go index dc70ce6c2..a20807718 100644 --- a/service/internal/fixtures/db.go +++ b/service/internal/fixtures/db.go @@ -8,9 +8,10 @@ import ( "github.com/opentdf/platform/service/internal/config" "github.com/opentdf/platform/service/logger" - "github.com/opentdf/platform/service/pkg/db" policydb "github.com/opentdf/platform/service/policy/db" + "github.com/opentdf/platform/service/tracing" + "go.opentelemetry.io/otel" ) var ( @@ -31,7 +32,9 @@ func NewDBInterface(cfg config.Config) DBInterface { config := cfg.DB config.Schema = cfg.DB.Schema logCfg := cfg.Logger - c, err := db.New(context.Background(), config, logCfg) + tracer := otel.Tracer(tracing.ServiceName) + + c, err := db.New(context.Background(), config, logCfg, &tracer) if err != nil { slog.Error("issue creating database client", slog.String("error", err.Error())) panic(err) diff --git a/service/kas/access/accessPdp.go b/service/kas/access/accessPdp.go index 8bdfb87e7..6e2ab0406 100644 --- a/service/kas/access/accessPdp.go +++ b/service/kas/access/accessPdp.go @@ -7,6 +7,7 @@ import ( "github.com/opentdf/platform/protocol/go/authorization" "github.com/opentdf/platform/protocol/go/policy" + "github.com/opentdf/platform/service/tracing" ) const ( @@ -43,11 +44,14 @@ func (p *Provider) canAccess(ctx context.Context, token *authorization.Token, po } } - dr, err := p.checkAttributes(ctx, rasList, token) + ctx, span := p.Tracer.Start(ctx, "checkAttributes") + defer span.End() + dr, err := p.checkAttributes(ctx, rasList, token) if err != nil { return nil, err } + for _, resp := range dr.GetDecisionResponses() { policy, ok := idPolicyMap[resp.GetResourceAttributesId()] if !ok { // this really should not happen @@ -71,6 +75,8 @@ func (p *Provider) 
checkAttributes(ctx context.Context, ras []*authorization.Res }, }, } + + ctx = tracing.InjectTraceContext(ctx) dr, err := p.SDK.Authorization.GetDecisionsByToken(ctx, &in) if err != nil { p.Logger.ErrorContext(ctx, "Error received from GetDecisionsByToken", "err", err) diff --git a/service/kas/access/publicKey.go b/service/kas/access/publicKey.go index 2b2623d4b..d16764069 100644 --- a/service/kas/access/publicKey.go +++ b/service/kas/access/publicKey.go @@ -11,7 +11,6 @@ import ( "connectrpc.com/connect" kaspb "github.com/opentdf/platform/protocol/go/kas" "github.com/opentdf/platform/service/internal/security" - "go.opentelemetry.io/otel/trace" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -72,11 +71,8 @@ func (p Provider) LegacyPublicKey(ctx context.Context, req *connect.Request[kasp } func (p Provider) PublicKey(ctx context.Context, req *connect.Request[kaspb.PublicKeyRequest]) (*connect.Response[kaspb.PublicKeyResponse], error) { - if p.Tracer != nil { - var span trace.Span - ctx, span = p.Tracer.Start(ctx, "publickey") - defer span.End() - } + ctx, span := p.Tracer.Start(ctx, "PublicKey") + defer span.End() algorithm := req.Msg.GetAlgorithm() if algorithm == "" { diff --git a/service/kas/access/publicKey_test.go b/service/kas/access/publicKey_test.go index 047d6028b..eacc2db1d 100644 --- a/service/kas/access/publicKey_test.go +++ b/service/kas/access/publicKey_test.go @@ -19,6 +19,7 @@ import ( "github.com/opentdf/platform/service/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" ) func TestExportRsaPublicKeyAsPemStrSuccess(t *testing.T) { @@ -98,6 +99,7 @@ func TestStandardCertificateHandlerEmpty(t *testing.T) { URI: *kasURI, CryptoProvider: c, Logger: logger.CreateTestLogger(), + Tracer: noop.NewTracerProvider().Tracer(""), } result, err := kas.PublicKey(context.Background(), &connect.Request[kaspb.PublicKeyRequest]{Msg: &kaspb.PublicKeyRequest{Fmt: "pkcs8"}}) @@ 
-154,6 +156,7 @@ func TestStandardPublicKeyHandlerV2(t *testing.T) { }, }, }, + Tracer: noop.NewTracerProvider().Tracer(""), } result, err := kas.PublicKey(context.Background(), &connect.Request[kaspb.PublicKeyRequest]{Msg: &kaspb.PublicKeyRequest{}}) @@ -182,6 +185,7 @@ func TestStandardPublicKeyHandlerV2Failure(t *testing.T) { URI: *kasURI, CryptoProvider: c, Logger: logger.CreateTestLogger(), + Tracer: noop.NewTracerProvider().Tracer(""), } k, err := kas.PublicKey(context.Background(), &connect.Request[kaspb.PublicKeyRequest]{Msg: &kaspb.PublicKeyRequest{}}) @@ -208,6 +212,7 @@ func TestStandardPublicKeyHandlerV2NotFound(t *testing.T) { URI: *kasURI, CryptoProvider: c, Logger: logger.CreateTestLogger(), + Tracer: noop.NewTracerProvider().Tracer(""), } k, err := kas.PublicKey(context.Background(), &connect.Request[kaspb.PublicKeyRequest]{ @@ -248,6 +253,7 @@ func TestStandardPublicKeyHandlerV2WithJwk(t *testing.T) { }, }, }, + Tracer: noop.NewTracerProvider().Tracer(""), Logger: logger.CreateTestLogger(), } @@ -283,6 +289,7 @@ func TestStandardCertificateHandlerWithEc256(t *testing.T) { kas := Provider{ URI: *kasURI, CryptoProvider: c, + Tracer: noop.NewTracerProvider().Tracer(""), KASConfig: kasCfg, Logger: logger.CreateTestLogger(), } diff --git a/service/kas/access/rewrap.go b/service/kas/access/rewrap.go index 7b7fd2e9d..209a9a27a 100644 --- a/service/kas/access/rewrap.go +++ b/service/kas/access/rewrap.go @@ -25,17 +25,16 @@ import ( "github.com/lestrrat-go/jwx/v2/jwt" "github.com/opentdf/platform/lib/ocrypto" "github.com/opentdf/platform/protocol/go/authorization" - "go.opentelemetry.io/otel/trace" - "google.golang.org/protobuf/encoding/protojson" - kaspb "github.com/opentdf/platform/protocol/go/kas" "github.com/opentdf/platform/sdk" "github.com/opentdf/platform/service/internal/security" "github.com/opentdf/platform/service/logger" "github.com/opentdf/platform/service/logger/audit" ctxAuth "github.com/opentdf/platform/service/pkg/auth" + 
"go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" ) const ( @@ -437,6 +436,9 @@ func (p *Provider) Rewrap(ctx context.Context, req *connect.Request[kaspb.Rewrap } func (p *Provider) verifyRewrapRequests(ctx context.Context, req *kaspb.UnsignedRewrapRequest_WithPolicyRequest) (*Policy, map[string]kaoResult, error) { + ctx, span := p.Tracer.Start(ctx, "tdf3Rewrap") + defer span.End() + results := make(map[string]kaoResult) anyValidKAOs := false p.Logger.DebugContext(ctx, "extracting policy", "requestBody.policy", req.GetPolicy()) @@ -676,12 +678,10 @@ func (p *Provider) tdf3Rewrap(ctx context.Context, requests []*kaspb.UnsignedRew } func (p *Provider) nanoTDFRewrap(ctx context.Context, requests []*kaspb.UnsignedRewrapRequest_WithPolicyRequest, clientPublicKey string, entity *entityInfo) (string, policyKAOResults) { + ctx, span := p.Tracer.Start(ctx, "nanoTDFRewrap") + defer span.End() + results := make(policyKAOResults) - if p.Tracer != nil { - var span trace.Span - ctx, span = p.Tracer.Start(ctx, "rewrap-nanotdf") - defer span.End() - } var policies []*Policy policyReqs := make(map[*Policy]*kaspb.UnsignedRewrapRequest_WithPolicyRequest) diff --git a/service/pkg/db/db.go b/service/pkg/db/db.go index 6dc4e0d64..37fc02e0e 100644 --- a/service/pkg/db/db.go +++ b/service/pkg/db/db.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/opentdf/platform/service/logger" + "go.opentelemetry.io/otel/trace" ) type Table struct { @@ -112,14 +113,14 @@ type Client struct { ranMigrations bool // This is the stdlib connection that is used for transactions SQLDB *sql.DB + trace.Tracer } /* Connections and pools seems to be pulled in from env vars We should be able to tell the platform how to run */ - -func New(ctx context.Context, config Config, logCfg logger.Config, o ...OptsFunc) (*Client, error) { +func New(ctx context.Context, 
config Config, logCfg logger.Config, tracer *trace.Tracer, o ...OptsFunc) (*Client, error) { for _, f := range o { config = f(config) } @@ -128,6 +129,10 @@ func New(ctx context.Context, config Config, logCfg logger.Config, o ...OptsFunc config: config, } + if tracer != nil { + c.Tracer = *tracer + } + l, err := logger.NewLogger(logger.Config{ Output: logCfg.Output, Type: logCfg.Type, diff --git a/service/pkg/server/services.go b/service/pkg/server/services.go index bd7df9994..fffe6bc05 100644 --- a/service/pkg/server/services.go +++ b/service/pkg/server/services.go @@ -151,17 +151,14 @@ func startServices(ctx context.Context, cfg config.Config, otdf *server.OpenTDFS } var svcDBClient *db.Client - var tracer trace.Tracer - if cfg.Trace.Enabled { - tracer = otel.Tracer(tracing.ServiceName) - } + tracer := otel.Tracer(tracing.ServiceName) for _, svc := range namespace.Services { // Get new db client if it is required and not already created if svc.IsDBRequired() && svcDBClient == nil { logger.Debug("creating database client", slog.String("namespace", ns)) var err error - svcDBClient, err = newServiceDBClient(ctx, cfg.Logger, cfg.DB, ns, svc.DBMigrations()) + svcDBClient, err = newServiceDBClient(ctx, cfg.Logger, cfg.DB, tracer, ns, svc.DBMigrations()) if err != nil { return err } @@ -241,10 +238,10 @@ func extractServiceLoggerConfig(cfg serviceregistry.ServiceConfig) (string, erro // newServiceDBClient creates a new database client for the specified namespace. // It initializes the client with the provided context, logger configuration, database configuration, // namespace, and migrations. It returns the created client and any error encountered during creation. 
-func newServiceDBClient(ctx context.Context, logCfg logging.Config, dbCfg db.Config, ns string, migrations *embed.FS) (*db.Client, error) { +func newServiceDBClient(ctx context.Context, logCfg logging.Config, dbCfg db.Config, trace trace.Tracer, ns string, migrations *embed.FS) (*db.Client, error) { var err error - client, err := db.New(ctx, dbCfg, logCfg, + client, err := db.New(ctx, dbCfg, logCfg, &trace, db.WithService(ns), db.WithMigrations(migrations), ) diff --git a/service/pkg/server/start.go b/service/pkg/server/start.go index 9247ea49c..5056bbdbd 100644 --- a/service/pkg/server/start.go +++ b/service/pkg/server/start.go @@ -66,12 +66,13 @@ func Start(f ...StartOptions) error { // Set default for places we can't pass the logger slog.SetDefault(logger.Logger) - if cfg.Trace.Enabled { - // Initialize tracer - logger.Debug("configuring otel tracer") - shutdown := tracing.InitTracer(cfg.Trace) - defer shutdown() + // Initialize tracer + logger.Debug("configuring otel tracer") + shutdown, err := tracing.InitTracer(ctx, cfg.Trace) + if err != nil { + return fmt.Errorf("could not initialize tracer: %w", err) } + defer shutdown() logger.Info("starting opentdf services") diff --git a/service/policy/attributes/attributes.go b/service/policy/attributes/attributes.go index 7e5e65ab8..828c487f8 100644 --- a/service/policy/attributes/attributes.go +++ b/service/policy/attributes/attributes.go @@ -15,12 +15,14 @@ import ( "github.com/opentdf/platform/service/pkg/serviceregistry" policyconfig "github.com/opentdf/platform/service/policy/config" policydb "github.com/opentdf/platform/service/policy/db" + "go.opentelemetry.io/otel/trace" ) type AttributesService struct { //nolint:revive // AttributesService is a valid name for this struct dbClient policydb.PolicyDBClient logger *logger.Logger config *policyconfig.Config + trace.Tracer } func NewRegistration(ns string, dbRegister serviceregistry.DBRegister) *serviceregistry.Service[attributesconnect.AttributesServiceHandler] { 
@@ -37,6 +39,7 @@ func NewRegistration(ns string, dbRegister serviceregistry.DBRegister) *servicer dbClient: policydb.NewClient(srp.DBClient, srp.Logger, int32(cfg.ListRequestLimitMax), int32(cfg.ListRequestLimitDefault)), logger: srp.Logger, config: cfg, + Tracer: srp.Tracer, }, nil }, }, @@ -80,6 +83,9 @@ func (s AttributesService) CreateAttribute(ctx context.Context, func (s *AttributesService) ListAttributes(ctx context.Context, req *connect.Request[attributes.ListAttributesRequest], ) (*connect.Response[attributes.ListAttributesResponse], error) { + ctx, span := s.Tracer.Start(ctx, "ListAttributes") + defer span.End() + state := req.Msg.GetState().String() s.logger.Debug("listing attribute definitions", slog.String("state", state)) @@ -94,6 +100,9 @@ func (s *AttributesService) ListAttributes(ctx context.Context, func (s *AttributesService) GetAttribute(ctx context.Context, req *connect.Request[attributes.GetAttributeRequest], ) (*connect.Response[attributes.GetAttributeResponse], error) { + ctx, span := s.Tracer.Start(ctx, "GetAttribute") + defer span.End() + rsp := &attributes.GetAttributeResponse{} var identifier any @@ -116,6 +125,9 @@ func (s *AttributesService) GetAttribute(ctx context.Context, func (s *AttributesService) GetAttributeValuesByFqns(ctx context.Context, req *connect.Request[attributes.GetAttributeValuesByFqnsRequest], ) (*connect.Response[attributes.GetAttributeValuesByFqnsResponse], error) { + ctx, span := s.Tracer.Start(ctx, "GetAttributeValuesByFqns") + defer span.End() + rsp := &attributes.GetAttributeValuesByFqnsResponse{} fqnsToAttributes, err := s.dbClient.GetAttributesByValueFqns(ctx, req.Msg) diff --git a/service/policy/db/attribute_fqn.go b/service/policy/db/attribute_fqn.go index 3374817fa..e3bf3edcd 100644 --- a/service/policy/db/attribute_fqn.go +++ b/service/policy/db/attribute_fqn.go @@ -74,6 +74,9 @@ func (c *PolicyDBClient) AttrFqnReindex(ctx context.Context) (res struct { //nol func (c *PolicyDBClient) 
GetAttributesByValueFqns(ctx context.Context, r *attributes.GetAttributeValuesByFqnsRequest) (map[string]*attributes.GetAttributeValuesByFqnsResponse_AttributeAndValue, error) { fqns := r.GetFqns() + ctx, span := c.Tracer.Start(ctx, "DB:GetAttributesByValueFqns") + defer span.End() + list := make(map[string]*attributes.GetAttributeValuesByFqnsResponse_AttributeAndValue, len(fqns)) for i, fqn := range fqns { diff --git a/service/tracing/otel.go b/service/tracing/otel.go index 40b8c27b1..c9921b287 100644 --- a/service/tracing/otel.go +++ b/service/tracing/otel.go @@ -2,19 +2,26 @@ package tracing import ( "context" + "fmt" "log" "os" + "strings" "sync" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace/noop" + "google.golang.org/grpc/metadata" "gopkg.in/natefinch/lumberjack.v2" ) -const ServiceName = "github.com/opentdf/platform/service" +const ServiceName = "opentdf-service" // Create a thread-safe writer wrapper type syncWriter struct { @@ -22,20 +29,24 @@ type syncWriter struct { writer *lumberjack.Logger } -type Config struct { - Enabled bool `json:"enabled"` - Folder string `json:"folder"` -} - func (w *syncWriter) Write(p []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() return w.writer.Write(p) } -func InitTracer(cfg Config) func() { +type Config struct { + Enabled bool `json:"enabled"` + Folder string `json:"folder"` + ExportToJaeger bool `yaml:"exportToJaeger"` +} + +func InitTracer(ctx context.Context, cfg Config) (func(), error) { if !cfg.Enabled { - return func() {} + tp := noop.NewTracerProvider() + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + return 
func() {}, nil } // Create a directory for the traces @@ -44,7 +55,7 @@ func InitTracer(cfg Config) func() { td = "traces" } if err := os.MkdirAll(td, os.ModePerm); err != nil { - log.Fatal(err) + return nil, fmt.Errorf("failed to create traces directory: %w", err) } lumberjackLogger := &lumberjack.Logger{ @@ -55,19 +66,27 @@ func InitTracer(cfg Config) func() { Compress: true, // compress the rotated files } - // Wrap the logger with our thread-safe writer safeWriter := &syncWriter{ writer: lumberjackLogger, } - exporter, err := stdouttrace.New( - stdouttrace.WithWriter(safeWriter), - ) + var exporter sdktrace.SpanExporter + var err error + + if cfg.ExportToJaeger { + exporter, err = otlptrace.New(ctx, otlptracegrpc.NewClient( + otlptracegrpc.WithInsecure(), + otlptracegrpc.WithEndpoint("localhost:4317"), + )) + } else { + exporter, err = stdouttrace.New( + stdouttrace.WithWriter(safeWriter), + ) + } if err != nil { - log.Fatal(err) + return nil, fmt.Errorf("failed to create exporter: %w", err) } - // Create a tracer provider with the exporter tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), sdktrace.WithResource(resource.NewWithAttributes( @@ -75,13 +94,45 @@ func InitTracer(cfg Config) func() { semconv.ServiceNameKey.String(ServiceName), )), ) - otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) return func() { - if err := tp.Shutdown(context.Background()); err != nil { - log.Fatal(err) + if err := tp.Shutdown(ctx); err != nil { + log.Printf("Error shutting down tracer provider: %v", err) } - lumberjackLogger.Close() + }, nil +} + +// InjectTraceContext injects trace context into outgoing context +func InjectTraceContext(ctx context.Context) context.Context { + md := metadata.New(nil) + if existingMD, ok := metadata.FromOutgoingContext(ctx); ok { + md = existingMD.Copy() + } + propagation.TraceContext{}.Inject(ctx, &metadataCarrier{md}) + return metadata.NewOutgoingContext(ctx, md) +} + +type metadataCarrier 
struct { + md metadata.MD +} + +func (mc *metadataCarrier) Get(key string) string { + if values := mc.md.Get(strings.ToLower(key)); len(values) > 0 { + return values[0] + } + return "" +} + +func (mc *metadataCarrier) Set(key, value string) { + mc.md.Set(strings.ToLower(key), value) +} + +func (mc *metadataCarrier) Keys() []string { + keys := make([]string, 0, len(mc.md)) + for k := range mc.md { + keys = append(keys, k) } + return keys }