From 5356ef78598132b8406a4df327ae47e66385e706 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Fri, 18 Oct 2024 14:16:20 +0200 Subject: [PATCH] refactoring --- cmd/kubehound/main.go | 5 ++++- cmd/kubehound/root.go | 3 --- cmd/kubehound/server.go | 3 --- cmd/kubehound/version.go | 4 ++++ deployments/kubehound/binary/Dockerfile_debug | 3 ++- pkg/cmd/config.go | 5 ++--- pkg/cmd/dump.go | 2 +- pkg/collector/file.go | 16 ++++++++-------- pkg/collector/k8s_api.go | 19 +++++++++---------- pkg/config/dynamic.go | 1 + pkg/config/k8s.go | 2 +- pkg/dump/ingestor.go | 4 ++-- pkg/dump/pipeline/pipeline.go | 6 +++--- pkg/dump/writer/file_writer.go | 6 +++--- pkg/dump/writer/fs_writer.go | 4 ++-- pkg/dump/writer/tar_writer.go | 8 ++++---- pkg/globals/application.go | 18 ++++++++++-------- pkg/ingestor/api/api.go | 19 ++++++++++--------- pkg/ingestor/ingestor.go | 2 +- pkg/ingestor/notifier/noop/noop.go | 2 +- pkg/ingestor/puller/blob/blob.go | 16 ++++++++-------- pkg/kubehound/core/core_dump.go | 6 ++++-- pkg/kubehound/core/core_grpc_api.go | 2 +- pkg/kubehound/core/core_grpc_client.go | 8 ++++---- pkg/kubehound/core/core_live.go | 6 +++--- pkg/kubehound/graph/builder.go | 2 +- pkg/kubehound/providers/providers.go | 2 +- .../storage/graphdb/janusgraph_edge_writer.go | 6 ++---- .../graphdb/janusgraph_vertex_writer.go | 6 ++---- pkg/kubehound/storage/storedb/mongo_writer.go | 4 ++-- pkg/telemetry/log/fields.go | 1 + pkg/telemetry/log/formatter.go | 2 +- pkg/telemetry/log/kv.go | 2 +- pkg/telemetry/log/logger.go | 4 ++++ pkg/telemetry/log/trace_logger.go | 12 ++++++------ pkg/telemetry/span/spans.go | 9 +++++---- pkg/telemetry/telemetry.go | 4 ++-- pkg/telemetry/tracer/tracer.go | 8 ++++++-- scripts/dashboard-demo/main.py | 8 ++++---- test/system/graph_dsl_test.go | 3 ++- test/system/graph_vertex_test.go | 2 +- 41 files changed, 129 insertions(+), 116 deletions(-) diff --git a/cmd/kubehound/main.go b/cmd/kubehound/main.go index cf913d504..77066fa33 100644 --- a/cmd/kubehound/main.go +++ 
b/cmd/kubehound/main.go @@ -1,13 +1,16 @@ package main import ( + "github.com/DataDog/KubeHound/pkg/cmd" "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/tag" ) func main() { tag.SetupBaseTags() - if err := rootCmd.Execute(); err != nil { + err := rootCmd.Execute() + cmd.CloseKubehoundConfig(rootCmd.Context()) + if err != nil { log.Logger(rootCmd.Context()).Fatal(err.Error()) } } diff --git a/cmd/kubehound/root.go b/cmd/kubehound/root.go index f95b7a182..fedc496c1 100644 --- a/cmd/kubehound/root.go +++ b/cmd/kubehound/root.go @@ -70,9 +70,6 @@ var ( return nil }, - PersistentPostRunE: func(cobraCmd *cobra.Command, args []string) error { - return cmd.CloseKubehoundConfig(cobraCmd.Context()) - }, SilenceUsage: true, SilenceErrors: true, } diff --git a/cmd/kubehound/server.go b/cmd/kubehound/server.go index cfc24a1ca..77fd87860 100644 --- a/cmd/kubehound/server.go +++ b/cmd/kubehound/server.go @@ -26,9 +26,6 @@ var ( return core.CoreGrpcApi(cobraCmd.Context(), khCfg) }, - PersistentPostRunE: func(cobraCmd *cobra.Command, args []string) error { - return cmd.CloseKubehoundConfig(cobraCmd.Context()) - }, } ) diff --git a/cmd/kubehound/version.go b/cmd/kubehound/version.go index f06c0e0e7..f661608ea 100644 --- a/cmd/kubehound/version.go +++ b/cmd/kubehound/version.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "os" "github.com/DataDog/KubeHound/pkg/config" "github.com/spf13/cobra" @@ -15,6 +16,9 @@ var ( Run: func(cobraCmd *cobra.Command, args []string) { fmt.Printf("kubehound version: %s (%s/%s)", config.BuildVersion, config.BuildArch, config.BuildOs) //nolint:forbidigo }, + PersistentPostRun: func(cobraCmd *cobra.Command, args []string) { + os.Exit(0) + }, } ) diff --git a/deployments/kubehound/binary/Dockerfile_debug b/deployments/kubehound/binary/Dockerfile_debug index 5d1d5493d..7ca3920f7 100644 --- a/deployments/kubehound/binary/Dockerfile_debug +++ b/deployments/kubehound/binary/Dockerfile_debug @@ -11,7 +11,8 @@ COPY 
deployments ./deployments RUN GOOS=linux GOARCH=amd64 go build -o "./bin/build/kubehound" ./cmd/kubehound/ -FROM ubuntu:24.04 AS build-release-stage +FROM registry.ddbuild.io/images/base/gbi-ubuntu_2404:release +# FROM ubuntu:24.04 AS build-release-stage WORKDIR / diff --git a/pkg/cmd/config.go b/pkg/cmd/config.go index 3439e910a..b66073606 100644 --- a/pkg/cmd/config.go +++ b/pkg/cmd/config.go @@ -39,14 +39,13 @@ func InitializeKubehoundConfig(ctx context.Context, configPath string, generateR } InitTags(ctx, khCfg) - InitTelemetry(khCfg) + InitTelemetry(ctx, khCfg) return nil } -func InitTelemetry(khCfg *config.KubehoundConfig) { - ctx := context.Background() +func InitTelemetry(ctx context.Context, khCfg *config.KubehoundConfig) { l := log.Logger(ctx) l.Info("Initializing application telemetry") err := telemetry.Initialize(ctx, khCfg) diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go index 29eba481d..078932885 100644 --- a/pkg/cmd/dump.go +++ b/pkg/cmd/dump.go @@ -41,7 +41,7 @@ func InitRemoteDumpCmd(cmd *cobra.Command) { viper.BindPFlag(config.IngestorBlobBucketURL, cmd.Flags().Lookup("bucket-url")) //nolint: errcheck cmd.Flags().String("region", "", "Region to retrieve the configuration (only for s3) (e.g.: us-east-1)") - viper.BindPFlag(config.IngestorBlobBucketURL, cmd.Flags().Lookup("region")) //nolint: errcheck + viper.BindPFlag(config.IngestorBlobRegion, cmd.Flags().Lookup("region")) //nolint: errcheck } func InitLocalIngestCmd(cmd *cobra.Command) { diff --git a/pkg/collector/file.go b/pkg/collector/file.go index 5e2cc1564..09172c60e 100644 --- a/pkg/collector/file.go +++ b/pkg/collector/file.go @@ -136,7 +136,7 @@ func (c *FileCollector) streamPodsNamespace(ctx context.Context, fp string, inge } func (c *FileCollector) StreamPods(ctx context.Context, ingestor PodIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, 
tag.EntityPods) l := log.Trace(ctx) var err error @@ -188,7 +188,7 @@ func (c *FileCollector) streamRolesNamespace(ctx context.Context, fp string, ing } func (c *FileCollector) StreamRoles(ctx context.Context, ingestor RoleIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityRoles) l := log.Trace(ctx) var err error @@ -240,7 +240,7 @@ func (c *FileCollector) streamRoleBindingsNamespace(ctx context.Context, fp stri } func (c *FileCollector) StreamRoleBindings(ctx context.Context, ingestor RoleBindingIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityRolebindings) l := log.Trace(ctx) var err error @@ -292,7 +292,7 @@ func (c *FileCollector) streamEndpointsNamespace(ctx context.Context, fp string, } func (c *FileCollector) StreamEndpoints(ctx context.Context, ingestor EndpointIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityEndpoints) l := log.Trace(ctx) var err error @@ -324,7 +324,7 @@ func (c *FileCollector) StreamEndpoints(ctx context.Context, ingestor EndpointIn } func (c *FileCollector) StreamNodes(ctx context.Context, ingestor NodeIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityNodes) l := log.Trace(ctx) var err error @@ -351,7 +351,7 @@ func (c *FileCollector) StreamNodes(ctx context.Context, ingestor NodeIngestor) } func (c *FileCollector) StreamClusterRoles(ctx context.Context, ingestor ClusterRoleIngestor) error { - span, ctx := 
tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityClusterRoles) l := log.Trace(ctx) var err error @@ -378,7 +378,7 @@ func (c *FileCollector) StreamClusterRoles(ctx context.Context, ingestor Cluster } func (c *FileCollector) StreamClusterRoleBindings(ctx context.Context, ingestor ClusterRoleBindingIngestor) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityClusterRolebindings) l := log.Trace(ctx) var err error @@ -407,7 +407,7 @@ func (c *FileCollector) StreamClusterRoleBindings(ctx context.Context, ingestor // readList loads a list of K8s API objects into memory from a JSON file on disk. // NOTE: This implementation reads the entire array of objects from the file into memory at once. func readList[Tl types.ListInputType](ctx context.Context, inputPath string) (Tl, error) { - span, _ := tracer.StartSpanFromContext(ctx, span.DumperReadFile, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperReadFile) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/collector/k8s_api.go b/pkg/collector/k8s_api.go index 34c314e27..b4e2d59df 100644 --- a/pkg/collector/k8s_api.go +++ b/pkg/collector/k8s_api.go @@ -79,7 +79,7 @@ func NewK8sAPICollector(ctx context.Context, cfg *config.KubehoundConfig) (Colle } if !cfg.Collector.NonInteractive { - l.Warn("About to dump k8s cluster - Do you want to continue ? [Yes/No]", log.String("cluster", clusterName)) + l.Warn("About to dump k8s cluster - Do you want to continue ? 
[Yes/No]", log.String(log.FieldClusterKey, clusterName)) proceed, err := cmd.AskForConfirmation(ctx) if err != nil { return nil, err @@ -89,8 +89,7 @@ func NewK8sAPICollector(ctx context.Context, cfg *config.KubehoundConfig) (Colle return nil, errors.New("user did not confirm") } } else { - msg := fmt.Sprintf("Non-interactive mode enabled, proceeding with k8s cluster dump: %s", clusterName) - l.Warn(msg) + l.Warnf("Non-interactive mode enabled, proceeding with k8s cluster dump: %s", clusterName) } err = checkK8sAPICollectorConfig(cfg.Collector.Type) @@ -304,7 +303,7 @@ func (c *k8sAPICollector) streamPodsNamespace(ctx context.Context, namespace str func (c *k8sAPICollector) StreamPods(ctx context.Context, ingestor PodIngestor) error { entity := tag.EntityPods - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -359,7 +358,7 @@ func (c *k8sAPICollector) streamRolesNamespace(ctx context.Context, namespace st func (c *k8sAPICollector) StreamRoles(ctx context.Context, ingestor RoleIngestor) error { entity := tag.EntityRoles - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -414,7 +413,7 @@ func (c *k8sAPICollector) streamRoleBindingsNamespace(ctx context.Context, names func (c *k8sAPICollector) StreamRoleBindings(ctx context.Context, ingestor RoleBindingIngestor) error { entity := tag.EntityRolebindings - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() 
@@ -469,7 +468,7 @@ func (c *k8sAPICollector) streamEndpointsNamespace(ctx context.Context, namespac func (c *k8sAPICollector) StreamEndpoints(ctx context.Context, ingestor EndpointIngestor) error { entity := tag.EntityEndpoints - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, tag.EntityEndpoints) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -487,7 +486,7 @@ func (c *k8sAPICollector) StreamEndpoints(ctx context.Context, ingestor Endpoint func (c *k8sAPICollector) StreamNodes(ctx context.Context, ingestor NodeIngestor) error { entity := tag.EntityNodes - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -530,7 +529,7 @@ func (c *k8sAPICollector) StreamNodes(ctx context.Context, ingestor NodeIngestor func (c *k8sAPICollector) StreamClusterRoles(ctx context.Context, ingestor ClusterRoleIngestor) error { entity := tag.EntityClusterRoles - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -573,7 +572,7 @@ func (c *k8sAPICollector) StreamClusterRoles(ctx context.Context, ingestor Clust func (c *k8sAPICollector) StreamClusterRoleBindings(ctx context.Context, ingestor ClusterRoleBindingIngestor) error { entity := tag.EntityClusterRolebindings - span, ctx := tracer.StartSpanFromContext(ctx, span.CollectorStream, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.CollectorStream) span.SetTag(tag.EntityTag, entity) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git 
a/pkg/config/dynamic.go b/pkg/config/dynamic.go index dbc2938db..c0eb3f6e3 100644 --- a/pkg/config/dynamic.go +++ b/pkg/config/dynamic.go @@ -15,6 +15,7 @@ type DynamicConfig struct { mu sync.Mutex RunID *RunID `mapstructure:"run_id"` ClusterName string `mapstructure:"cluster_name"` + Service string `mapstructure:"service"` } func (d *DynamicConfig) HealthCheck() error { diff --git a/pkg/config/k8s.go b/pkg/config/k8s.go index 767a5ae11..784c22460 100644 --- a/pkg/config/k8s.go +++ b/pkg/config/k8s.go @@ -24,7 +24,7 @@ func NewClusterInfo(ctx context.Context) (*ClusterInfo, error) { l := log.Logger(ctx) clusterName := os.Getenv(clusterNameEnvVar) if clusterName != "" { - l.Warn("Using cluster name from environment variable", log.String("env_var", clusterNameEnvVar), log.String("cluster_name", clusterName)) + l.Warn("Using cluster name from environment variable", log.String("env_var", clusterNameEnvVar), log.String(log.FieldClusterKey, clusterName)) return &ClusterInfo{ Name: clusterName, diff --git a/pkg/dump/ingestor.go b/pkg/dump/ingestor.go index e8b142911..66264ceb2 100644 --- a/pkg/dump/ingestor.go +++ b/pkg/dump/ingestor.go @@ -76,7 +76,7 @@ func (d *DumpIngestor) OutputPath() string { } func (d *DumpIngestor) DumpK8sObjects(ctx context.Context) error { - spanDump, ctx := tracer.StartSpanFromContext(ctx, span.CollectorDump, tracer.Measured()) + spanDump, ctx := span.SpanRunFromContext(ctx, span.CollectorDump) var err error defer func() { spanDump.Finish(tracer.WithError(err)) }() @@ -114,7 +114,7 @@ const ( func ParsePath(ctx context.Context, path string) (*DumpResult, error) { l := log.Logger(ctx) - l.Warn("[Backward Compatibility] Extracting the metadata", log.String("path", path)) + l.Warn("[Backward Compatibility] Extracting the metadata", log.String(log.FieldPathKey, path)) // .//kubehound__[.tar.gz] // re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`) diff --git a/pkg/dump/pipeline/pipeline.go 
b/pkg/dump/pipeline/pipeline.go index 899fadca3..5b388dfb9 100644 --- a/pkg/dump/pipeline/pipeline.go +++ b/pkg/dump/pipeline/pipeline.go @@ -173,15 +173,15 @@ func (p *PipelineDumpIngestor) WaitAndClose(ctx context.Context) error { // Static wrapper to dump k8s object dynamically (streams Kubernetes objects to the collector writer). func dumpK8sObjs(ctx context.Context, operationName string, entity string, streamFunc StreamFunc) error { - span, ctx := tracer.StartSpanFromContext(ctx, operationName, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, operationName) span.SetTag(tag.EntityTag, entity) l := log.Logger(ctx) - l.Info("Dumping entity", log.String("entity", entity)) + l.Info("Dumping entity", log.String(log.FieldEntityKey, entity)) var err error defer func() { span.Finish(tracer.WithError(err)) }() err = streamFunc(ctx) - l.Info("Dumping entity done", log.String("entity", entity)) + l.Info("Dumping entity done", log.String(log.FieldEntityKey, entity)) return err } diff --git a/pkg/dump/writer/file_writer.go b/pkg/dump/writer/file_writer.go index 302c774a3..24e397b37 100644 --- a/pkg/dump/writer/file_writer.go +++ b/pkg/dump/writer/file_writer.go @@ -60,7 +60,7 @@ func (f *FileWriter) WorkerNumber() int { // All buffer are stored in a map which is flushed at the end of every type processed func (f *FileWriter) Write(ctx context.Context, k8sObj []byte, pathObj string) error { l := log.Logger(ctx) - l.Debug("Writing to file", log.String("path", pathObj)) + l.Debug("Writing to file", log.String(log.FieldPathKey, pathObj)) f.mu.Lock() defer f.mu.Unlock() @@ -110,7 +110,7 @@ func (f *FileWriter) Write(ctx context.Context, k8sObj []byte, pathObj string) e // No flush needed for the file writer as we are flushing the buffer at every write func (f *FileWriter) Flush(ctx context.Context) error { - span, _ := tracer.StartSpanFromContext(ctx, span.DumperWriterFlush, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperWriterFlush) 
span.SetTag(tag.DumperWriterTypeTag, FileTypeTag) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -121,7 +121,7 @@ func (f *FileWriter) Flush(ctx context.Context) error { func (f *FileWriter) Close(ctx context.Context) error { l := log.Logger(ctx) l.Debug("Closing writers") - span, _ := tracer.StartSpanFromContext(ctx, span.DumperWriterClose, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperWriterClose) span.SetTag(tag.DumperWriterTypeTag, FileTypeTag) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/dump/writer/fs_writer.go b/pkg/dump/writer/fs_writer.go index aa6848bf1..b6143e7ac 100644 --- a/pkg/dump/writer/fs_writer.go +++ b/pkg/dump/writer/fs_writer.go @@ -34,7 +34,7 @@ func NewFSWriter(ctx context.Context) (*FSWriter, error) { // All buffer are stored in a map which is flushed at the end of every type processed func (f *FSWriter) WriteFile(ctx context.Context, pathObj string, k8sObj []byte) error { l := log.Logger(ctx) - l.Debug("Writing to file", log.String("path", pathObj)) + l.Debug("Writing to file", log.String(log.FieldPathKey, pathObj)) f.mu.Lock() defer f.mu.Unlock() @@ -54,7 +54,7 @@ func (f *FSWriter) WriteFile(ctx context.Context, pathObj string, k8sObj []byte) // No flush needed for the file writer as we are flushing the buffer at every write func (f *FSWriter) Flush(ctx context.Context) error { - span, _ := tracer.StartSpanFromContext(ctx, span.DumperWriterFlush, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperWriterFlush) span.SetTag(tag.DumperWriterTypeTag, TarTypeTag) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/dump/writer/tar_writer.go b/pkg/dump/writer/tar_writer.go index 558ebc2a7..ca93d278a 100644 --- a/pkg/dump/writer/tar_writer.go +++ b/pkg/dump/writer/tar_writer.go @@ -62,7 +62,7 @@ func NewTarWriter(ctx context.Context, tarPath string) (*TarWriter, error) { func createTarFile(ctx 
context.Context, tarPath string) (*os.File, error) { l := log.Logger(ctx) - l.Debugf("Creating tar file", log.String("path", tarPath)) + l.Debugf("Creating tar file", log.String(log.FieldPathKey, tarPath)) err := os.MkdirAll(filepath.Dir(tarPath), WriterDirMod) if err != nil { return nil, fmt.Errorf("failed to create directories: %w", err) @@ -83,7 +83,7 @@ func (f *TarWriter) WorkerNumber() int { // All buffer are stored in a map which is flushed at the end of every type processed func (t *TarWriter) Write(ctx context.Context, k8sObj []byte, filePath string) error { l := log.Logger(ctx) - l.Debug("Writing to file", log.String("path", filePath)) + l.Debug("Writing to file", log.String(log.FieldPathKey, filePath)) t.mu.Lock() defer t.mu.Unlock() @@ -100,7 +100,7 @@ func (t *TarWriter) Write(ctx context.Context, k8sObj []byte, filePath string) e func (t *TarWriter) Flush(ctx context.Context) error { l := log.Logger(ctx) l.Debug("Flushing writers") - span, _ := tracer.StartSpanFromContext(ctx, span.DumperWriterFlush, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperWriterFlush) span.SetTag(tag.DumperWriterTypeTag, TarTypeTag) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -128,7 +128,7 @@ func (t *TarWriter) Flush(ctx context.Context) error { func (t *TarWriter) Close(ctx context.Context) error { l := log.Logger(ctx) l.Debug("Closing handlers for tar") - span, _ := tracer.StartSpanFromContext(ctx, span.DumperWriterClose, tracer.Measured()) + span, _ := span.SpanRunFromContext(ctx, span.DumperWriterClose) span.SetTag(tag.DumperWriterTypeTag, TarTypeTag) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/globals/application.go b/pkg/globals/application.go index 90bc5021f..ff0cc8da2 100644 --- a/pkg/globals/application.go +++ b/pkg/globals/application.go @@ -5,9 +5,8 @@ import ( ) const ( - DDServiceName = "kubehound" - DefaultDDEnv = "dev" - DefaultComponent = "kubehound-ingestor" + 
DDServiceName = "kubehound" + DefaultDDEnv = "dev" ) func GetDDEnv() string { @@ -19,8 +18,11 @@ func GetDDEnv() string { return env } -const ( - FileCollectorComponent = "file-collector" - IngestorComponent = "pipeline-ingestor" - BuilderComponent = "graph-builder" -) +func GetDDServiceName() string { + serviceName := os.Getenv("DD_SERVICE_NAME") + if serviceName == "" { + return DDServiceName + } + + return serviceName +} diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index 986ab9219..14e72db8c 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -88,7 +88,7 @@ func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedClus if clusterErr != nil { errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s: %w", latestDumpKey, clusterErr)) } - l.Info("Rehydrated cluster", log.String("cluster_name", clusterName), log.Time("dump_ingest_time", latestDumpIngestTime), log.String("dump_key", latestDumpKey)) + l.Info("Rehydrated cluster", log.String(log.FieldClusterKey, clusterName), log.Time("dump_ingest_time", latestDumpIngestTime), log.String("dump_key", latestDumpKey)) ingestedCluster := &grpc.IngestedCluster{ ClusterName: clusterName, Key: latestDumpKey, @@ -102,9 +102,9 @@ func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedClus } func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { - l := log.Logger(ctx) // Settings global variables for the run in the context to propagate them to the spans runCtx := context.Background() + l := log.Logger(runCtx)
archivePath, err := g.puller.Pull(runCtx, path) //nolint: contextcheck if err != nil { return err } @@ -149,6 +150,12 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { }, } + runCtx = context.WithValue(runCtx, log.ContextFieldCluster, clusterName) + runCtx = context.WithValue(runCtx, log.ContextFieldRunID, runID) + l = log.Logger(runCtx) + spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob) + defer func() { spanJob.Finish(tracer.WithError(err)) }() + events.PushEvent( fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), @@ -157,12 +164,6 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { }, ) - runCtx = context.WithValue(runCtx, span.ContextLogFieldClusterName, clusterName) - runCtx = context.WithValue(runCtx, span.ContextLogFieldRunID, runID) - - spanJob, runCtx := span.SpanIngestRunFromContext(runCtx, span.IngestorStartJob) - defer func() { spanJob.Finish(tracer.WithError(err)) }() - alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck if err != nil { return err } @@ -200,7 +201,7 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { } if alreadyIngestedInDB { - l.Info("Data already ingested in the database for %s/%s, droping the current data", log.String("cluster_name", clusterName), log.String("run_id", runID)) + l.Info("Data already ingested in the database for %s/%s, dropping the current data", log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID)) err := g.providers.StoreProvider.Clean(runCtx, runID, clusterName) //nolint: contextcheck if err != nil { return err } diff --git a/pkg/ingestor/ingestor.go b/pkg/ingestor/ingestor.go index 1988eb990..7a652b004 100644 --- a/pkg/ingestor/ingestor.go +++ b/pkg/ingestor/ingestor.go @@ -21,7 +21,7 @@ func IngestData(ctx context.Context, cfg *config.KubehoundConfig, collect collec l := 
log.Logger(ctx) start := time.Now() - span, ctx := tracer.StartSpanFromContext(ctx, span.IngestData, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.IngestData) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/ingestor/notifier/noop/noop.go b/pkg/ingestor/notifier/noop/noop.go index 477f126f1..5c61eee35 100644 --- a/pkg/ingestor/notifier/noop/noop.go +++ b/pkg/ingestor/notifier/noop/noop.go @@ -15,7 +15,7 @@ func NewNoopNotifier() notifier.Notifier { func (n *NoopNotifier) Notify(ctx context.Context, clusterName string, runID string) error { l := log.Logger(ctx) - l.Warn("Noop Notifying for cluster and run ID", log.String("cluster_name", clusterName), log.String("run_id", runID)) + l.Warn("Noop Notifying for cluster and run ID", log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID)) return nil } diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index baf6fefd7..0afae671f 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -132,8 +132,8 @@ func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive boo // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error { l := log.Logger(outer) - l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("cluster_name", clusterName), log.String("run_id", runID)) - spanPut, ctx := span.SpanIngestRunFromContext(outer, span.IngestorBlobPull) + l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID)) + spanPut, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull) var err error defer func() { spanPut.Finish(tracer.WithError(err)) }() @@ -148,7 +148,7 
@@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName return err } defer b.Close() - l.Info("Opening archive file", log.String("path", archivePath)) + l.Info("Opening archive file", log.String(log.FieldPathKey, archivePath)) f, err := os.Open(archivePath) if err != nil { return err @@ -176,7 +176,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { l := log.Logger(outer) l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key)) - spanPull, ctx := span.SpanIngestRunFromContext(outer, span.IngestorBlobPull) + spanPull, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull) var err error defer func() { spanPull.Finish(tracer.WithError(err)) }() @@ -197,7 +197,7 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { return dirname, err } - l.Info("Created temporary directory", log.String("path", dirname)) + l.Info("Created temporary directory", log.String(log.FieldPathKey, dirname)) archivePath := filepath.Join(dirname, config.DefaultArchiveName) f, err := os.Create(archivePath) if err != nil { @@ -205,7 +205,7 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { } defer f.Close() - l.Info("Downloading archive (%q) from blob store", log.String("key", key)) + l.Info("Downloading archive from blob store", log.String("key", key)) w := bufio.NewWriter(f) err = b.Download(ctx, key, w, nil) if err != nil { @@ -221,7 +221,7 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { } func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { - spanExtract, _ := span.SpanIngestRunFromContext(ctx, span.IngestorBlobExtract) + spanExtract, _ := span.SpanRunFromContext(ctx, span.IngestorBlobExtract) var err error defer func() { spanExtract.Finish(tracer.WithError(err)) }() @@ -243,7 
+243,7 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { // Once downloaded and processed, we should cleanup the disk so we can reduce the disk usage // required for large infrastructure func (bs *BlobStore) Close(ctx context.Context, archivePath string) error { - spanClose, _ := span.SpanIngestRunFromContext(ctx, span.IngestorBlobClose) + spanClose, _ := span.SpanRunFromContext(ctx, span.IngestorBlobClose) var err error defer func() { spanClose.Finish(tracer.WithError(err)) }() diff --git a/pkg/kubehound/core/core_dump.go b/pkg/kubehound/core/core_dump.go index 75d08737b..4c638b221 100644 --- a/pkg/kubehound/core/core_dump.go +++ b/pkg/kubehound/core/core_dump.go @@ -14,6 +14,7 @@ import ( "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" "github.com/DataDog/KubeHound/pkg/telemetry/tag" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -34,7 +35,8 @@ func DumpCore(ctx context.Context, khCfg *config.KubehoundConfig, upload bool) ( start := time.Now() - span, ctx := tracer.StartSpanFromContext(ctx, span.DumperLaunch, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.DumperLaunch) + span.SetTag(ext.ManualKeep, true) defer func() { span.Finish(tracer.WithError(err)) }() @@ -53,7 +55,7 @@ func DumpCore(ctx context.Context, khCfg *config.KubehoundConfig, upload bool) ( if err != nil { return "", err } - l.Info("result saved to file", log.String("path", filePath)) + l.Info("result saved to file", log.String(log.FieldPathKey, filePath)) if upload { // Clean up the temporary directory when done diff --git a/pkg/kubehound/core/core_grpc_api.go b/pkg/kubehound/core/core_grpc_api.go index ef092f1a5..12bfaaec7 100644 --- a/pkg/kubehound/core/core_grpc_api.go +++ b/pkg/kubehound/core/core_grpc_api.go @@ -18,7 +18,7 @@ import ( func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error { l := log.Logger(ctx) 
l.Info("Starting KubeHound Distributed Ingestor Service") - span, ctx := tracer.StartSpanFromContext(ctx, span.IngestorLaunch, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch) var err error defer func() { span.Finish(tracer.WithError(err)) diff --git a/pkg/kubehound/core/core_grpc_client.go b/pkg/kubehound/core/core_grpc_client.go index 91ff2e4fa..602ac81bb 100644 --- a/pkg/kubehound/core/core_grpc_client.go +++ b/pkg/kubehound/core/core_grpc_client.go @@ -41,9 +41,9 @@ func CoreClientGRPCIngest(ctx context.Context, ingestorConfig config.IngestorCon } defer conn.Close() client := pb.NewAPIClient(conn) - l.Info("Launching ingestion", log.String("endpoint", ingestorConfig.API.Endpoint), log.String("run_id", runID)) + l.Info("Launching ingestion", log.String("endpoint", ingestorConfig.API.Endpoint), log.String(log.FieldRunIDKey, runID)) - _, err = client.Ingest(context.Background(), &pb.IngestRequest{ + _, err = client.Ingest(ctx, &pb.IngestRequest{ RunId: runID, ClusterName: clusteName, }) @@ -64,13 +64,13 @@ func CoreClientGRPCRehydrateLatest(ctx context.Context, ingestorConfig config.In client := pb.NewAPIClient(conn) l.Info("Launching rehydratation [latest]", log.String("endpoint", ingestorConfig.API.Endpoint)) - results, err := client.RehydrateLatest(context.Background(), &pb.RehydrateLatestRequest{}) + results, err := client.RehydrateLatest(ctx, &pb.RehydrateLatestRequest{}) if err != nil { return fmt.Errorf("call rehydratation (latest): %w", err) } for _, res := range results.IngestedCluster { - l.Info("Rehydrated cluster", log.String("cluster_name", res.ClusterName), log.Time("time", res.Date.AsTime()), log.String("key", res.Key)) + l.Info("Rehydrated cluster", log.String(log.FieldClusterKey, res.ClusterName), log.Time("time", res.Date.AsTime()), log.String("key", res.Key)) } return nil diff --git a/pkg/kubehound/core/core_live.go b/pkg/kubehound/core/core_live.go index 449bf8e58..0363478b8 100644 --- 
a/pkg/kubehound/core/core_live.go +++ b/pkg/kubehound/core/core_live.go @@ -26,7 +26,7 @@ func CoreInitLive(ctx context.Context, khCfg *config.KubehoundConfig) error { // CoreLive will launch the KubeHound application to ingest data from a collector and create an attack graph. func CoreLive(ctx context.Context, khCfg *config.KubehoundConfig) error { l := log.Logger(ctx) - span, ctx := tracer.StartSpanFromContext(ctx, span.Launch, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.Launch) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -38,7 +38,7 @@ func CoreLive(ctx context.Context, khCfg *config.KubehoundConfig) error { // Start the run start := time.Now() - l.Info("Starting KubeHound", log.String("run_id", khCfg.Dynamic.RunID.String()), log.String("cluster_name", khCfg.Dynamic.ClusterName)) + l.Info("Starting KubeHound", log.String(log.FieldRunIDKey, khCfg.Dynamic.RunID.String()), log.String("cluster_name", khCfg.Dynamic.ClusterName)) // Initialize the providers (graph, cache, store) l.Info("Initializing providers (graph, cache, store)") @@ -55,7 +55,7 @@ func CoreLive(ctx context.Context, khCfg *config.KubehoundConfig) error { return fmt.Errorf("ingest build data: %w", err) } - l.Info("KubeHound run complete", log.String("run_id", khCfg.Dynamic.RunID.String()), log.Duration("duration", time.Since(start))) + l.Info("KubeHound run complete", log.String(log.FieldRunIDKey, khCfg.Dynamic.RunID.String()), log.Duration("duration", time.Since(start))) return nil } diff --git a/pkg/kubehound/graph/builder.go b/pkg/kubehound/graph/builder.go index 62052b662..801316782 100644 --- a/pkg/kubehound/graph/builder.go +++ b/pkg/kubehound/graph/builder.go @@ -221,7 +221,7 @@ func BuildGraph(outer context.Context, cfg *config.KubehoundConfig, storedb stor graphdb graphdb.Provider, cache cache.CacheReader) error { l := log.Logger(outer) start := time.Now() - span, ctx := span.SpanIngestRunFromContext(outer, span.BuildGraph) + span, ctx := 
span.SpanRunFromContext(outer, span.BuildGraph) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/kubehound/providers/providers.go b/pkg/kubehound/providers/providers.go index 937be9cdc..5d476d4ae 100644 --- a/pkg/kubehound/providers/providers.go +++ b/pkg/kubehound/providers/providers.go @@ -57,7 +57,7 @@ func NewProvidersFactoryConfig(ctx context.Context, khCfg *config.KubehoundConfi return nil, fmt.Errorf("graph database client creation: %w", err) } msg = fmt.Sprintf("Loaded %s graph provider", gp.Name()) - l.Infof(msg, log.String("provider", sp.Name())) + l.Info(msg, log.String("provider", sp.Name())) err = gp.Prepare(ctx) if err != nil { diff --git a/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go b/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go index 430181039..93bf15088 100644 --- a/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go +++ b/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go @@ -96,8 +96,7 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) { // batchWrite will write a batch of entries into the graph DB and block until the write completes. // Callers are responsible for doing an Add(1) to the writingInFlight wait group to ensure proper synchronization. func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.JanusGraphBatchWrite, - tracer.Measured(), tracer.ServiceName(TracerServicename)) + span, ctx := span.SpanRunFromContext(ctx, span.JanusGraphBatchWrite) span.SetTag(tag.LabelTag, jgv.builder) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -127,8 +126,7 @@ func (jgv *JanusGraphEdgeWriter) Close(ctx context.Context) error { // Flush triggers writes of any remaining items in the queue. 
// This is blocking func (jgv *JanusGraphEdgeWriter) Flush(ctx context.Context) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.JanusGraphFlush, - tracer.Measured(), tracer.ServiceName(TracerServicename)) + span, ctx := span.SpanRunFromContext(ctx, span.JanusGraphFlush) span.SetTag(tag.LabelTag, jgv.builder) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go b/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go index 0272408c9..55a92ca05 100644 --- a/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go +++ b/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go @@ -123,8 +123,7 @@ func (jgv *JanusGraphVertexWriter) cacheIds(ctx context.Context, idMap []*gremli // batchWrite will write a batch of entries into the graph DB and block until the write completes. // Callers are responsible for doing an Add(1) to the writingInFlight wait group to ensure proper synchronization. func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.JanusGraphBatchWrite, - tracer.Measured(), tracer.ServiceName(TracerServicename)) + span, ctx := span.SpanRunFromContext(ctx, span.JanusGraphBatchWrite) span.SetTag(tag.LabelTag, jgv.builder) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -162,8 +161,7 @@ func (jgv *JanusGraphVertexWriter) Close(ctx context.Context) error { // Flush triggers writes of any remaining items in the queue. 
// This is blocking func (jgv *JanusGraphVertexWriter) Flush(ctx context.Context) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.JanusGraphFlush, - tracer.Measured(), tracer.ServiceName(TracerServicename)) + span, ctx := span.SpanRunFromContext(ctx, span.JanusGraphFlush) span.SetTag(tag.LabelTag, jgv.builder) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/kubehound/storage/storedb/mongo_writer.go b/pkg/kubehound/storage/storedb/mongo_writer.go index 8d01347fb..ad606852b 100644 --- a/pkg/kubehound/storage/storedb/mongo_writer.go +++ b/pkg/kubehound/storage/storedb/mongo_writer.go @@ -85,7 +85,7 @@ func (maw *MongoAsyncWriter) startBackgroundWriter(ctx context.Context) { // batchWrite blocks until the write is complete func (maw *MongoAsyncWriter) batchWrite(ctx context.Context, ops []mongo.WriteModel) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.MongoDBBatchWrite, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.MongoDBBatchWrite) span.SetTag(tag.CollectionTag, maw.collection.Name()) var err error defer func() { span.Finish(tracer.WithError(err)) }() @@ -126,7 +126,7 @@ func (maw *MongoAsyncWriter) Queue(ctx context.Context, model any) error { // Flush triggers writes of any remaining items in the queue. 
// This is blocking func (maw *MongoAsyncWriter) Flush(ctx context.Context) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.MongoDBFlush, tracer.Measured()) + span, ctx := span.SpanRunFromContext(ctx, span.MongoDBFlush) span.SetTag(tag.CollectionTag, maw.collection.Name()) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/telemetry/log/fields.go b/pkg/telemetry/log/fields.go index 5f51845ab..8cc36bf1a 100644 --- a/pkg/telemetry/log/fields.go +++ b/pkg/telemetry/log/fields.go @@ -23,6 +23,7 @@ const ( FieldRunIDKey = "run_id" FieldTeamKey = "team" FieldServiceKey = "service" + FieldAppKey = "app" FieldIngestorPipelineKey = "ingestor_pipeline" FieldDumpPipelineKey = "dump_pipeline" FieldPathKey = "path" diff --git a/pkg/telemetry/log/formatter.go b/pkg/telemetry/log/formatter.go index 21c6f76bb..41efd0c54 100644 --- a/pkg/telemetry/log/formatter.go +++ b/pkg/telemetry/log/formatter.go @@ -89,7 +89,7 @@ func newZapConfig() zap.Config { } zc.InitialFields = map[string]interface{}{ - FieldServiceKey: "kubehound", + FieldAppKey: "kubehound", } return zc diff --git a/pkg/telemetry/log/kv.go b/pkg/telemetry/log/kv.go index 1a0a396e5..98d213531 100644 --- a/pkg/telemetry/log/kv.go +++ b/pkg/telemetry/log/kv.go @@ -13,7 +13,7 @@ import ( ) var ( - DefaultRemovedFields = []string{FieldTeamKey, FieldServiceKey, FieldRunIDKey, FieldClusterKey, FieldComponentKey, spanIDKey, traceIDKey} + DefaultRemovedFields = []string{FieldTeamKey, FieldServiceKey, FieldAppKey, FieldRunIDKey, FieldClusterKey, FieldComponentKey, spanIDKey, traceIDKey} bufferpool = buffer.NewPool() ) diff --git a/pkg/telemetry/log/logger.go b/pkg/telemetry/log/logger.go index 228284a1f..cd0222bb7 100644 --- a/pkg/telemetry/log/logger.go +++ b/pkg/telemetry/log/logger.go @@ -50,6 +50,10 @@ func Logger(ctx context.Context) LoggerI { const ( spanIDKey = "dd.span_id" traceIDKey = "dd.trace_id" + + logFormatDD = "dd" + logFormatJSON = "json" + logFormatText = "text" ) // 
DefaultLogger returns the global logger diff --git a/pkg/telemetry/log/trace_logger.go b/pkg/telemetry/log/trace_logger.go index 21c68344c..c6e53440b 100644 --- a/pkg/telemetry/log/trace_logger.go +++ b/pkg/telemetry/log/trace_logger.go @@ -133,25 +133,25 @@ func (t *traceLogger) Fatal(msg string, fields ...Field) { } func (t *traceLogger) Debugf(msg string, params ...interface{}) { - t.logger.Debugf(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Debugf(msg, params...) } func (t *traceLogger) Infof(msg string, params ...interface{}) { - t.logger.Infof(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Infof(msg, params...) } func (t *traceLogger) Warnf(msg string, params ...interface{}) { - t.logger.Warnf(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Warnf(msg, params...) } func (t *traceLogger) Errorf(msg string, params ...interface{}) { - t.logger.Errorf(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Errorf(msg, params...) } func (t *traceLogger) Panicf(msg string, params ...interface{}) { - t.logger.Panicf(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Panicf(msg, params...) } func (t *traceLogger) Fatalf(msg string, params ...interface{}) { - t.logger.Fatalf(t.appendTracingFields(msg), params...) + t.logger.With(t.fields...).Fatalf(msg, params...) 
} diff --git a/pkg/telemetry/span/spans.go b/pkg/telemetry/span/spans.go index 25725da82..3d1537e8d 100644 --- a/pkg/telemetry/span/spans.go +++ b/pkg/telemetry/span/spans.go @@ -3,6 +3,7 @@ package span import ( "context" + "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/tag" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -81,14 +82,14 @@ func convertTag(value any) string { return val } -func SpanIngestRunFromContext(runCtx context.Context, spanName string) (ddtrace.Span, context.Context) { - spanJob, runCtx := tracer.StartSpanFromContext(runCtx, spanName, tracer.ResourceName(convertTag(runCtx.Value(ContextLogFieldClusterName))), tracer.Measured()) +func SpanRunFromContext(runCtx context.Context, spanName string) (ddtrace.Span, context.Context) { + spanJob, runCtx := tracer.StartSpanFromContext(runCtx, spanName, tracer.ResourceName(convertTag(runCtx.Value(log.ContextFieldCluster))), tracer.Measured()) spanIngestRunSetDefaultTag(runCtx, spanJob) return spanJob, runCtx } func spanIngestRunSetDefaultTag(ctx context.Context, span ddtrace.Span) { - span.SetTag(tag.CollectorClusterTag, convertTag(ctx.Value(ContextLogFieldClusterName))) - span.SetTag(tag.RunIdTag, convertTag(ctx.Value(ContextLogFieldRunID))) + span.SetTag(tag.CollectorClusterTag, convertTag(ctx.Value(log.ContextFieldCluster))) + span.SetTag(tag.RunIdTag, convertTag(ctx.Value(log.ContextFieldRunID))) } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 71017e5d7..a25629013 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -41,7 +41,7 @@ func Initialize(ctx context.Context, khCfg *config.KubehoundConfig) error { func Shutdown(ctx context.Context, enabled bool) { l := log.Logger(ctx) - if enabled { + if !enabled { return } @@ -49,7 +49,7 @@ func Shutdown(ctx context.Context, enabled bool) { profiler.Shutdown() // Tracing - tracer.Shutdown() + tracer.Shutdown(ctx) // 
Metrics err := statsd.Flush() diff --git a/pkg/telemetry/tracer/tracer.go b/pkg/telemetry/tracer/tracer.go index a3619ad4a..e7eedd083 100644 --- a/pkg/telemetry/tracer/tracer.go +++ b/pkg/telemetry/tracer/tracer.go @@ -13,12 +13,14 @@ import ( func Initialize(ctx context.Context, cfg *config.KubehoundConfig) { l := log.Logger(ctx) + // Default options opts := []tracer.StartOption{ tracer.WithEnv(globals.GetDDEnv()), - tracer.WithService(globals.DDServiceName), + tracer.WithService(globals.GetDDServiceName()), tracer.WithServiceVersion(config.BuildVersion), tracer.WithLogStartup(true), + tracer.WithAnalytics(true), } if cfg.Telemetry.Tracer.URL != "" { @@ -43,6 +45,8 @@ func Initialize(ctx context.Context, cfg *config.KubehoundConfig) { tracer.Start(opts...) } -func Shutdown() { +func Shutdown(ctx context.Context) { + l := log.Logger(ctx) + l.Debug("Stoping tracer") tracer.Stop() } diff --git a/scripts/dashboard-demo/main.py b/scripts/dashboard-demo/main.py index 19f09aae3..5a4613cb7 100644 --- a/scripts/dashboard-demo/main.py +++ b/scripts/dashboard-demo/main.py @@ -26,7 +26,7 @@ def __init__(self, client): self.res_query_critical_path = c.submit(self.KH_QUERY_EXTERNAL_CRITICAL_PATH).all().result() self.get_details() print("Loading " + self.DISPLAY_TITLE + " DONE") - + def get_main(self): return self.DESCRIPTION @@ -44,7 +44,7 @@ def get_details(self): def display(self): return pn.Column( - f'# {self.DISPLAY_TITLE}', + f'# {self.DISPLAY_TITLE}', pn.layout.Divider(), pn.Row( self.DESCRIPTION, @@ -186,7 +186,7 @@ def get_main(self): def display(self): return pn.Column( - f'# {self.DISPLAY_TITLE}', + f'# {self.DISPLAY_TITLE}', pn.layout.Divider(), pn.Row( self.get_main(), @@ -267,4 +267,4 @@ def GetClusterName(): main=[pn.Column(data, sizing_mode="stretch_both")], main_layout=None, accent=ACCENT, -).servable() \ No newline at end of file +).servable() diff --git a/test/system/graph_dsl_test.go b/test/system/graph_dsl_test.go index a71e3f6b2..e39513c3f 100644 --- 
a/test/system/graph_dsl_test.go +++ b/test/system/graph_dsl_test.go @@ -17,7 +17,8 @@ type DslTestSuite struct { } func (suite *DslTestSuite) SetupTest() { - gdb, err := graphdb.Factory(context.Background(), config.MustLoadConfig(KubeHoundConfigPath)) + ctx := context.Background() + gdb, err := graphdb.Factory(ctx, config.MustLoadConfig(ctx, KubeHoundConfigPath)) suite.Require().NoError(err) suite.gdb = gdb suite.client = gdb.Raw().(*gremlingo.DriverRemoteConnection) diff --git a/test/system/graph_vertex_test.go b/test/system/graph_vertex_test.go index c795003b5..1cc139565 100644 --- a/test/system/graph_vertex_test.go +++ b/test/system/graph_vertex_test.go @@ -84,7 +84,7 @@ type VertexTestSuite struct { func (suite *VertexTestSuite) SetupSuite() { require := suite.Require() ctx := context.Background() - cfg := config.MustLoadConfig("./kubehound.yaml") + cfg := config.MustLoadConfig(ctx, "./kubehound.yaml") // JanusGraph gdb, err := graphdb.Factory(ctx, cfg)