diff --git a/AGENTS.md b/AGENTS.md index 37ef636..fae7ba7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -186,7 +186,7 @@ Setting `ZelyoConfig.spec.mode: protect` only flips the remediation engine's str Only then does the full loop fire: 1. Correlator emits an incident -2. `RemediationPolicy` controller filters open incidents by `spec.severityFilter` and caps PR submissions per reconcile cycle via `spec.maxConcurrentPRs` +2. `RemediationPolicy` controller filters open incidents by `spec.severityFilter` and caps the number of open PRs via `spec.maxConcurrentPRs` — already-open Zelyo PRs on the target repo count against the budget, so new PRs only open when existing ones merge or close 3. Remediation engine asks the LLM for a structured JSON fix plan and scores the risk 4. GitHub engine creates a branch, commits the fix, and opens a PR (skipped globally when `ZelyoConfig.spec.mode: audit`, which leaves the engine in `dry-run`) 5. Human team reviews and merges the PR diff --git a/README.md b/README.md index c00cf62..0c0c2d6 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ graph LR | Mode | Behavior | | ----------------------------- | --------------------------------------------------------------------------------------- | | **Audit** *(default)* | Detects, correlates, and alerts. The remediation engine runs in `dry-run` — fix plans are logged but no PRs are opened. | -| **Protect** | Switches the remediation engine to the `gitops-pr` strategy. PRs are opened only when at least one `RemediationPolicy` CR points at a configured `GitOpsRepository`. The policy's `severityFilter` decides which incidents qualify, and `maxConcurrentPRs` caps submissions per reconcile cycle. | +| **Protect** | Switches the remediation engine to the `gitops-pr` strategy. PRs are opened only when at least one `RemediationPolicy` CR points at a configured `GitOpsRepository`. 
The policy's `severityFilter` decides which incidents qualify, and `maxConcurrentPRs` caps the number of open Zelyo PRs on the target repo. | > **Note:** `ZelyoConfig.spec.mode: protect` by itself does not produce any PRs — it only authorizes the pipeline. See [Enable GitOps Remediation](docs/quickstart.md#enable-gitops-remediation) for the full `GitOpsRepository` + `RemediationPolicy` setup. diff --git a/api/v1alpha1/remediationpolicy_types.go b/api/v1alpha1/remediationpolicy_types.go index c716d27..c827303 100644 --- a/api/v1alpha1/remediationpolicy_types.go +++ b/api/v1alpha1/remediationpolicy_types.go @@ -41,7 +41,11 @@ type RemediationPolicySpec struct { // +optional DryRun bool `json:"dryRun,omitempty"` - // maxConcurrentPRs limits the number of open PRs at any time. + // maxConcurrentPRs limits the number of open Zelyo-generated PRs on + // the target repo at any time. Already-open PRs count against the + // budget, so new PRs only open when existing ones merge or close. + // Current count is surfaced on status.openPRs. Multiple + // RemediationPolicies targeting the same repo share this budget. // +kubebuilder:validation:Minimum=1 // +kubebuilder:default=5 // +optional diff --git a/config/crd/bases/zelyo.ai_remediationpolicies.yaml b/config/crd/bases/zelyo.ai_remediationpolicies.yaml index 0dad0da..3adc397 100644 --- a/config/crd/bases/zelyo.ai_remediationpolicies.yaml +++ b/config/crd/bases/zelyo.ai_remediationpolicies.yaml @@ -73,8 +73,12 @@ spec: type: string maxConcurrentPRs: default: 5 - description: maxConcurrentPRs limits the number of open PRs at any - time. + description: |- + maxConcurrentPRs limits the number of open Zelyo-generated PRs on + the target repo at any time. Already-open PRs count against the + budget, so new PRs only open when existing ones merge or close. + Current count is surfaced on status.openPRs. Multiple + RemediationPolicies targeting the same repo share this budget. 
format: int32 minimum: 1 type: integer diff --git a/deploy/helm/zelyo-operator/crds/zelyo.ai_remediationpolicies.yaml b/deploy/helm/zelyo-operator/crds/zelyo.ai_remediationpolicies.yaml index 0dad0da..3adc397 100644 --- a/deploy/helm/zelyo-operator/crds/zelyo.ai_remediationpolicies.yaml +++ b/deploy/helm/zelyo-operator/crds/zelyo.ai_remediationpolicies.yaml @@ -73,8 +73,12 @@ spec: type: string maxConcurrentPRs: default: 5 - description: maxConcurrentPRs limits the number of open PRs at any - time. + description: |- + maxConcurrentPRs limits the number of open Zelyo-generated PRs on + the target repo at any time. Already-open PRs count against the + budget, so new PRs only open when existing ones merge or close. + Current count is surfaced on status.openPRs. Multiple + RemediationPolicies targeting the same repo share this budget. format: int32 minimum: 1 type: integer diff --git a/docs/gitops-onboarding.md b/docs/gitops-onboarding.md index 956db97..d651fc9 100644 --- a/docs/gitops-onboarding.md +++ b/docs/gitops-onboarding.md @@ -181,7 +181,7 @@ metadata: spec: gitOpsRepository: production-manifests # must match a GitOpsRepository CR severityFilter: high # critical | high | medium | low - maxConcurrentPRs: 3 # cap per reconcile cycle (not a global open-PR count) + maxConcurrentPRs: 3 # cap on open Zelyo PRs in the target repo; surfaced on status.openPRs prTemplate: titlePrefix: "[Zelyo Operator]" labels: ["auto-fix", "security"] diff --git a/docs/index.md b/docs/index.md index f588eae..333e336 100644 --- a/docs/index.md +++ b/docs/index.md @@ -116,7 +116,7 @@ Aggregate views, cross-cluster correlation, and centralized policy management ac | Mode | When | Behavior | |---|---|---| | **:material-magnify: Audit Mode** (default) | `ZelyoConfig.spec.mode: audit` | Detects, diagnoses, and sends alerts. The remediation engine runs in `dry-run` — fix plans are logged but no PRs are opened. Zero cluster modifications. 
| -| **:material-shield-check: Protect Mode** | `ZelyoConfig.spec.mode: protect` **and** at least one `RemediationPolicy` targeting a `GitOpsRepository` | Switches the remediation engine to the `gitops-pr` strategy. The `RemediationPolicy` controller drives PR creation — `severityFilter` gates which incidents qualify, `maxConcurrentPRs` caps submissions per reconcile cycle. | +| **:material-shield-check: Protect Mode** | `ZelyoConfig.spec.mode: protect` **and** at least one `RemediationPolicy` targeting a `GitOpsRepository` | Switches the remediation engine to the `gitops-pr` strategy. The `RemediationPolicy` controller drives PR creation — `severityFilter` gates which incidents qualify, `maxConcurrentPRs` caps the number of open Zelyo PRs on the target repo. | !!! note `ZelyoConfig.spec.mode: protect` only flips the engine strategy from `dry-run` to `gitops-pr`. **No PRs are opened until you also create at least one `RemediationPolicy` that points at a `GitOpsRepository`.** See [GitOps Onboarding](gitops-onboarding.md) for the full setup. diff --git a/docs/quickstart.md b/docs/quickstart.md index fba727c..6e7dba1 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -325,7 +325,7 @@ All three pieces are required — skipping any one of them means no PRs: | --- | --- | | `ZelyoConfig.spec.mode: protect` | Flips the remediation engine from `dry-run` to `gitops-pr`. Without this, plans are logged but never submitted. | | `GitOpsRepository` | Tells Zelyo which repo, branch, and paths to write fixes into, and provides Git auth. | -| `RemediationPolicy` | The only controller that calls `GeneratePlan` + `ApplyPlan`. `severityFilter` gates which incidents qualify; `maxConcurrentPRs` caps PR submissions per reconcile cycle (not a global limit on open PRs). | +| `RemediationPolicy` | The only controller that calls `GeneratePlan` + `ApplyPlan`. 
`severityFilter` gates which incidents qualify; `maxConcurrentPRs` caps the number of open Zelyo PRs on the target repo — already-open PRs count against the budget, so new PRs only open when existing ones merge or close. The current count surfaces on `status.openPRs`. | **0. Switch `ZelyoConfig` to Protect mode** (`ZelyoConfig` is cluster-scoped — no `-n` flag): @@ -373,7 +373,7 @@ spec: labels: ["security", "automated"] branchPrefix: "zelyo/fix-" severityFilter: high - maxConcurrentPRs: 3 # per reconcile cycle + maxConcurrentPRs: 3 # caps total open Zelyo PRs on the target repo dryRun: false autoMerge: false ``` diff --git a/internal/controller/remediationpolicy_controller.go b/internal/controller/remediationpolicy_controller.go index 9de585e..e4c115f 100644 --- a/internal/controller/remediationpolicy_controller.go +++ b/internal/controller/remediationpolicy_controller.go @@ -147,9 +147,9 @@ func (r *RemediationPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re } // ── Step 3: Query correlator for open incidents ── - var prsCreated int32 + var prsCreated, openPRs int32 if r.CorrelatorEngine != nil && r.RemediationEngine != nil { - prsCreated = r.processIncidents(ctx, policy, repo) + prsCreated, openPRs = r.processIncidents(ctx, policy, repo) } else { log.Info("Correlator or remediation engine not configured — skipping active remediation") } @@ -159,6 +159,10 @@ func (r *RemediationPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re policy.Status.Phase = zelyov1alpha1.PhaseActive policy.Status.LastRun = &now policy.Status.RemediationsApplied += prsCreated + // OpenPRs reflects the total count of open Zelyo-generated PRs in the + // target repo after this cycle: already-open PRs observed at the start + // plus any this cycle opened. 
+ policy.Status.OpenPRs = openPRs + prsCreated policy.Status.ObservedGeneration = policy.Generation conditions.MarkTrue(&policy.Status.Conditions, zelyov1alpha1.ConditionReady, zelyov1alpha1.ReasonReconcileSuccess, @@ -179,17 +183,34 @@ func (r *RemediationPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re // processIncidents queries the correlator for open incidents, filters by severity, // generates remediation plans, and optionally submits PRs. +// +// Returns (prsCreated, openPRs) where openPRs is the number of Zelyo-generated +// PRs already open on the target repo *at the start of this cycle* — i.e. +// before any PR this cycle may have created. Callers combine them to derive +// status.openPRs. func (r *RemediationPolicyReconciler) processIncidents( ctx context.Context, policy *zelyov1alpha1.RemediationPolicy, repo *zelyov1alpha1.GitOpsRepository, -) int32 { +) (prsCreated, openPRs int32) { log := logf.FromContext(ctx) + // Parse repo owner/name from URL up front — it's used by the no-incidents + // short-circuit (for the openPRs snapshot) and by the main loop. + repoOwner, repoName := parseRepoURL(repo.Spec.URL) + + // ── Step 3: Initialize GitOps Engine from Secret ── + // Done before the no-incidents branch too, so even an idle policy with + // only a ListOpenPRs probe can use credentials from its repo's secret. + r.ensureGitOpsEngineFromSecret(ctx, repo, repoOwner, repoName) + incidents := r.CorrelatorEngine.GetOpenIncidents() if len(incidents) == 0 { log.Info("No open incidents found — nothing to remediate") - return 0 + // Even with no incidents, surface the current open-PR count to + // status so users can see it via `kubectl get remediationpolicy`. 
+ openPRs, _ = r.snapshotOpenPRs(ctx, repoOwner, repoName) + return 0, openPRs } log.Info("Found open incidents", "count", len(incidents)) @@ -219,46 +240,50 @@ func (r *RemediationPolicyReconciler) processIncidents( } minSev := severityOrder[severityFilter] - // ── Step 3: Initialize GitOps Engine from Secret ── - r.initRemediationGitOps(ctx, repo) - // Respect MaxConcurrentPRs limit. maxPRs := policy.Spec.MaxConcurrentPRs if maxPRs == 0 { maxPRs = 5 } - // Parse repo owner/name from URL for PR submission. - repoOwner, repoName := parseRepoURL(repo.Spec.URL) + // Snapshot open Zelyo-generated PRs once. Feeds two concerns: + // - openPRs count → enforces the maxConcurrentPRs cap across reconciles + // (the headline fix of this PR) + // - existingBranches map → per-finding dedup in remediateIncident so we + // never push a second commit/PR against a branch that's already open + // (the fix originally added in #91) + openPRs, existingBranches := r.snapshotOpenPRs(ctx, repoOwner, repoName) + budget := maxPRs - openPRs + if budget <= 0 { + log.Info("MaxConcurrentPRs budget exhausted by already-open PRs — skipping", + "limit", maxPRs, "openPRs", openPRs) + return 0, openPRs + } - // One-shot dedup: snapshot the set of branches already backing an open - // Zelyo-authored PR, so we don't open a second PR for a finding whose - // last reconcile already produced one. This is the fix for "excessive - // PRs" — without it, every reconcile tick regenerated incidents for the - // same unfixed resource and opened another PR. - existingBranches := r.snapshotOpenPRBranches(ctx, repoOwner, repoName) - - // prsCreated counts real PRs opened this cycle and drives the status - // counter. processed counts every incident that consumed an LLM plan - // generation — whether the outcome was a new PR or a dryRun preview. 
- // The per-cycle budget must bound BOTH paths: without this, a policy - // with N open incidents and dryRun=true would fire N LLM calls per - // reconcile regardless of maxConcurrentPRs, burning tokens and pushing - // the reconcile toward its timeout. - var prsCreated, processed int32 + // Two counters: + // - processed drives the per-cycle budget — ticks for every incident + // that consumed an LLM plan generation, whether the outcome was a + // real PR or a dryRun preview. This bounds BOTH token cost and + // reconcile duration regardless of strategy. + // - prsCreated only ticks when a real PR was opened (result != nil), + // so status.openPRs stays accurate in audit / dryRun / report modes + // where ApplyPlan returns nil. + var processed int32 for _, incident := range incidents { - if processed >= maxPRs { - log.Info("MaxConcurrentPRs limit reached", "limit", maxPRs, "dryRun", policy.Spec.DryRun) + if processed >= budget { + log.Info("MaxConcurrentPRs budget reached this cycle", + "limit", maxPRs, "openPRs", openPRs, "createdThisCycle", prsCreated, + "dryRun", policy.Spec.DryRun) break } // Scope gate: when spec.targetPolicies is set, only remediate // incidents carrying at least one event from a listed // SecurityPolicy. Checked in the loop (not inside // remediateIncident) so filtered incidents are skipped without - // paying the dedup/ListOpenPRs cost and without ever calling + // paying the dedup cost and without ever calling // ResolveIncident on them — another RemediationPolicy may own - // that scope. Also not charged against MaxConcurrentPRs since - // no LLM call is made. + // that scope. Not charged against the budget since no LLM call + // is made. 
if targetSet != nil && !incidentMatchesTargets(incident, targetSet) { continue } @@ -272,24 +297,37 @@ func (r *RemediationPolicyReconciler) processIncidents( } } - return prsCreated + return prsCreated, openPRs } -// initRemediationGitOps resolves the GitOpsRepository's auth Secret and -// wires a PAT-backed GitHub engine into the remediation engine. A missing -// Secret, missing token, or missing AuthSecret field is silently skipped -// — the remediation engine then has no way to open PRs but the controller -// is still useful for dry-run/report strategies. Any hard failure is -// non-fatal for reconcile, so this returns nothing. Extracted from -// processIncidents to keep that function within the gocyclo budget. -func (r *RemediationPolicyReconciler) initRemediationGitOps( +// ensureGitOpsEngineFromSecret reads the repo's AuthSecret (if any) and, +// when a usable PAT/app token is present, builds a GitHub engine scoped to +// owner/name and registers it via RegisterGitOpsEngine. Using the repo-keyed +// registry (rather than SetGitOpsEngine, which mutates a process-wide +// fallback) keeps concurrent reconciles of RemediationPolicies targeting +// different repos from clobbering each other's credentials. +// +// The function is deliberately permissive: a missing secret, unreadable +// secret, or empty token silently leaves whatever engine is registered for +// this repo (including injected test engines) — there is no visible error +// condition because the surrounding reconciler handles missing creds by +// degrading gracefully to no-op remediation. +func (r *RemediationPolicyReconciler) ensureGitOpsEngineFromSecret( ctx context.Context, repo *zelyov1alpha1.GitOpsRepository, + owner, name string, ) { - log := logf.FromContext(ctx) if repo.Spec.AuthSecret == "" { return } + if owner == "" || name == "" { + // Cannot register per-repo without a key. 
Rather than fall back to + // SetGitOpsEngine (which would reintroduce the cross-reconcile + // race), leave the registry untouched and let the caller operate + // against whatever default was wired at engine construction. + return + } + log := logf.FromContext(ctx) secret := &corev1.Secret{} secretKey := types.NamespacedName{Name: repo.Spec.AuthSecret, Namespace: repo.Namespace} if err := r.Get(ctx, secretKey, secret); err != nil { @@ -304,45 +342,72 @@ func (r *RemediationPolicyReconciler) initRemediationGitOps( } ghClient := github.NewPATClient(token, "") ghEngine := github.NewEngine(ghClient, log.WithName("github-engine")) - r.RemediationEngine.SetGitOpsEngine(ghEngine) - log.Info("Successfully initialized GitOps engine for remediation", "repo", repo.Name) + r.RemediationEngine.RegisterGitOpsEngine(owner+"/"+name, ghEngine) + log.Info("Registered GitOps engine for remediation", "repo", repo.Name, "key", owner+"/"+name) } -// snapshotOpenPRBranches returns a branch → PR-URL map for the currently -// open, Zelyo-authored PRs against the given repo. A nil gitops engine, -// or a ListOpenPRs failure, yields an empty map (dedup degrades to -// no-dedup — the caller will open PRs even if duplicates already exist). -// Extracted from processIncidents to keep that function within the -// gocyclo budget. -func (r *RemediationPolicyReconciler) snapshotOpenPRBranches( +// snapshotOpenPRs queries the configured GitOps provider once for +// currently-open Zelyo-generated PRs on owner/repo and returns both views +// the caller needs: +// - count (int32): feeds the maxConcurrentPRs cap so it's honored across +// reconciles, not just within a single cycle. +// - branches (map[branch]URL): feeds remediateIncident's dedup check so we +// never push a second PR against a branch that already has one open. +// +// One ListOpenPRs call feeds both — they're inherently the same query. +// +// Errors are logged and treated as (0, empty map). Rationale for soft-fail +// (vs. 
returning an error to trigger requeue): the per-cycle loop bound +// already caps blast radius even when the snapshot is empty — we open at +// most maxPRs PRs per 5-minute cycle, not unbounded. A requeue storm across +// every RemediationPolicy during a GitHub outage would churn metrics and +// events without opening any PRs — net worse UX than soft-fail. +// +// When multiple RemediationPolicies target the same repo, they share the +// open-PR count (the cap is applied per repo, not per policy). Per-policy +// scoping requires PRTemplate.BranchPrefix to be both configurable and +// propagated into the branch name; BranchName currently hardcodes its +// prefix, so adding a prefix filter here would silently match zero PRs +// under the default config and re-break the cap we are fixing. +func (r *RemediationPolicyReconciler) snapshotOpenPRs( ctx context.Context, - repoOwner, repoName string, -) map[string]string { + owner, repo string, +) (count int32, branchesByName map[string]string) { log := logf.FromContext(ctx) - existing := map[string]string{} - ge := r.RemediationEngine.GitOpsEngineForRepo(repoOwner, repoName) + branchesByName = map[string]string{} + + if owner == "" || repo == "" || r.RemediationEngine == nil { + return 0, branchesByName + } + ge := r.RemediationEngine.GitOpsEngineForRepo(owner, repo) if ge == nil { - return existing + return 0, branchesByName } - open, err := ge.ListOpenPRs(ctx, repoOwner, repoName) + + existing, err := ge.ListOpenPRs(ctx, owner, repo) if err != nil { - log.Info("PR dedup skipped: ListOpenPRs failed", "error", err.Error()) - return existing + log.Error(err, "Failed to list open PRs — cap treats as zero, dedup disabled", + "owner", owner, "repo", repo) + return 0, branchesByName } - for _, pr := range open { - existing[pr.Branch] = pr.URL + for _, pr := range existing { + branchesByName[pr.Branch] = pr.URL } - return existing + //nolint:gosec // count bounded by ListOpenPRs pagination cap (1000 PRs). 
+ return int32(len(existing)), branchesByName } -// remediateIncident handles the full severity-check → dedup → -// GeneratePlan → (dry-run preview | ApplyPlan) → resolve flow for a single -// incident. Factored out of processIncidents to keep each unit under the -// gocyclo threshold. The targetPolicies scope gate is applied by the -// caller (processIncidents), not here. +// remediateIncident handles the full severity-check → dedup → GeneratePlan +// → (dry-run preview | ApplyPlan) → resolve flow for a single incident. +// Factored out of processIncidents to keep each unit under the gocyclo +// threshold. The targetPolicies scope gate is applied by the caller +// (processIncidents), not here. // // Returns two flags so the caller can drive independent counters: -// - opened: a real PR was created (counts against status.RemediationsApplied) +// - opened: a real PR was created (counts against status.RemediationsApplied +// and status.OpenPRs). Only true when ApplyPlan returned a non-nil result +// — covers the engine-level StrategyDryRun / StrategyReport case where +// ApplyPlan returns (nil, nil) and we must NOT report a phantom PR. // - charged: this incident consumed an LLM plan generation (counts against // the per-cycle MaxConcurrentPRs budget — covers both real PRs and // dryRun previews, but NOT incidents skipped by severity or dedup since @@ -358,7 +423,7 @@ func (r *RemediationPolicyReconciler) remediateIncident( ) (opened, charged bool) { log := logf.FromContext(ctx) - // Severity filter. + // Severity filter — fast path, no cost, no budget consumption. incSev, ok := severityOrder[incident.Severity] if !ok || incSev > minSev { return false, false @@ -368,7 +433,7 @@ func (r *RemediationPolicyReconciler) remediateIncident( // Dedup: compute the branch name the PR would land on and skip if a // PR is already open for it. The remediation engine uses the same - // BranchName helper so the keys line up. + // BranchName helper so the keys line up. 
Fast path, no budget cost. branch := gitops.BranchName(finding.ResourceName, finding.ResourceNamespace, finding.Title) if existingURL, exists := existingBranches[branch]; exists { log.Info("Skipping remediation — open PR already exists", @@ -433,7 +498,13 @@ func (r *RemediationPolicyReconciler) remediateIncident( } r.CorrelatorEngine.ResolveIncident(incident.ID) - return true, true + // result==nil happens when the engine is in StrategyDryRun / StrategyReport + // (set by ZelyoConfig.spec.mode=audit, distinct from policy.Spec.DryRun). + // In that case: LLM was called (charged=true) but no PR was opened + // (opened=false). Guards Codex P2 — previously the success path + // returned (true, true) unconditionally and inflated status.openPRs + // under audit mode. + return result != nil, true } // incidentMatchesTargets reports whether the incident carries at least one diff --git a/internal/controller/remediationpolicy_controller_test.go b/internal/controller/remediationpolicy_controller_test.go index abc24a7..2e71274 100644 --- a/internal/controller/remediationpolicy_controller_test.go +++ b/internal/controller/remediationpolicy_controller_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -39,6 +40,57 @@ import ( "github.com/zelyo-ai/zelyo-operator/internal/remediation" ) +// budgetTestGitOpsEngine is a stub gitops.Engine for the maxConcurrentPRs +// budget + audit-mode tests. ListOpenPRs returns a fixed list to simulate +// PRs already open on the provider. CreatePullRequest records each call so +// tests can assert a dry-run / cap-exhausted path never calls it. 
+type budgetTestGitOpsEngine struct { + openPRs []gitops.PullRequestResult + createCalls atomic.Int32 +} + +func (f *budgetTestGitOpsEngine) CreatePullRequest(_ context.Context, pr *gitops.PullRequest) (*gitops.PullRequestResult, error) { + f.createCalls.Add(1) + return &gitops.PullRequestResult{ + Number: int(f.createCalls.Load()), + URL: "https://github.com/fake/repo/pull/" + pr.HeadBranch, + Branch: pr.HeadBranch, + CreatedAt: time.Now(), + }, nil +} + +func (f *budgetTestGitOpsEngine) GetFile(_ context.Context, _, _, _, _ string) ([]byte, error) { + return nil, nil +} + +func (f *budgetTestGitOpsEngine) ListOpenPRs(_ context.Context, _, _ string) ([]gitops.PullRequestResult, error) { + return f.openPRs, nil +} + +func (f *budgetTestGitOpsEngine) Close() error { return nil } + +// budgetTestLLMClient is a fake LLM used by the budget + audit-mode tests. +// The budget-exhausted path must not reach plan generation — the default +// (empty) response is zero-fixes JSON, which GeneratePlan rejects before +// ApplyPlan runs, so any bump to `calls` in that path exposes the +// regression. Tests that need a successful plan (e.g. audit-mode) set +// `response` to a valid-fixes JSON string. +type budgetTestLLMClient struct { + response string + calls atomic.Int32 +} + +func (f *budgetTestLLMClient) Complete(_ context.Context, _ llm.Request) (*llm.Response, error) { + f.calls.Add(1) + body := f.response + if body == "" { + body = `{"analysis":"x","fixes":[]}` + } + return &llm.Response{Content: body, Model: "fake"}, nil +} +func (f *budgetTestLLMClient) Provider() llm.Provider { return "fake" } +func (f *budgetTestLLMClient) Close() error { return nil } + var _ = Describe("RemediationPolicy Controller", func() { Context("When reconciling a resource", func() { const resourceName = "test-resource" @@ -348,6 +400,248 @@ var _ = Describe("RemediationPolicy Controller", func() { Expect(planFailures).To(HaveLen(2)) }) }) + + // Regression guard for the maxConcurrentPRs cap. 
Historically the cap + // was enforced as a per-reconcile-cycle bound only: prsCreated started + // at 0 every cycle and the loop broke when that counter hit the cap. + // With a 5-minute requeue and the same budget each cycle, a policy + // with maxConcurrentPRs: 3 could accumulate dozens of open PRs. + // + // Fix: count already-open PRs on the provider at the start of each + // cycle and subtract from the cap to get the per-cycle budget. When + // the provider already has `maxConcurrentPRs` open, no new PRs should + // be created until existing ones merge or close. + Context("When maxConcurrentPRs is already reached by open PRs", func() { + const ( + policyName = "budget-test-policy" + repoName = "budget-test-repo" + ns = "default" + ) + + ctx := context.Background() + policyKey := types.NamespacedName{Name: policyName, Namespace: ns} + repoKey := types.NamespacedName{Name: repoName, Namespace: ns} + + BeforeEach(func() { + By("creating a GitOpsRepository for the policy to target") + // AuthSecret references a Secret that does not exist. The + // controller tolerates the missing secret (silently skips + // GitOps-engine initialization from the Secret) which leaves + // our pre-registered fake gitops engine in place on the + // remediation engine — exactly what this test needs. 
+ repo := &zelyov1alpha1.GitOpsRepository{ + ObjectMeta: metav1.ObjectMeta{Name: repoName, Namespace: ns}, + Spec: zelyov1alpha1.GitOpsRepositorySpec{ + URL: "https://github.com/zelyo-ai/budget-test.git", + Branch: "main", + Paths: []string{"."}, + Provider: "github", + AuthSecret: "nonexistent-secret", + }, + } + Expect(k8sClient.Create(ctx, repo)).To(Succeed()) + + By("creating a RemediationPolicy with maxConcurrentPRs=3") + policy := &zelyov1alpha1.RemediationPolicy{ + ObjectMeta: metav1.ObjectMeta{Name: policyName, Namespace: ns}, + Spec: zelyov1alpha1.RemediationPolicySpec{ + GitOpsRepository: repoName, + MaxConcurrentPRs: 3, + SeverityFilter: "high", + }, + } + Expect(k8sClient.Create(ctx, policy)).To(Succeed()) + }) + + AfterEach(func() { + policy := &zelyov1alpha1.RemediationPolicy{} + if err := k8sClient.Get(ctx, policyKey, policy); err == nil { + Expect(k8sClient.Delete(ctx, policy)).To(Succeed()) + } + repo := &zelyov1alpha1.GitOpsRepository{} + if err := k8sClient.Get(ctx, repoKey, repo); err == nil { + Expect(k8sClient.Delete(ctx, repo)).To(Succeed()) + } + }) + + It("does not create new PRs and records openPRs in status", func() { + By("priming the correlator with two events that form a high-severity incident") + corrEngine := correlator.NewEngine(&correlator.Config{CorrelationWindow: 5 * time.Minute}) + corrEngine.Ingest(&correlator.Event{ + Type: correlator.EventSecurityViolation, + Severity: "high", + Namespace: "prod", + Resource: "nginx", + Message: "Privileged container", + }) + incident := corrEngine.Ingest(&correlator.Event{ + Type: correlator.EventAnomaly, + Severity: "high", + Namespace: "prod", + Resource: "nginx", + Message: "Restart spike", + }) + Expect(incident).NotTo(BeNil(), + "correlator must surface an open incident so the controller enters the budget check") + + By("wiring a fake gitops engine that reports 3 already-open PRs") + fakeGit := &budgetTestGitOpsEngine{ + openPRs: []gitops.PullRequestResult{ + {Number: 11, Branch: 
"zelyo-operator/fix/a", URL: "https://example/1"}, + {Number: 12, Branch: "zelyo-operator/fix/b", URL: "https://example/2"}, + {Number: 13, Branch: "zelyo-operator/fix/c", URL: "https://example/3"}, + }, + } + fakeLLM := &budgetTestLLMClient{} + remEngine := remediation.NewEngine(fakeLLM, fakeGit, + remediation.EngineConfig{Strategy: remediation.StrategyGitOpsPR}, + logr.Discard()) + + controllerReconciler := &RemediationPolicyReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: record.NewFakeRecorder(100), + CorrelatorEngine: corrEngine, + RemediationEngine: remEngine, + } + + By("reconciling the policy") + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: policyKey}) + Expect(err).NotTo(HaveOccurred()) + + By("asserting no new PRs were created") + Expect(fakeGit.createCalls.Load()).To(Equal(int32(0)), + "CreatePullRequest must not run when the cap is already met by open PRs") + Expect(fakeLLM.calls.Load()).To(Equal(int32(0)), + "LLM plan generation must be skipped when the cap is already met") + + By("asserting the open incident was not resolved") + Expect(corrEngine.GetOpenIncidents()).To(HaveLen(1), + "the incident must remain open for a later cycle after existing PRs merge") + + By("asserting status.openPRs reflects the provider count") + var updated zelyov1alpha1.RemediationPolicy + Expect(k8sClient.Get(ctx, policyKey, &updated)).To(Succeed()) + Expect(updated.Status.OpenPRs).To(Equal(int32(3))) + Expect(updated.Status.RemediationsApplied).To(Equal(int32(0))) + }) + }) + + // Regression guard for audit-mode status accounting. In StrategyDryRun + // (and StrategyReport), remediation.Engine.ApplyPlan returns (nil, nil) + // — an incident is "charged" for budget purposes but no actual PR is + // created. Previously remediateIncident returned (true, true) on the + // success path unconditionally, so `status.openPRs = openPRs + + // prsCreated` reported phantom PRs under audit mode. 
This test pins + // the fix: status.openPRs stays at the provider count and + // status.remediationsApplied does not change, even though the LLM ran. + // + // NOTE: exercises the engine-level StrategyDryRun path (set by + // ZelyoConfig.spec.mode=audit), NOT the per-policy policy.Spec.DryRun + // path covered by the "Dry-Run against a fake GitOps engine" suite — + // both paths must produce the same accurate status. + Context("When the remediation engine strategy is StrategyDryRun (audit mode)", func() { + const ( + policyName = "audit-test-policy" + repoName = "audit-test-repo" + ns = "default" + ) + ctx := context.Background() + policyKey := types.NamespacedName{Name: policyName, Namespace: ns} + repoKey := types.NamespacedName{Name: repoName, Namespace: ns} + + BeforeEach(func() { + Expect(k8sClient.Create(ctx, &zelyov1alpha1.GitOpsRepository{ + ObjectMeta: metav1.ObjectMeta{Name: repoName, Namespace: ns}, + Spec: zelyov1alpha1.GitOpsRepositorySpec{ + URL: "https://github.com/zelyo-ai/audit-test.git", + Branch: "main", + Paths: []string{"."}, + Provider: "github", + AuthSecret: "nonexistent-secret", + }, + })).To(Succeed()) + Expect(k8sClient.Create(ctx, &zelyov1alpha1.RemediationPolicy{ + ObjectMeta: metav1.ObjectMeta{Name: policyName, Namespace: ns}, + Spec: zelyov1alpha1.RemediationPolicySpec{ + GitOpsRepository: repoName, + MaxConcurrentPRs: 5, + SeverityFilter: "high", + // policy.Spec.DryRun intentionally false — we are + // testing the engine-level strategy, not the per-policy + // dry-run switch. 
+ }, + })).To(Succeed()) + }) + + AfterEach(func() { + policy := &zelyov1alpha1.RemediationPolicy{} + if err := k8sClient.Get(ctx, policyKey, policy); err == nil { + Expect(k8sClient.Delete(ctx, policy)).To(Succeed()) + } + repo := &zelyov1alpha1.GitOpsRepository{} + if err := k8sClient.Get(ctx, repoKey, repo); err == nil { + Expect(k8sClient.Delete(ctx, repo)).To(Succeed()) + } + }) + + It("does not inflate status.openPRs or status.remediationsApplied", func() { + // One high-severity incident so processIncidents enters the loop. + corrEngine := correlator.NewEngine(&correlator.Config{CorrelationWindow: 5 * time.Minute}) + corrEngine.Ingest(&correlator.Event{ + Type: correlator.EventSecurityViolation, Severity: "high", + Namespace: "prod", Resource: "payments", Message: "privileged container", + }) + incident := corrEngine.Ingest(&correlator.Event{ + Type: correlator.EventAnomaly, Severity: "high", + Namespace: "prod", Resource: "payments", Message: "restart spike", + }) + Expect(incident).NotTo(BeNil()) + + // Provider has 1 unrelated Zelyo PR already open. After + // reconcile, status.openPRs MUST stay at 1 — not 2 — because + // audit mode doesn't open a real PR even though ApplyPlan + // returned nil-error. + fakeGit := &budgetTestGitOpsEngine{ + openPRs: []gitops.PullRequestResult{ + {Number: 1, Branch: "zelyo-operator/fix/preexisting", URL: "https://example/1"}, + }, + } + // LLM returns a valid plan so GeneratePlan succeeds and we + // reach ApplyPlan (which short-circuits in dry-run strategy). 
+ fakeLLM := &budgetTestLLMClient{ + response: `{"analysis":"x","fixes":[{"file_path":"k8s/a.yaml","description":"d","patch":"apiVersion: v1","operation":"update"}]}`, + } + remEngine := remediation.NewEngine(fakeLLM, fakeGit, + remediation.EngineConfig{Strategy: remediation.StrategyDryRun}, + logr.Discard()) + + reconciler := &RemediationPolicyReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: record.NewFakeRecorder(100), + CorrelatorEngine: corrEngine, + RemediationEngine: remEngine, + } + + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: policyKey}) + Expect(err).NotTo(HaveOccurred()) + + By("asserting the LLM ran (incident was charged) but no real PR was created") + Expect(fakeLLM.calls.Load()).To(Equal(int32(1)), + "audit mode must still generate the plan so operators see the analysis") + Expect(fakeGit.createCalls.Load()).To(Equal(int32(0)), + "audit mode must NOT call CreatePullRequest") + + By("asserting status.openPRs reflects provider count only — no phantom PR") + var updated zelyov1alpha1.RemediationPolicy + Expect(k8sClient.Get(ctx, policyKey, &updated)).To(Succeed()) + Expect(updated.Status.OpenPRs).To(Equal(int32(1)), + "must equal the 1 pre-existing PR; audit mode must not inflate to 2") + Expect(updated.Status.RemediationsApplied).To(Equal(int32(0)), + "audit mode must not increment the lifetime PR counter") + }) + }) }) // drainEventsContaining non-destructively reads everything currently buffered diff --git a/internal/github/engine.go b/internal/github/engine.go index 34fcf67..98065a7 100644 --- a/internal/github/engine.go +++ b/internal/github/engine.go @@ -147,55 +147,78 @@ func (e *GitHubEngine) GetFile(ctx context.Context, owner, repo, path, ref strin return decoded[:n], nil } -// ListOpenPRs implements gitops.Engine.ListOpenPRs. +// ListOpenPRs implements gitops.Engine.ListOpenPRs by paginating over +// GitHub's list-PRs endpoint. 
A single page is 100 PRs (the endpoint's max); +// we keep requesting successive pages until a page comes back short or we +// hit listOpenPRsMaxPages. The cap guards against runaway calls if the +// provider ever misbehaves — at 100/page × 10 pages, callers see up to +// 1000 open PRs before the count saturates, which is far beyond any sane +// MaxConcurrentPRs configuration. func (e *GitHubEngine) ListOpenPRs(ctx context.Context, owner, repo string) ([]gitops.PullRequestResult, error) { - reqURL := fmt.Sprintf("%s/repos/%s/%s/pulls?state=open&per_page=100", e.baseURL, owner, repo) + var results []gitops.PullRequestResult + for page := 1; page <= listOpenPRsMaxPages; page++ { + reqURL := fmt.Sprintf("%s/repos/%s/%s/pulls?state=open&per_page=%d&page=%d", + e.baseURL, owner, repo, listOpenPRsPageSize, page) - body, err := e.doRequest(ctx, http.MethodGet, reqURL, nil) - if err != nil { - return nil, fmt.Errorf("listing open PRs: %w", err) - } + body, err := e.doRequest(ctx, http.MethodGet, reqURL, nil) + if err != nil { + return nil, fmt.Errorf("listing open PRs (page %d): %w", page, err) + } - var prs []struct { - Number int `json:"number"` - HTMLURL string `json:"html_url"` - Head struct { - Ref string `json:"ref"` - } `json:"head"` - CreatedAt time.Time `json:"created_at"` - Labels []struct { - Name string `json:"name"` - } `json:"labels"` - } - if err := json.Unmarshal(body, &prs); err != nil { - return nil, fmt.Errorf("decoding PRs response: %w", err) - } + var prs []struct { + Number int `json:"number"` + HTMLURL string `json:"html_url"` + Head struct { + Ref string `json:"ref"` + } `json:"head"` + CreatedAt time.Time `json:"created_at"` + Labels []struct { + Name string `json:"name"` + } `json:"labels"` + } + if err := json.Unmarshal(body, &prs); err != nil { + return nil, fmt.Errorf("decoding PRs response (page %d): %w", page, err) + } - // Filter to Zelyo Operator-created PRs. 
- var results []gitops.PullRequestResult - for _, pr := range prs { - isZelyoOperator := strings.HasPrefix(pr.Head.Ref, "zelyo-operator/") - if !isZelyoOperator { - for _, l := range pr.Labels { - if l.Name == "zelyo-operator" { - isZelyoOperator = true - break + // Filter to Zelyo Operator-created PRs. + for _, pr := range prs { + isZelyoOperator := strings.HasPrefix(pr.Head.Ref, "zelyo-operator/") + if !isZelyoOperator { + for _, l := range pr.Labels { + if l.Name == "zelyo-operator" { + isZelyoOperator = true + break + } } } + if isZelyoOperator { + results = append(results, gitops.PullRequestResult{ + Number: pr.Number, + URL: pr.HTMLURL, + Branch: pr.Head.Ref, + CreatedAt: pr.CreatedAt, + }) + } } - if isZelyoOperator { - results = append(results, gitops.PullRequestResult{ - Number: pr.Number, - URL: pr.HTMLURL, - Branch: pr.Head.Ref, - CreatedAt: pr.CreatedAt, - }) + + // Short page → we've drained results. Full page → try the next one. + if len(prs) < listOpenPRsPageSize { + return results, nil + } + if page == listOpenPRsMaxPages { + e.log.Info("ListOpenPRs page cap reached — count may be an undercount", + "owner", owner, "repo", repo, + "maxPages", listOpenPRsMaxPages, "pageSize", listOpenPRsPageSize) } } - return results, nil } +const ( + listOpenPRsPageSize = 100 // GitHub's max page size for list-PRs. + listOpenPRsMaxPages = 10 // Safety cap: 100 × 10 = 1000 PRs. +) + // Close implements gitops.Engine.Close. func (e *GitHubEngine) Close() error { e.http.CloseIdleConnections() diff --git a/internal/github/engine_test.go b/internal/github/engine_test.go index 8ac910b..54997d2 100644 --- a/internal/github/engine_test.go +++ b/internal/github/engine_test.go @@ -154,6 +154,112 @@ func TestGitHubEngine_ListOpenPRs(t *testing.T) { } } +// TestGitHubEngine_ListOpenPRs_Paginates asserts that ListOpenPRs walks +// successive pages until a short page arrives. 
Without pagination, the +// maxConcurrentPRs cap silently under-counts once a repo has >100 open +// Zelyo PRs — letting the controller exceed the cap. This guards against +// regressing to single-page behavior. +func TestGitHubEngine_ListOpenPRs_Paginates(t *testing.T) { + mux := http.NewServeMux() + var requests []string + + // Pages: 100, 100, 37. 237 PRs total, all Zelyo-branched. + pages := [][]map[string]interface{}{ + makePRPage(1, 100), + makePRPage(101, 100), + makePRPage(201, 37), + } + + mux.HandleFunc("GET /repos/o/r/pulls", func(w http.ResponseWriter, req *http.Request) { + requests = append(requests, req.URL.RawQuery) + pageParam := req.URL.Query().Get("page") + idx := 0 + if pageParam != "" { + fmt.Sscanf(pageParam, "%d", &idx) //nolint:errcheck + idx-- + } + if idx < 0 || idx >= len(pages) { + _ = json.NewEncoder(w).Encode([]map[string]interface{}{}) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(pages[idx]) + }) + + server := httptest.NewServer(mux) + defer server.Close() + + engine := &GitHubEngine{ + http: server.Client(), + log: logr.Discard(), + baseURL: server.URL, + } + + prs, err := engine.ListOpenPRs(context.Background(), "o", "r") + if err != nil { + t.Fatalf("ListOpenPRs failed: %v", err) + } + if len(prs) != 237 { + t.Fatalf("expected 237 PRs after paginating 3 pages, got %d", len(prs)) + } + if got, want := len(requests), 3; got != want { + t.Errorf("expected %d HTTP calls (one per page), got %d (%v)", want, got, requests) + } +} + +// TestGitHubEngine_ListOpenPRs_PageCap asserts we stop after the safety +// cap when the provider keeps returning full pages forever. Without the +// cap a misbehaving provider could hang the controller indefinitely. 
+func TestGitHubEngine_ListOpenPRs_PageCap(t *testing.T) { + mux := http.NewServeMux() + var requests int + + mux.HandleFunc("GET /repos/o/r/pulls", func(w http.ResponseWriter, req *http.Request) { + requests++ + page := 1 + if p := req.URL.Query().Get("page"); p != "" { + fmt.Sscanf(p, "%d", &page) //nolint:errcheck + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(makePRPage((page-1)*100+1, 100)) + }) + + server := httptest.NewServer(mux) + defer server.Close() + + engine := &GitHubEngine{ + http: server.Client(), + log: logr.Discard(), + baseURL: server.URL, + } + + prs, err := engine.ListOpenPRs(context.Background(), "o", "r") + if err != nil { + t.Fatalf("ListOpenPRs failed: %v", err) + } + if requests != listOpenPRsMaxPages { + t.Errorf("expected exactly %d HTTP calls at the page cap, got %d", listOpenPRsMaxPages, requests) + } + if got, want := len(prs), listOpenPRsMaxPages*listOpenPRsPageSize; got != want { + t.Errorf("expected %d PRs at the page cap, got %d", want, got) + } +} + +func makePRPage(startNum, count int) []map[string]interface{} { + out := make([]map[string]interface{}, 0, count) + for i := 0; i < count; i++ { + n := startNum + i + out = append(out, map[string]interface{}{ + "number": n, + "html_url": fmt.Sprintf("https://github.com/o/r/pull/%d", n), + "head": map[string]string{"ref": fmt.Sprintf("zelyo-operator/fix-%d", n)}, + "created_at": time.Now().Format(time.RFC3339), + "labels": []map[string]string{}, + }) + } + return out +} + func TestGitHubEngine_GetFile(t *testing.T) { mux := http.NewServeMux() mux.HandleFunc("GET /repos/testowner/testrepo/contents/k8s/deployment.yaml", func(w http.ResponseWriter, _ *http.Request) { diff --git a/internal/remediation/engine.go b/internal/remediation/engine.go index 151abee..46e32f5 100644 --- a/internal/remediation/engine.go +++ b/internal/remediation/engine.go @@ -127,9 +127,11 @@ func (e *Engine) getGitOpsEngine(owner, repo string) gitops.Engine { return 
e.gitopsEngine } -// GitOpsEngineForRepo is the public accessor callers (RemediationPolicy -// controller) use to consult the gitops engine for read-only calls like -// ListOpenPRs — needed for PR dedup before ApplyPlan. +// GitOpsEngineForRepo is the public accessor the RemediationPolicy controller +// uses to consult the gitops engine for read-only calls (ListOpenPRs) — needed +// both for PR dedup before ApplyPlan and for counting open PRs toward the +// maxConcurrentPRs cap. Falls back to the default engine when no repo-specific +// engine is registered; returns nil when nothing is configured. func (e *Engine) GitOpsEngineForRepo(owner, repo string) gitops.Engine { return e.getGitOpsEngine(owner, repo) }