diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 660717c80..15b4e6719 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -8,14 +8,17 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" "strings" + "time" "gopkg.in/yaml.v3" "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/logger" ) type simulatePipelineRequest struct { @@ -23,16 +26,45 @@ type simulatePipelineRequest struct { } type simulatePipelineResponse struct { - Docs []pipelineIngestedDocument `json:"docs"` + Docs []struct { + ProcessorResults []verboseProcessorResult `json:"processor_results"` + } `json:"docs"` +} + +type verboseProcessorResult struct { + Processor string `json:"processor_type"` + Status string `json:"status"` + Doc pipelineDocument `json:"doc"` + Error verboseProcessorError `json:"error"` + Ignored struct { + Error verboseProcessorError `json:"error"` + } `json:"ignored_error"` +} + +type verboseProcessorError struct { + Type string `json:"type"` + Reason string `json:"reason"` + RootCause json.RawMessage `json:"root_cause"` +} + +func (e verboseProcessorError) Error() string { + return fmt.Sprintf("[%s] %s", e.Type, e.Reason) } type pipelineDocument struct { - Index string `json:"_index"` - Source json.RawMessage `json:"_source"` + Index string `json:"_index"` + Source json.RawMessage `json:"_source"` + Ingest verboseProcessorIngest `json:"_ingest"` } -type pipelineIngestedDocument struct { - Doc pipelineDocument `json:"doc"` +type verboseProcessorIngest struct { + Pipeline string `json:"pipeline"` + Timestamp time.Time `json:"timestamp"` + + OnFailurePipeline string `json:"on_failure_pipeline"` + OnFailureMessage string `json:"on_failure_message"` + OnFailureProcessorTag string `json:"on_failure_processor_tag"` + OnFailureProcessorType string `json:"on_failure_processor_type"` } // Pipeline represents a pipeline resource loaded from a file @@ -90,6 +122,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithContext(ctx), api.Ingest.Simulate.WithPipelineID(pipelineName), + api.Ingest.Simulate.WithVerbose(true), ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) @@ -111,11 +144,59 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err) } + handleErrors := func(ingest verboseProcessorIngest, errs []error) []error { + var filtered []error + for _, err := range errs { + var processorError verboseProcessorError + if errors.As(err, &processorError) && processorError.Reason == ingest.OnFailureMessage { + continue + } + filtered = append(filtered, err) + } + return filtered + } + processedEvents := make([]json.RawMessage, len(response.Docs)) + var errs []error for i, doc := range response.Docs { - processedEvents[i] = doc.Doc.Source + var source json.RawMessage + failed := false + for _, result := range doc.ProcessorResults { + if result.Doc.Ingest.OnFailureMessage != "" { + // This processor is in an on_failure handler, filter out the handled errors + // and assume that processing is going on. + errs = handleErrors(result.Doc.Ingest, errs) + failed = false + } + + switch result.Status { + case "success": + // Keep last successful document. + source = result.Doc.Source + case "dropped": + source = nil + case "skipped": + continue + case "error_ignored": + logger.Debugf("error ignored for processor %s: [%s] %s", result.Processor, result.Ignored.Error.Type, result.Ignored.Error.Reason) + continue + case "error": + failed = true + errs = append(errs, fmt.Errorf("error in processor %s: %w", result.Processor, result.Error)) + case "failed": + failed = true + errs = append(errs, fmt.Errorf("%q processor failed", result.Processor)) + default: + errs = append(errs, fmt.Errorf("unexpected result status %q for processor %q", result.Status, result.Processor)) + } + } + + if !failed { + processedEvents[i] = source + } } - return processedEvents, nil + + return processedEvents, errors.Join(errs...) } func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {