diff --git a/pkg/apis/api.kusion.io/v1/types.go b/pkg/apis/api.kusion.io/v1/types.go index 34b3d257d..cc541298c 100644 --- a/pkg/apis/api.kusion.io/v1/types.go +++ b/pkg/apis/api.kusion.io/v1/types.go @@ -741,6 +741,9 @@ const ( // ReleasePhaseApplying indicates the stage of applying. ReleasePhaseApplying ReleasePhase = "applying" + // ReleasePhaseRollbacking indicates the stage of rollbacking. + ReleasePhaseRollbacking ReleasePhase = "rollbacking" + // ReleasePhaseDestroying indicates the stage of destroying. ReleasePhaseDestroying ReleasePhase = "destroying" diff --git a/pkg/cmd/apply/apply.go b/pkg/cmd/apply/apply.go index 4a87ebd65..683ea7cc1 100644 --- a/pkg/cmd/apply/apply.go +++ b/pkg/cmd/apply/apply.go @@ -15,45 +15,27 @@ package apply import ( - "bytes" - "context" "errors" "fmt" - "io" "os" "path/filepath" - "reflect" "strings" "sync" - "time" - "github.com/liu-hm19/pterm" + "kusionstack.io/kusion/pkg/engine/apply" + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/templates" - "gopkg.in/yaml.v3" - apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" - v1 "kusionstack.io/kusion/pkg/apis/status/v1" "kusionstack.io/kusion/pkg/cmd/generate" "kusionstack.io/kusion/pkg/cmd/preview" cmdutil "kusionstack.io/kusion/pkg/cmd/util" - "kusionstack.io/kusion/pkg/engine" - "kusionstack.io/kusion/pkg/engine/operation" - "kusionstack.io/kusion/pkg/engine/operation/models" - "kusionstack.io/kusion/pkg/engine/printers" "kusionstack.io/kusion/pkg/engine/release" - "kusionstack.io/kusion/pkg/engine/resource/graph" - "kusionstack.io/kusion/pkg/engine/runtime" - runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" - "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/pkg/util/i18n" - "kusionstack.io/kusion/pkg/util/kcl" "kusionstack.io/kusion/pkg/util/pretty" - "kusionstack.io/kusion/pkg/util/signal" "kusionstack.io/kusion/pkg/util/terminal" ) @@ -91,20 +73,6 @@ var ( kusion apply --port-forward=8080`) ) -// To handle the release phase update when panic occurs. -// Fixme: adopt a more centralized approach to manage the release update before exiting, instead of -// scattering them across different go-routines. -var ( - rel *apiv1.Release - gph *apiv1.Graph - relLock = &sync.Mutex{} - releaseCreated = false - releaseStorage release.Storage - portForwarded = false -) - -var errExit = errors.New("receive SIGTERM or SIGINT, exit cmd") - // ApplyFlags directly reflect the information that CLI is gathering via flags. They will be converted to // ApplyOptions, which reflect the runtime requirements for the command. // @@ -121,20 +89,6 @@ type ApplyFlags struct { genericiooptions.IOStreams } -// ApplyOptions defines flags and other configuration parameters for the `apply` command. -type ApplyOptions struct { - *preview.PreviewOptions - - SpecFile string - Yes bool - DryRun bool - Watch bool - Timeout int - PortForward int - - genericiooptions.IOStreams -} - // NewApplyFlags returns a default ApplyFlags func NewApplyFlags(ui *terminal.UI, streams genericiooptions.IOStreams) *ApplyFlags { return &ApplyFlags{ @@ -189,7 +143,6 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) { o := &ApplyOptions{ PreviewOptions: previewOptions, - SpecFile: f.SpecFile, Yes: f.Yes, DryRun: f.DryRun, Watch: f.Watch, @@ -240,102 +193,40 @@ func (o *ApplyOptions) Validate(cmd *cobra.Command, args []string) error { return nil } -// Run executes the `apply` command. -func (o *ApplyOptions) Run() (err error) { - // update release to succeeded or failed - defer func() { - if !releaseCreated { - return - } - if err != nil { - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - // Join the errors if update apply release failed. - err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) - } else { - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseSucceeded, relLock) - err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock) - } - }() +func (o *ApplyOptions) prepareApply() (state *applystate.State, err error) { + defer cmdutil.RecoverErr(&err) - // set no style - if o.NoStyle { - pterm.DisableStyling() + // init apply state + state = &applystate.State{ + Metadata: &applystate.Metadata{ + Project: o.RefProject.Name, + Workspace: o.RefWorkspace.Name, + Stack: o.RefStack.Name, + }, + RelLock: &sync.Mutex{}, + PortForward: o.PortForward, + DryRun: o.DryRun, + Watch: o.Watch, + Ls: &applystate.LineSummary{}, } // create release - releaseStorage, err = o.Backend.ReleaseStorage(o.RefProject.Name, o.RefWorkspace.Name) + state.ReleaseStorage, err = o.Backend.ReleaseStorage(o.RefProject.Name, o.RefWorkspace.Name) if err != nil { return } - rel, err = release.NewApplyRelease(releaseStorage, o.RefProject.Name, o.RefStack.Name, o.RefWorkspace.Name) + + state.TargetRel, err = release.NewApplyRelease(state.ReleaseStorage, o.RefProject.Name, o.RefStack.Name, o.RefWorkspace.Name) if err != nil { return } + if !o.DryRun { - if err = releaseStorage.Create(rel); err != nil { + if err = state.CreateStorageRelease(state.TargetRel); err != nil { return } - releaseCreated = true } - // Prepare for the timeout timer. - // Fixme: adopt a more centralized approach to manage the gracefully exit interrupted by - // the SIGINT or SIGTERM, instead of scattering them across different go-routines. - var timer <-chan time.Time - errCh := make(chan error, 1) - defer close(errCh) - - // Wait for the SIGTERM or SIGINT. - go func() { - stopCh := signal.SetupSignalHandler() - <-stopCh - errCh <- errExit - }() - - go func() { - errCh <- o.run(rel, releaseStorage) - }() - - // Check whether the kusion apply command has timed out. - if o.Timeout > 0 { - timer = time.After(time.Second * time.Duration(o.Timeout)) - select { - case err = <-errCh: - if errors.Is(err, errExit) && portForwarded { - return nil - } - return err - case <-timer: - err = fmt.Errorf("failed to execute kusion apply as: timeout for %d seconds", o.Timeout) - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) - return err - } - } else { - err = <-errCh - if errors.Is(err, errExit) && portForwarded { - return nil - } - } - - return err -} - -// run executes the apply cmd after the release is created. -func (o *ApplyOptions) run(rel *apiv1.Release, releaseStorage release.Storage) (err error) { - defer func() { - if !releaseCreated { - return - } - if err != nil { - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) - } - }() - // build parameters parameters := make(map[string]string) for _, value := range o.PreviewOptions.Values { @@ -357,836 +248,31 @@ func (o *ApplyOptions) run(rel *apiv1.Release, releaseStorage release.Storage) ( // return immediately if no resource found in stack if spec == nil || len(spec.Resources) == 0 { fmt.Println(pretty.GreenBold("\nNo resource found in this stack.")) - return nil - } - - // update release phase to previewing - rel.Spec = spec - release.UpdateReleasePhase(rel, apiv1.ReleasePhasePreviewing, relLock) - if err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock); err != nil { - return - } - - // compute changes for preview - changes, err := preview.Preview(o.PreviewOptions, releaseStorage, rel.Spec, rel.State, o.RefProject, o.RefStack) - if err != nil { - return - } - - if allUnChange(changes) { - fmt.Println("All resources are reconciled. No diff found") - return nil - } - - // summary preview table - changes.Summary(o.IOStreams.Out, o.NoStyle) - - // detail detection - if o.Detail && o.All { - changes.OutputDiff("all") - if !o.Yes { - return nil - } - } - - // prompt - if !o.Yes { - for { - var input string - input, err = prompt(o.UI) - if err != nil { - return err - } - if input == "yes" { - break - } else if input == "details" { - var target string - target, err = changes.PromptDetails(o.UI) - if err != nil { - return err - } - changes.OutputDiff(target) - } else { - fmt.Println("Operation apply canceled") - return nil - } - } - } - - // update release phase to applying - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseApplying, relLock) - if err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock); err != nil { - return - } - - // Get graph storage directory, create if not exist - graphStorage, err := o.Backend.GraphStorage(o.RefProject.Name, o.RefWorkspace.Name) - if err != nil { - return err - } - - // Try to get existing graph, use the graph if exists - if graphStorage.CheckGraphStorageExistence() { - gph, err = graphStorage.Get() - if err != nil { - return err - } - err = graph.ValidateGraph(gph) - if err != nil { - return err - } - // Put new resources from the generated spec to graph - gph, err = graph.GenerateGraph(spec.Resources, gph) - } else { - // Create a new graph to be used globally if no graph is stored in the storage - gph = &apiv1.Graph{ - Project: o.RefProject.Name, - Workspace: o.RefWorkspace.Name, - } - gph, err = graph.GenerateGraph(spec.Resources, gph) - } - if err != nil { - return err - } - - // start applying - fmt.Printf("\nStart applying diffs ...\n") - - // NOTE: release should be updated in the process of apply, so as to avoid the problem - // of being unable to update after being terminated by SIGINT or SIGTERM. - _, err = Apply(o, releaseStorage, rel, gph, changes) - if err != nil { - return - } - - // if dry run, print the hint - if o.DryRun { - fmt.Printf("\nNOTE: Currently running in the --dry-run mode, the above configuration does not really take effect\n") - return nil - } - - if o.PortForward > 0 { - fmt.Printf("\nStart port-forwarding ...\n") - portForwarded = true - if err = PortForward(o, rel.Spec); err != nil { - return - } + return state, nil } + // prepare target rel done + state.TargetRel.Spec = spec return } -// The Apply function will apply the resources changes through the execution kusion engine. -// You can customize the runtime of engine and the release releaseStorage through `runtime` and `releaseStorage` parameters. -func Apply( - o *ApplyOptions, - releaseStorage release.Storage, - rel *apiv1.Release, - gph *apiv1.Graph, - changes *models.Changes, -) (*apiv1.Release, error) { - var err error - // Update the release before exit. - defer func() { - if p := recover(); p != nil { - cmdutil.RecoverErr(&err) - log.Error(err) - } - if err != nil { - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) - } - - // Update graph and write to storage if not dry run. - if !o.DryRun { - // Use resources in the state to get resource Cloud ID. - for _, resource := range rel.State.Resources { - // Get information of each of the resources - info, err := graph.GetResourceInfo(&resource) - if err != nil { - return - } - // Update information of each of the resources. - graphResource := graph.FindGraphResourceByID(gph.Resources, resource.ID) - if graphResource != nil { - graphResource.CloudResourceID = info.CloudResourceID - graphResource.Type = info.ResourceType - graphResource.Name = info.ResourceName - } - } - // Get the directory to store the graph. - graphStorage, err := o.Backend.GraphStorage(o.RefProject.Name, o.RefWorkspace.Name) - if err != nil { - return - } - - // Update graph if exists, otherwise create a new graph file. - if graphStorage.CheckGraphStorageExistence() { - // No need to store resource index - graph.RemoveResourceIndex(gph) - err := graphStorage.Update(gph) - if err != nil { - return - } - } else { - graph.RemoveResourceIndex(gph) - err := graphStorage.Create(gph) - if err != nil { - return - } - } - } - }() - - // construct the apply operation - ac := &operation.ApplyOperation{ - Operation: models.Operation{ - Stack: changes.Stack(), - ReleaseStorage: releaseStorage, - MsgCh: make(chan models.Message), - IgnoreFields: o.IgnoreFields, - }, - } - - // Init a watch channel with a sufficient buffer when it is necessary to perform watching. - if o.Watch && !o.DryRun { - ac.WatchCh = make(chan string, 100) - } - - // line summary - var ls lineSummary - // Get the multi printer from UI option. - multi := o.UI.MultiPrinter - // Max length of resource ID for progressbar width. - maxLen := 0 - - // Prepare the writer to print the operation progress and results. - changesWriterMap := make(map[string]*pterm.SpinnerPrinter) - for _, key := range changes.Values() { - // Get the maximum length of the resource ID. - if len(key.ID) > maxLen { - maxLen = len(key.ID) - } - // Init a spinner printer for the resource to print the apply status. - changesWriterMap[key.ID], err = o.UI.SpinnerPrinter. - WithWriter(multi.NewWriter()). - Start(fmt.Sprintf("Pending %s", pterm.Bold.Sprint(key.ID))) - if err != nil { - return nil, fmt.Errorf("failed to init change step spinner printer: %v", err) - } - } - - // Init a writer for progressbar. - pbWriter := multi.NewWriter() - // progress bar, print dag walk detail - progressbar, err := o.UI.ProgressbarPrinter. - WithTotal(len(changes.StepKeys)). - WithWriter(pbWriter). - WithRemoveWhenDone(). - WithShowCount(false). - WithMaxWidth(maxLen + 32). - Start() - if err != nil { - return nil, err - } - - // The writer below is for operation error printing. - errWriter := multi.NewWriter() - - multi.WithUpdateDelay(time.Millisecond * 100) - multi.Start() - defer multi.Stop() - - // wait msgCh close - var wg sync.WaitGroup - // receive msg and print detail - go PrintApplyDetails( - ac, - &err, - &errWriter, - &wg, - changes, - changesWriterMap, - progressbar, - &ls, - o.DryRun, - o.Watch, - gph.Resources, - ) - - watchErrCh := make(chan error) - // Apply while watching the resources. - if o.Watch && !o.DryRun { - Watch( - ac, - changes, - &err, - o.DryRun, - watchErrCh, - multi, - changesWriterMap, - gph, - ) - } - - var updatedRel *apiv1.Release - if o.DryRun { - for _, r := range rel.Spec.Resources { - ac.MsgCh <- models.Message{ - ResourceID: r.ResourceKey(), - OpResult: models.Success, - OpErr: nil, - } - } - close(ac.MsgCh) - } else { - // parse cluster in arguments - rsp, st := ac.Apply(&operation.ApplyRequest{ - Request: models.Request{ - Project: changes.Project(), - Stack: changes.Stack(), - }, - Release: rel, - Graph: gph, - }) - if v1.IsErr(st) { - errWriter.(*bytes.Buffer).Reset() - err = fmt.Errorf("apply failed, status:\n%v", st) - return nil, err - } - // Update the release with that in the apply response if not dryrun. - updatedRel = rsp.Release - *rel = *updatedRel - gph = rsp.Graph - } - - // wait for msgCh closed - wg.Wait() - // Wait for watchWg closed if need to perform watching. - if o.Watch && !o.DryRun { - shouldBreak := false - for !shouldBreak { - select { - case watchErr := <-watchErrCh: - if watchErr != nil { - return nil, watchErr - } - shouldBreak = true - default: - continue - } - } - } - - // print summary - pterm.Fprintln(pbWriter, fmt.Sprintf("\nApply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted)) - return updatedRel, nil -} - -// PrintApplyDetails function will receive the messages of the apply operation and print the details. -// Fixme: abstract the input variables into a struct. -func PrintApplyDetails( - ac *operation.ApplyOperation, - err *error, - errWriter *io.Writer, - wg *sync.WaitGroup, - changes *models.Changes, - changesWriterMap map[string]*pterm.SpinnerPrinter, - progressbar *pterm.ProgressbarPrinter, - ls *lineSummary, - dryRun bool, - watch bool, - gphResources *apiv1.GraphResources, -) { - defer func() { - if p := recover(); p != nil { - cmdutil.RecoverErr(err) - log.Error(*err) - } - if *err != nil { - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - *err = errors.Join([]error{*err, release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock)}...) - } - (*errWriter).(*bytes.Buffer).Reset() - }() - wg.Add(1) - - for { - select { - // Get operation results from the message channel. - case msg, ok := <-ac.MsgCh: - if !ok { - wg.Done() - return - } - changeStep := changes.Get(msg.ResourceID) - - // Update the progressbar and spinner printer according to the operation result. - switch msg.OpResult { - case models.Success, models.Skip: - var title string - if changeStep.Action == models.UnChanged { - title = fmt.Sprintf("Skipped %s", pterm.Bold.Sprint(changeStep.ID)) - changesWriterMap[msg.ResourceID].Success(title) - } else { - if watch && !dryRun { - title = fmt.Sprintf("%s %s", - changeStep.Action.Ing(), - pterm.Bold.Sprint(changeStep.ID), - ) - changesWriterMap[msg.ResourceID].UpdateText(title) - } else { - changesWriterMap[msg.ResourceID].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(msg.ResourceID))) - } - } - - // Update resource status - if !dryRun && changeStep.Action != models.UnChanged { - gphResource := graph.FindGraphResourceByID(gphResources, msg.ResourceID) - if gphResource != nil { - // Delete resource from the graph if it's deleted during apply - if changeStep.Action == models.Delete { - graph.RemoveResource(gph, gphResource) - } else { - gphResource.Status = apiv1.ApplySucceed - } - } - } - - progressbar.Increment() - ls.Count(changeStep.Action) - case models.Failed: - title := fmt.Sprintf("Failed %s", pterm.Bold.Sprint(changeStep.ID)) - changesWriterMap[msg.ResourceID].Fail(title) - errStr := pretty.ErrorT.Sprintf("apply %s failed as: %s\n", msg.ResourceID, msg.OpErr.Error()) - pterm.Fprintln(*errWriter, errStr) - if !dryRun { - // Update resource status, in case anything like update fail happened - gphResource := graph.FindGraphResourceByID(gphResources, msg.ResourceID) - if gphResource != nil { - gphResource.Status = apiv1.ApplyFail - } - } - default: - title := fmt.Sprintf("%s %s", - changeStep.Action.Ing(), - pterm.Bold.Sprint(changeStep.ID), - ) - changesWriterMap[msg.ResourceID].UpdateText(title) - } - } - } -} - -// Watch function will watch the changed Kubernetes and Terraform resources. -// Fixme: abstract the input variables into a struct. -func Watch( - ac *operation.ApplyOperation, - changes *models.Changes, - err *error, - dryRun bool, - watchErrCh chan error, - multi *pterm.MultiPrinter, - changesWriterMap map[string]*pterm.SpinnerPrinter, - gph *apiv1.Graph, -) { - resourceMap := make(map[string]apiv1.Resource) - ioWriterMap := make(map[string]io.Writer) - toBeWatched := apiv1.Resources{} - - // Get the resources to be watched. - for _, res := range rel.Spec.Resources { - if changes.ChangeOrder.ChangeSteps[res.ResourceKey()].Action != models.UnChanged { - resourceMap[res.ResourceKey()] = res - toBeWatched = append(toBeWatched, res) - } - } - - go func() { - defer func() { - if p := recover(); p != nil { - cmdutil.RecoverErr(err) - log.Error(*err) - } - if *err != nil { - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) - } - - watchErrCh <- *err - }() - // Init the runtimes according to the resource types. - runtimes, s := runtimeinit.Runtimes(*rel.Spec, *rel.State) - if v1.IsErr(s) { - panic(fmt.Errorf("failed to init runtimes: %s", s.String())) - } - - // Prepare the tables for printing the details of the resources. - tables := make(map[string]*printers.Table, len(toBeWatched)) - ticker := time.NewTicker(time.Millisecond * 100) - defer ticker.Stop() - - // Record the watched and finished resources. - watchedIDs := []string{} - finished := make(map[string]bool) - - for !(len(finished) == len(toBeWatched)) { - select { - // Get the resource ID to be watched. - case id := <-ac.WatchCh: - res := resourceMap[id] - // Set the timeout duration for watch context, here we set an experiential value of 60 minutes. - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(60)) - defer cancel() - - // Get the event channel for watching the resource. - rsp := runtimes[res.Type].Watch(ctx, &runtime.WatchRequest{Resource: &res}) - if rsp == nil { - log.Debug("unsupported resource type: %s", res.Type) - continue - } - if v1.IsErr(rsp.Status) { - panic(fmt.Errorf("failed to watch %s as %s", id, rsp.Status.String())) - } - - w := rsp.Watchers - table := printers.NewTable(w.IDs) - tables[id] = table - - // Setup a go-routine to concurrently watch K8s and TF resources. - if res.Type == apiv1.Kubernetes { - healthPolicy, kind := getResourceInfo(&res) - go watchK8sResources(id, kind, w.Watchers, table, tables, gph, dryRun, healthPolicy) - } else if res.Type == apiv1.Terraform { - go watchTFResources(id, w.TFWatcher, table, dryRun) - } else { - log.Debug("unsupported resource type to watch: %s", string(res.Type)) - continue - } - - // Record the io writer related to the resource ID. - ioWriterMap[id] = multi.NewWriter() - watchedIDs = append(watchedIDs, id) - - // Refresh the tables printing details of the resources to be watched. - default: - for _, id := range watchedIDs { - w, ok := ioWriterMap[id] - if !ok { - panic(fmt.Errorf("failed to get io writer while watching %s", id)) - } - printTable(&w, id, tables) - } - for id, table := range tables { - if finished[id] { - continue - } - - if table.AllCompleted() { - finished[id] = true - changesWriterMap[id].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(id))) - - // Update resource status to reconciled. - resource := graph.FindGraphResourceByID(gph.Resources, id) - if resource != nil { - resource.Status = apiv1.Reconciled - } - } - } - <-ticker.C - } - } - }() -} - -// PortForward function will forward the specified port from local to the project Kubernetes Service. -// -// Example: -// -// o := newApplyOptions() -// spec, err := generate.GenerateSpecWithSpinner(o.RefProject, o.RefStack, o.RefWorkspace, nil, o.NoStyle) -// -// if err != nil { -// return err -// } -// -// err = PortForward(o, spec) -// -// if err != nil { -// return err -// } -// -// Fixme: gracefully exit when interrupted by SIGINT or SIGTERM. -func PortForward( - o *ApplyOptions, - spec *apiv1.Spec, -) error { - if o.DryRun { - fmt.Println("NOTE: Portforward doesn't work in DryRun mode") - return nil - } - - // portforward operation - wo := &operation.PortForwardOperation{} - if err := wo.PortForward(&operation.PortForwardRequest{ - Spec: spec, - Port: o.PortForward, - }); err != nil { - return err - } - - fmt.Println("Portforward has been completed!") - return nil -} - -type lineSummary struct { - created, updated, deleted int -} - -func (ls *lineSummary) Count(op models.ActionType) { - switch op { - case models.Create: - ls.created++ - case models.Update: - ls.updated++ - case models.Delete: - ls.deleted++ - } -} +// Run executes the `apply` command. +func (o *ApplyOptions) Run() (err error) { + // prepare apply + applyState, err := o.prepareApply() -func allUnChange(changes *models.Changes) bool { - for _, v := range changes.ChangeSteps { - if v.Action != models.UnChanged { - return false + if err != nil && applyState != nil { + updateErr := applyState.UpdateReleasePhaseFailed() + if updateErr != nil { + err = errors.Join(err, updateErr) } } - return true -} - -func prompt(ui *terminal.UI) (string, error) { - // don`t display yes item when only preview - options := []string{"yes", "details", "no"} - input, err := ui.InteractiveSelectPrinter. - WithFilter(false). - WithDefaultText(`Do you want to apply these diffs?`). - WithOptions(options). - WithDefaultOption("details"). - // To gracefully exit if interrupted by SIGINT or SIGTERM. - WithOnInterruptFunc(func() { - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - release.UpdateApplyRelease(releaseStorage, rel, false, relLock) - os.Exit(1) - }). - Show() if err != nil { - fmt.Printf("Prompt failed: %v\n", err) - return "", err - } - - return input, nil -} - -func watchK8sResources( - id, kind string, - chs []<-chan watch.Event, - table *printers.Table, - tables map[string]*printers.Table, - gph *apiv1.Graph, - dryRun bool, - healthPolicy interface{}, -) { - defer func() { - var err error - if p := recover(); p != nil { - cmdutil.RecoverErr(&err) - log.Error(err) - } - if err != nil { - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) - } - }() - - // Set resource status to `reconcile failed` before reconcile successfully. - resource := graph.FindGraphResourceByID(gph.Resources, id) - if resource != nil { - resource.Status = apiv1.ReconcileFail - } - - // Resources selects - cases := createSelectCases(chs) - // Default select - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectDefault, - Chan: reflect.Value{}, - Send: reflect.Value{}, - }) - - for { - chosen, recv, recvOK := reflect.Select(cases) - if cases[chosen].Dir == reflect.SelectDefault { - continue - } - if recvOK { - e := recv.Interface().(watch.Event) - o := e.Object.(*unstructured.Unstructured) - var detail string - var ready bool - if e.Type == watch.Deleted { - detail = fmt.Sprintf("%s has beed deleted", o.GetName()) - ready = true - } else { - // Restore to actual type - target := printers.Convert(o) - // Check reconcile status with customized health policy for specific resource - if healthPolicy != nil && kind == o.GetObjectKind().GroupVersionKind().Kind { - if code, ok := kcl.ConvertKCLCode(healthPolicy); ok { - resByte, err := yaml.Marshal(o.Object) - if err != nil { - log.Error(err) - return - } - detail, ready = printers.PrintCustomizedHealthCheck(code, resByte) - } else { - detail, ready = printers.Generate(target) - } - } else { - // Check reconcile status with default setup - detail, ready = printers.Generate(target) - } - } - - // Mark ready for breaking loop - if ready { - e.Type = printers.READY - } - - // Save watched msg - table.Update( - engine.BuildIDForKubernetes(o), - printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail)) - - // Write back - tables[id] = table - } - - // Break when completed - if table.AllCompleted() { - break - } - } -} - -func watchTFResources( - id string, - ch <-chan runtime.TFEvent, - table *printers.Table, - dryRun bool, -) { - defer func() { - var err error - if p := recover(); p != nil { - cmdutil.RecoverErr(&err) - log.Error(err) - } - if err != nil { - if !releaseCreated { - return - } - release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) - } - }() - - for { - parts := strings.Split(id, engine.Separator) - // A valid Terraform resource ID should consist of 4 parts, including the information of the provider type - // and resource name, for example: hashicorp:random:random_password:example-dev-kawesome. - if len(parts) != 4 { - panic(fmt.Errorf("invalid Terraform resource id: %s", id)) - } - - tfEvent := <-ch - if tfEvent == runtime.TFApplying { - table.Update( - id, - printers.NewRow(watch.EventType("Applying"), - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Applying...")) - } else if tfEvent == runtime.TFSucceeded { - table.Update( - id, - printers.NewRow(printers.READY, - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply succeeded")) - } else { - table.Update( - id, - printers.NewRow(watch.EventType("Failed"), - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply failed")) - } - - // Break when all completed. - if table.AllCompleted() { - break - } - } -} - -func createSelectCases(chs []<-chan watch.Event) []reflect.SelectCase { - cases := make([]reflect.SelectCase, 0, len(chs)) - for _, ch := range chs { - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ch), - }) - } - return cases -} - -func printTable(w *io.Writer, id string, tables map[string]*printers.Table) { - // Reset the buffer for live flushing. - (*w).(*bytes.Buffer).Reset() - - // Print resource Key as heading text - _, _ = fmt.Fprintln(*w, pretty.LightCyanBold("[%s]", id)) - - table, ok := tables[id] - if !ok { - // Unsupported resource, leave a hint - _, _ = fmt.Fprintln(*w, "Skip monitoring unsupported resources") - } else { - // Print table - data := table.Print() - _ = pterm.DefaultTable. - WithStyle(pterm.NewStyle(pterm.FgDefault)). - WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). - WithHasHeader().WithSeparator(" ").WithData(data).WithWriter(*w).Render() + return } -} -// getResourceInfo get health policy and kind from resource for customized health check purpose -func getResourceInfo(res *apiv1.Resource) (healthPolicy interface{}, kind string) { - var ok bool - if res.Extensions != nil { - healthPolicy = res.Extensions[apiv1.FieldHealthPolicy] - } - if res.Attributes == nil { - panic(fmt.Errorf("resource has no Attributes field in the Spec: %s", res)) - } - if kind, ok = res.Attributes[apiv1.FieldKind].(string); !ok { - panic(fmt.Errorf("failed to get kind from resource attributes: %s", res.Attributes)) - } - return healthPolicy, kind + // apply action + err = apply.Apply(o, applyState) + return } diff --git a/pkg/cmd/apply/apply_options.go b/pkg/cmd/apply/apply_options.go new file mode 100644 index 000000000..9ceb1ec5c --- /dev/null +++ b/pkg/cmd/apply/apply_options.go @@ -0,0 +1,39 @@ +package apply + +import ( + "k8s.io/cli-runtime/pkg/genericiooptions" + "kusionstack.io/kusion/pkg/cmd/preview" +) + +// ApplyOptions defines flags and other configuration parameters for the `apply` command. +type ApplyOptions struct { + *preview.PreviewOptions + + Yes bool + DryRun bool + Watch bool + Timeout int + PortForward int + + genericiooptions.IOStreams +} + +func (o *ApplyOptions) GetYes() bool { + return o.Yes +} + +func (o *ApplyOptions) GetDryRun() bool { + return o.DryRun +} + +func (o *ApplyOptions) GetWatch() bool { + return o.Watch +} + +func (o *ApplyOptions) GetTimeout() int { + return o.Timeout +} + +func (o *ApplyOptions) GetPortForward() int { + return o.PortForward +} diff --git a/pkg/cmd/apply/apply_test.go b/pkg/cmd/apply/apply_test.go index 9f079530d..0427f02df 100644 --- a/pkg/cmd/apply/apply_test.go +++ b/pkg/cmd/apply/apply_test.go @@ -15,37 +15,9 @@ package apply import ( - "bytes" "context" - "errors" - "io" - "testing" - "time" - "github.com/bytedance/mockey" - "github.com/liu-hm19/pterm" - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" - - apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" - v1 "kusionstack.io/kusion/pkg/apis/status/v1" - "kusionstack.io/kusion/pkg/backend/storages" - "kusionstack.io/kusion/pkg/cmd/generate" - "kusionstack.io/kusion/pkg/cmd/meta" - "kusionstack.io/kusion/pkg/cmd/preview" - "kusionstack.io/kusion/pkg/engine" - "kusionstack.io/kusion/pkg/engine/operation" - "kusionstack.io/kusion/pkg/engine/operation/models" - "kusionstack.io/kusion/pkg/engine/printers" - releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" - "kusionstack.io/kusion/pkg/engine/resource/graph" - graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" "kusionstack.io/kusion/pkg/engine/runtime" - "kusionstack.io/kusion/pkg/engine/runtime/kubernetes" - "kusionstack.io/kusion/pkg/util/terminal" - workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" ) var _ runtime.Runtime = (*fakerRuntime)(nil) @@ -83,566 +55,3 @@ func (f *fakerRuntime) Delete(_ context.Context, _ *runtime.DeleteRequest) *runt func (f *fakerRuntime) Watch(_ context.Context, _ *runtime.WatchRequest) *runtime.WatchResponse { return nil } - -var ( - proj = &apiv1.Project{ - Name: "fake-proj", - } - stack = &apiv1.Stack{ - Name: "fake-stack", - } - workspace = &apiv1.Workspace{ - Name: "fake-workspace", - } -) - -func newApplyOptions() *ApplyOptions { - return &ApplyOptions{ - PreviewOptions: &preview.PreviewOptions{ - MetaOptions: &meta.MetaOptions{ - RefProject: proj, - RefStack: stack, - RefWorkspace: workspace, - Backend: &storages.LocalStorage{}, - }, - Detail: false, - All: false, - NoStyle: false, - Output: "", - IgnoreFields: nil, - UI: terminal.DefaultUI(), - }, - } -} - -func mockGenerateSpecWithSpinner() { - mockey.Mock(generate.GenerateSpecWithSpinner).To(func( - project *apiv1.Project, - stack *apiv1.Stack, - workspace *apiv1.Workspace, - parameters map[string]string, - ui *terminal.UI, - noStyle bool, - ) (*apiv1.Spec, error) { - return &apiv1.Spec{Resources: []apiv1.Resource{sa1, sa2, sa3}}, nil - }).Build() -} - -func mockPatchNewKubernetesRuntime() *mockey.Mocker { - return mockey.Mock(kubernetes.NewKubernetesRuntime).To(func() (runtime.Runtime, error) { - return &fakerRuntime{}, nil - }).Build() -} - -func mockPatchOperationPreview() *mockey.Mocker { - return mockey.Mock((*operation.PreviewOperation).Preview).To(func( - *operation.PreviewOperation, - *operation.PreviewRequest, - ) (rsp *operation.PreviewResponse, s v1.Status) { - return &operation.PreviewResponse{ - Order: &models.ChangeOrder{ - StepKeys: []string{sa1.ID, sa2.ID, sa3.ID}, - ChangeSteps: map[string]*models.ChangeStep{ - sa1.ID: { - ID: sa1.ID, - Action: models.Create, - From: &sa1, - }, - sa2.ID: { - ID: sa2.ID, - Action: models.UnChanged, - From: &sa2, - }, - sa3.ID: { - ID: sa3.ID, - Action: models.Undefined, - From: &sa1, - }, - }, - }, - }, nil - }).Build() -} - -func mockWorkspaceStorage() { - mockey.Mock((*storages.LocalStorage).WorkspaceStorage).Return(&workspacestorages.LocalStorage{}, nil).Build() -} - -func mockReleaseStorage() { - mockey.Mock((*storages.LocalStorage).ReleaseStorage).Return(&releasestorages.LocalStorage{}, nil).Build() - mockey.Mock((*releasestorages.LocalStorage).Create).Return(nil).Build() - mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - mockey.Mock((*releasestorages.LocalStorage).GetLatestRevision).Return(0).Build() - mockey.Mock((*releasestorages.LocalStorage).Get).Return(&apiv1.Release{State: &apiv1.State{}, Phase: apiv1.ReleasePhaseSucceeded}, nil).Build() -} - -func mockGraphStorage() { - mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() - mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() - mockey.Mock((*graphstorages.LocalStorage).Delete).Return(nil).Build() - mockey.Mock((*graphstorages.LocalStorage).Update).Return(nil).Build() - mockey.Mock((*graphstorages.LocalStorage).Get).Return(&apiv1.Graph{ - Project: "", - Workspace: "", - Resources: &apiv1.GraphResources{ - WorkloadResources: map[string]*apiv1.GraphResource{}, - DependencyResources: map[string]*apiv1.GraphResource{}, - OtherResources: map[string]*apiv1.GraphResource{}, - ResourceIndex: map[string]*apiv1.ResourceEntry{}, - }, - }, nil).Build() -} - -func TestApplyOptions_Run(t *testing.T) { - mockey.PatchConvey("DryRun is true", t, func() { - mockGenerateSpecWithSpinner() - mockPatchNewKubernetesRuntime() - mockPatchOperationPreview() - mockWorkspaceStorage() - mockReleaseStorage() - mockOperationApply(models.Success) - - o := newApplyOptions() - o.DryRun = true - mockPromptOutput("yes") - err := o.Run() - assert.Nil(t, err) - }) -} - -const ( - apiVersion = "v1" - kind = "ServiceAccount" - namespace = "test-ns" -) - -var ( - sa1 = newSA("sa1") - sa2 = newSA("sa2") - sa3 = newSA("sa3") -) - -func newSA(name string) apiv1.Resource { - return apiv1.Resource{ - ID: engine.BuildID(apiVersion, kind, namespace, name), - Type: "Kubernetes", - Attributes: map[string]interface{}{ - "apiVersion": apiVersion, - "kind": kind, - "metadata": map[string]interface{}{ - "name": name, - "namespace": namespace, - }, - }, - } -} - -func TestApply(t *testing.T) { - loc, _ := time.LoadLocation("Asia/Shanghai") - mockey.PatchConvey("dry run", t, func() { - mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - rel := &apiv1.Release{ - Project: "fake-project", - Workspace: "fake-workspace", - Revision: 1, - Stack: "fake-stack", - Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1}}, - State: &apiv1.State{}, - Phase: apiv1.ReleasePhaseApplying, - CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - } - order := &models.ChangeOrder{ - StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*models.ChangeStep{ - sa1.ID: { - ID: sa1.ID, - Action: models.Create, - From: sa1, - }, - }, - } - - changes := models.NewChanges(proj, stack, order) - graph := &apiv1.Graph{} - o := newApplyOptions() - o.DryRun = true - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, graph, changes) - assert.Nil(t, err) - }) - mockey.PatchConvey("apply success", t, func() { - mockOperationApply(models.Success) - mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() - mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() - // mockGraphStorage() - o := newApplyOptions() - rel := &apiv1.Release{ - Project: "fake-project", - Workspace: "fake-workspace", - Revision: 1, - Stack: "fake-stack", - Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1, sa2}}, - State: &apiv1.State{}, - Phase: apiv1.ReleasePhaseApplying, - CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - } - order := &models.ChangeOrder{ - StepKeys: []string{sa1.ID, sa2.ID}, - ChangeSteps: map[string]*models.ChangeStep{ - sa1.ID: { - ID: sa1.ID, - Action: models.Create, - From: &sa1, - }, - sa2.ID: { - ID: sa2.ID, - Action: models.UnChanged, - From: &sa2, - }, - }, - } - - changes := models.NewChanges(proj, stack, order) - gph := &apiv1.Graph{ - Project: rel.Project, - Workspace: rel.Workspace, - } - graph.GenerateGraph(rel.Spec.Resources, gph) - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, gph, changes) - assert.Nil(t, err) - }) - mockey.PatchConvey("apply failed", t, func() { - mockOperationApply(models.Failed) - mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - mockGraphStorage() - o := newApplyOptions() - rel := &apiv1.Release{ - Project: "fake-project", - Workspace: "fake-workspace", - Revision: 1, - Stack: "fake-stack", - Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1}}, - State: &apiv1.State{}, - Phase: apiv1.ReleasePhaseApplying, - CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), - } - order := &models.ChangeOrder{ - StepKeys: []string{sa1.ID}, - ChangeSteps: map[string]*models.ChangeStep{ - sa1.ID: { - ID: sa1.ID, - Action: models.Create, - From: &sa1, - }, - }, - } - changes := models.NewChanges(proj, stack, order) - gph := &apiv1.Graph{} - graph.GenerateGraph(rel.Spec.Resources, gph) - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, gph, changes) - assert.NotNil(t, err) - }) -} - -func mockOperationApply(res models.OpResult) { - mockey.Mock((*operation.ApplyOperation).Apply).To( - func(o *operation.ApplyOperation, request *operation.ApplyRequest) (*operation.ApplyResponse, v1.Status) { - var err error - if res == models.Failed { - err = errors.New("mock error") - } - for _, r := range request.Release.Spec.Resources { - // ing -> $res - o.MsgCh <- models.Message{ - ResourceID: r.ResourceKey(), - OpResult: "", - OpErr: nil, - } - o.MsgCh <- models.Message{ - ResourceID: r.ResourceKey(), - OpResult: res, - OpErr: err, - } - } - close(o.MsgCh) - if res == models.Failed { - return nil, v1.NewErrorStatus(err) - } - return &operation.ApplyResponse{}, nil - }).Build() -} - -func mockPromptOutput(res string) { - mockey.Mock((*pterm.InteractiveSelectPrinter).Show).Return(res, nil).Build() -} - -func TestPrompt(t *testing.T) { - mockey.PatchConvey("prompt error", t, func() { - mockey.Mock((*pterm.InteractiveSelectPrinter).Show).Return("", errors.New("mock error")).Build() - _, err := prompt(terminal.DefaultUI()) - assert.NotNil(t, err) - }) - - mockey.PatchConvey("prompt yes", t, func() { - mockPromptOutput("yes") - _, err := prompt(terminal.DefaultUI()) - assert.Nil(t, err) - }) -} - -func TestWatchK8sResources(t *testing.T) { - t.Run("successfully apply default K8s resources", func(t *testing.T) { - id := "v1:Namespace:example" - chs := make([]<-chan watch.Event, 1) - events := []watch.Event{ - { - Type: watch.Added, - Object: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Namespace", - "metadata": map[string]interface{}{ - "name": "example", - }, - "spec": map[string]interface{}{}, - }, - }, - }, - { - Type: watch.Added, - Object: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Namespace", - "metadata": map[string]interface{}{ - "name": "example", - }, - "spec": map[string]interface{}{}, - "status": map[string]interface{}{ - "phase": corev1.NamespaceActive, - }, - }, - }, - }, - } - - out := make(chan watch.Event, 10) - for _, e := range events { - out <- e - } - chs[0] = out - table := &printers.Table{ - IDs: []string{id}, - Rows: map[string]*printers.Row{}, - } - tables := map[string]*printers.Table{ - id: table, - } - resource := &apiv1.GraphResource{ - ID: id, - Type: "", - Name: "", - CloudResourceID: "", - Status: "", - Dependents: []string{}, - Dependencies: []string{}, - } - gph := &apiv1.Graph{ - Project: "example project", - Workspace: "example workspace", - Resources: &apiv1.GraphResources{ - WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, - DependencyResources: map[string]*apiv1.GraphResource{}, - OtherResources: map[string]*apiv1.GraphResource{}, - ResourceIndex: map[string]*apiv1.ResourceEntry{}, - }, - } - graph.UpdateResourceIndex(gph.Resources) - watchK8sResources(id, "", chs, table, tables, gph, true, nil) - - assert.Equal(t, true, table.AllCompleted()) - }) - t.Run("successfully apply customized K8s resources", func(t *testing.T) { - id := "v1:Deployment:example" - chs := make([]<-chan watch.Event, 1) - events := []watch.Event{ - { - Type: watch.Added, - Object: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Deployment", - "metadata": map[string]interface{}{ - "name": "example", - "generation": 1, - }, - "spec": map[string]interface{}{}, - }, - }, - }, - } - - out := make(chan watch.Event, 10) - for _, e := range events { - out <- e - } - chs[0] = out - table := &printers.Table{ - IDs: []string{id}, - Rows: map[string]*printers.Row{}, - } - tables := map[string]*printers.Table{ - id: table, - } - var policyInterface interface{} - healthPolicy := map[string]interface{}{ - "health.kcl": "assert res.metadata.generation == 1", - } - policyInterface = healthPolicy - resource := &apiv1.GraphResource{ - ID: id, - Type: "", - Name: "", - CloudResourceID: "", - Status: "", - Dependents: []string{}, - Dependencies: []string{}, - } - gph := &apiv1.Graph{ - Project: "example project", - Workspace: "example workspace", - Resources: &apiv1.GraphResources{ - WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, - DependencyResources: map[string]*apiv1.GraphResource{}, - OtherResources: map[string]*apiv1.GraphResource{}, - ResourceIndex: map[string]*apiv1.ResourceEntry{}, - }, - } - graph.UpdateResourceIndex(gph.Resources) - watchK8sResources(id, "Deployment", chs, table, tables, gph, false, policyInterface) - - assert.Equal(t, true, table.AllCompleted()) - }) -} - -func TestWatchTFResources(t *testing.T) { - t.Run("successfully apply TF resources", func(t *testing.T) { - eventCh := make(chan runtime.TFEvent, 10) - events := []runtime.TFEvent{ - runtime.TFApplying, - runtime.TFApplying, - runtime.TFSucceeded, - } - for _, e := range events { - eventCh <- e - } - - id := "hashicorp:random:random_password:example-dev-kawesome" - table := &printers.Table{ - IDs: []string{id}, - Rows: map[string]*printers.Row{ - "hashicorp:random:random_password:example-dev-kawesome": {}, - }, - } - - watchTFResources(id, eventCh, table, true) - - assert.Equal(t, true, table.AllCompleted()) - }) -} - -func TestPrintTable(t *testing.T) { - w := io.Writer(bytes.NewBufferString("")) - id := "fake-resource-id" - tables := map[string]*printers.Table{ - "fake-resource-id": printers.NewTable([]string{ - "fake-resource-id", - }), - } - - t.Run("skip unsupported resources", func(t *testing.T) { - printTable(&w, "fake-fake-resource-id", tables) - assert.Contains(t, w.(*bytes.Buffer).String(), "Skip monitoring unsupported resources") - }) - - t.Run("update table", func(t *testing.T) { - printTable(&w, id, tables) - tableStr, err := pterm.DefaultTable. - WithStyle(pterm.NewStyle(pterm.FgDefault)). - WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). - WithHasHeader().WithSeparator(" ").WithData(tables[id].Print()).Srender() - - assert.Nil(t, err) - assert.Contains(t, w.(*bytes.Buffer).String(), tableStr) - }) -} - -func TestGetResourceInfo(t *testing.T) { - tests := []struct { - name string - resource *apiv1.Resource - expectedKind string - expectPanic bool - }{ - { - name: "with valid resource", - resource: &apiv1.Resource{ - Attributes: map[string]interface{}{ - apiv1.FieldKind: "Service", - }, - Extensions: map[string]interface{}{ - apiv1.FieldHealthPolicy: "policyValue", - }, - }, - expectedKind: "Service", - expectPanic: false, - }, - { - name: "with nil Attributes", - resource: &apiv1.Resource{ - Attributes: nil, - Extensions: map[string]interface{}{ - apiv1.FieldHealthPolicy: "policyValue", - }, - }, - expectPanic: true, - }, - { - name: "with non-string kind", - resource: &apiv1.Resource{ - Attributes: map[string]interface{}{ - apiv1.FieldKind: 123, - }, - Extensions: map[string]interface{}{ - apiv1.FieldHealthPolicy: "policyValue", - }, - }, - expectPanic: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.expectPanic { - defer func() { - if r := recover(); r == nil { - t.Errorf("expected panic for test case '%s', but got none", tt.name) - } - }() - } - - healthPolicy, kind := getResourceInfo(tt.resource) - if !tt.expectPanic { - if kind != tt.expectedKind { - t.Errorf("expected kind '%s', but got '%s'", tt.expectedKind, kind) - } - if healthPolicy != "policyValue" && !tt.expectPanic { - t.Errorf("expected healthPolicy to be 'policyValue', but got '%v'", healthPolicy) - } - } - }) - } -} diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 722b87175..c40939f9d 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -146,7 +146,7 @@ Find more information at: https://www.kusionstack.io`), { Message: "Release Management Commands:", Commands: []*cobra.Command{ - rel.NewCmdRel(o.IOStreams), + rel.NewCmdRel(o.UI, o.IOStreams), }, }, } diff --git a/pkg/cmd/meta/meta.go b/pkg/cmd/meta/meta.go index 9cc335764..001142ffc 100644 --- a/pkg/cmd/meta/meta.go +++ b/pkg/cmd/meta/meta.go @@ -16,7 +16,6 @@ package meta import ( "github.com/spf13/cobra" - v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/backend" "kusionstack.io/kusion/pkg/project" @@ -35,21 +34,6 @@ type MetaFlags struct { WorkDir *string } -// MetaOptions are the meta-options that are available on all or most commands. -type MetaOptions struct { - // RefProject references the project for this CLI invocation. - RefProject *v1.Project - - // RefStack referenced the stack for this CLI invocation. - RefStack *v1.Stack - - // RefWorkspace referenced the workspace for this CLI invocation. - RefWorkspace *v1.Workspace - - // Backend referenced the target storage backend for this CLI invocation. - Backend backend.Backend -} - // NewMetaFlags provides default flags and values for use in other commands. func NewMetaFlags() *MetaFlags { workspace := "" diff --git a/pkg/cmd/meta/meta_options.go b/pkg/cmd/meta/meta_options.go new file mode 100644 index 000000000..384031646 --- /dev/null +++ b/pkg/cmd/meta/meta_options.go @@ -0,0 +1,37 @@ +package meta + +import ( + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/backend" +) + +// MetaOptions are the meta-options that are available on all or most commands. +type MetaOptions struct { + // RefProject references the project for this CLI invocation. + RefProject *v1.Project + + // RefStack referenced the stack for this CLI invocation. + RefStack *v1.Stack + + // RefWorkspace referenced the workspace for this CLI invocation. + RefWorkspace *v1.Workspace + + // Backend referenced the target storage backend for this CLI invocation. + Backend backend.Backend +} + +func (o *MetaOptions) GetRefProject() *v1.Project { + return o.RefProject +} + +func (o *MetaOptions) GetRefStack() *v1.Stack { + return o.RefStack +} + +func (o *MetaOptions) GetRefWorkspace() *v1.Workspace { + return o.RefWorkspace +} + +func (o *MetaOptions) GetBackend() backend.Backend { + return o.Backend +} diff --git a/pkg/cmd/preview/preview.go b/pkg/cmd/preview/preview.go index 72b68f1c4..e114942d3 100644 --- a/pkg/cmd/preview/preview.go +++ b/pkg/cmd/preview/preview.go @@ -21,21 +21,18 @@ import ( "path/filepath" "strings" + "kusionstack.io/kusion/pkg/engine/apply" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/templates" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" - v1 "kusionstack.io/kusion/pkg/apis/status/v1" "kusionstack.io/kusion/pkg/cmd/generate" "kusionstack.io/kusion/pkg/cmd/meta" cmdutil "kusionstack.io/kusion/pkg/cmd/util" - "kusionstack.io/kusion/pkg/engine/operation" - "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/release" - "kusionstack.io/kusion/pkg/engine/runtime/terraform" - "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/pkg/util/diff" "kusionstack.io/kusion/pkg/util/i18n" "kusionstack.io/kusion/pkg/util/pretty" @@ -91,23 +88,6 @@ type PreviewFlags struct { genericiooptions.IOStreams } -// PreviewOptions defines flags and other configuration parameters for the `preview` command. -type PreviewOptions struct { - *meta.MetaOptions - - Detail bool - All bool - NoStyle bool - Output string - SpecFile string - IgnoreFields []string - Values []string - - UI *terminal.UI - - genericiooptions.IOStreams -} - // NewPreviewFlags returns a default PreviewFlags func NewPreviewFlags(ui *terminal.UI, streams genericiooptions.IOStreams) *PreviewFlags { return &PreviewFlags{ @@ -262,7 +242,7 @@ func (o *PreviewOptions) Run() error { } // compute changes for preview - changes, err := Preview(o, storage, spec, state, o.RefProject, o.RefStack) + changes, err := apply.Preview(o, storage, spec, state, o.RefProject, o.RefStack) if err != nil { return err } @@ -313,72 +293,3 @@ func (o *PreviewOptions) Run() error { } return nil } - -// The Preview function calculates the upcoming actions of each resource -// through the execution Kusion Engine, and you can customize the -// runtime of engine and the state storage through `runtime` and -// `storage` parameters. -// -// Example: -// -// o := newPreviewOptions() -// stateStorage := &states.FileSystemState{ -// Path: filepath.Join(o.WorkDir, states.KusionState) -// } -// kubernetesRuntime, err := runtime.NewKubernetesRuntime() -// if err != nil { -// return err -// } -// -// changes, err := Preview(o, kubernetesRuntime, stateStorage, -// planResources, project, stack, os.Stdout) -// if err != nil { -// return err -// } -func Preview( - opts *PreviewOptions, - storage release.Storage, - planResources *apiv1.Spec, - priorResources *apiv1.State, - project *apiv1.Project, - stack *apiv1.Stack, -) (*models.Changes, error) { - log.Info("Start compute preview changes ...") - - // check and install terraform executable binary for - // resources with the type of Terraform. - tfInstaller := terraform.CLIInstaller{ - Intent: planResources, - } - if err := tfInstaller.CheckAndInstall(); err != nil { - return nil, err - } - - // construct the preview operation - pc := &operation.PreviewOperation{ - Operation: models.Operation{ - OperationType: models.ApplyPreview, - Stack: stack, - ReleaseStorage: storage, - IgnoreFields: opts.IgnoreFields, - ChangeOrder: &models.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*models.ChangeStep{}}, - }, - } - - log.Info("Start call pc.Preview() ...") - - // parse cluster in arguments - rsp, s := pc.Preview(&operation.PreviewRequest{ - Request: models.Request{ - Project: project, - Stack: stack, - }, - Spec: planResources, - State: priorResources, - }) - if v1.IsErr(s) { - return nil, fmt.Errorf("preview failed.\n%s", s.String()) - } - - return models.NewChanges(project, stack, rsp.Order), nil -} diff --git a/pkg/cmd/preview/preview_options.go b/pkg/cmd/preview/preview_options.go new file mode 100644 index 000000000..69a7a623f --- /dev/null +++ b/pkg/cmd/preview/preview_options.go @@ -0,0 +1,56 @@ +package preview + +import ( + "k8s.io/cli-runtime/pkg/genericiooptions" + "kusionstack.io/kusion/pkg/cmd/meta" + "kusionstack.io/kusion/pkg/util/terminal" +) + +// PreviewOptions defines flags and other configuration parameters for the `preview` command. +type PreviewOptions struct { + *meta.MetaOptions + + Detail bool + All bool + NoStyle bool + Output string + SpecFile string + IgnoreFields []string + Values []string + + UI *terminal.UI + + genericiooptions.IOStreams +} + +func (o *PreviewOptions) GetDetail() bool { + return o.Detail +} + +func (o *PreviewOptions) GetAll() bool { + return o.All +} + +func (o *PreviewOptions) GetNoStyle() bool { + return o.NoStyle +} + +func (o *PreviewOptions) GetOutput() string { + return o.Output +} + +func (o *PreviewOptions) GetSpecFile() string { + return o.SpecFile +} + +func (o *PreviewOptions) GetIgnoreFields() []string { + return o.IgnoreFields +} + +func (o *PreviewOptions) GetValues() []string { + return o.Values +} + +func (o *PreviewOptions) GetUI() *terminal.UI { + return o.UI +} diff --git a/pkg/cmd/preview/preview_test.go b/pkg/cmd/preview/preview_test.go index 9f39c7e75..cc2acc1b2 100644 --- a/pkg/cmd/preview/preview_test.go +++ b/pkg/cmd/preview/preview_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + "kusionstack.io/kusion/pkg/engine/apply" + "github.com/bytedance/mockey" "github.com/stretchr/testify/assert" @@ -188,7 +190,7 @@ func TestPreview(t *testing.T) { mockReleaseStorageOperation() o := &PreviewOptions{} - _, err := Preview(o, &releasestorages.LocalStorage{}, &apiv1.Spec{Resources: []apiv1.Resource{sa1, sa2, sa3}}, &apiv1.State{}, proj, stack) + _, err := apply.Preview(o, &releasestorages.LocalStorage{}, &apiv1.Spec{Resources: []apiv1.Resource{sa1, sa2, sa3}}, &apiv1.State{}, proj, stack) assert.Nil(t, err) }) } diff --git a/pkg/cmd/release/release.go b/pkg/cmd/release/release.go index d10eeae10..2dd8ed02c 100644 --- a/pkg/cmd/release/release.go +++ b/pkg/cmd/release/release.go @@ -6,6 +6,7 @@ import ( "k8s.io/kubectl/pkg/util/templates" cmdutil "kusionstack.io/kusion/pkg/cmd/util" "kusionstack.io/kusion/pkg/util/i18n" + "kusionstack.io/kusion/pkg/util/terminal" ) var relLong = i18n.T(` @@ -14,7 +15,7 @@ var relLong = i18n.T(` These commands help you observe and operate the Kusion release files of a Project in a Workspace. `) // NewCmdRel returns an initialized Command instance for 'release' sub command. -func NewCmdRel(streams genericiooptions.IOStreams) *cobra.Command { +func NewCmdRel(ui *terminal.UI, streams genericiooptions.IOStreams) *cobra.Command { cmd := &cobra.Command{ Use: "release", DisableFlagsInUseLine: true, @@ -23,7 +24,7 @@ func NewCmdRel(streams genericiooptions.IOStreams) *cobra.Command { Run: cmdutil.DefaultSubCommandRun(streams.ErrOut), } - cmd.AddCommand(NewCmdUnlock(streams), NewCmdList(streams), NewCmdShow(streams)) + cmd.AddCommand(NewCmdUnlock(streams), NewCmdList(streams), NewCmdShow(streams), NewCmdRollback(ui, streams)) return cmd } diff --git a/pkg/cmd/release/release_test.go b/pkg/cmd/release/release_test.go index 02004f74f..3fecf970e 100644 --- a/pkg/cmd/release/release_test.go +++ b/pkg/cmd/release/release_test.go @@ -11,7 +11,7 @@ func TestNewCmdRel(t *testing.T) { t.Run("successfully get release help", func(t *testing.T) { streams, _, _, _ := genericiooptions.NewTestIOStreams() - cmd := NewCmdRel(streams) + cmd := NewCmdRel(nil, streams) assert.NotNil(t, cmd) }) } diff --git a/pkg/cmd/release/rollback.go b/pkg/cmd/release/rollback.go new file mode 100644 index 000000000..2b23e38a8 --- /dev/null +++ b/pkg/cmd/release/rollback.go @@ -0,0 +1,205 @@ +package rel + +import ( + "errors" + "sync" + + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + + "kusionstack.io/kusion/pkg/cmd/apply" + applyaction "kusionstack.io/kusion/pkg/engine/apply" + "kusionstack.io/kusion/pkg/util/terminal" + + "k8s.io/cli-runtime/pkg/genericiooptions" + "k8s.io/kubectl/pkg/util/templates" + + "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/util/i18n" + + cmdutil "kusionstack.io/kusion/pkg/cmd/util" + + "github.com/spf13/cobra" +) + +var ( + rollbackShort = i18n.T("Rollback to a specific release of the current or specified stack") + + rollbackLong = i18n.T(` + Rollback to a specific release of the current or specified stack. + + This command reverts the current project in the current or a specified workspace + to the state of a specified release. + `) + + rollbackExample = i18n.T(` + # Rollback to the latest release of the current project in the current workspace + kusion release rollback + + # Rollback with specified work directory + kusion release rollback -w /path/to/workdir + + # Rollback to a specific release of the current project in the current workspace + kusion release rollback --revision=1 + + # Skip interactive approval of preview details before rollback + kusion release rollback --yes + + # Rollback without output style and color + kusion release rollback --no-style=true + + # Rollback without watching the resource changes and waiting for reconciliation + kusion release rollback --watch=false + + # Rollback with the specified timeout duration for kusion apply command, measured in second(s) + kusion release rollback --timeout=120 + + # Rollback with localhost port forwarding + kusion release rollback --port-forward=8080 + `) +) + +// RollbackFlags reflects the information that CLI is gathering via flags, +// which will be converted into RollbackOptions. +type RollbackFlags struct { + *apply.ApplyFlags + + Revision uint64 + + genericiooptions.IOStreams +} + +// RollbackOptions defines the configuration parameters for the `kusion release rollback` command. +type RollbackOptions struct { + *apply.ApplyOptions + + Revision uint64 + + genericiooptions.IOStreams +} + +// NewRollbackFlags returns a default RollbackFlags. +func NewRollbackFlags(ui *terminal.UI, streams genericiooptions.IOStreams) *RollbackFlags { + return &RollbackFlags{ + ApplyFlags: apply.NewApplyFlags(ui, streams), + IOStreams: streams, + } +} + +// NewCmdRollback creates the `kusion release rollback` command. +func NewCmdRollback(ui *terminal.UI, streams genericiooptions.IOStreams) *cobra.Command { + flags := NewRollbackFlags(ui, streams) + + cmd := &cobra.Command{ + Use: "rollback", + Short: rollbackShort, + Long: templates.LongDesc(rollbackLong), + Example: templates.Examples(rollbackExample), + RunE: func(cmd *cobra.Command, args []string) (err error) { + o, err := flags.ToOptions() + defer cmdutil.RecoverErr(&err) + cmdutil.CheckErr(err) + cmdutil.CheckErr(o.Validate(cmd, args)) + cmdutil.CheckErr(o.Run()) + + return + }, + } + + flags.AddFlags(cmd) + + return cmd +} + +// AddFlags adds flags for a RollbackOptions struct to the specified command. +func (f *RollbackFlags) AddFlags(cmd *cobra.Command) { + f.PreviewFlags.AddFlags(cmd) + + cmd.Flags().Uint64VarP(&f.Revision, "revision", "", 0, i18n.T("The revision number of the release to rollback to")) +} + +// ToOptions converts RollbackFlags to RollbackOptions. +func (f *RollbackFlags) ToOptions() (*RollbackOptions, error) { + // Convert preview options + applyOptions, err := f.ApplyFlags.ToOptions() + if err != nil { + return nil, err + } + + o := &RollbackOptions{ + ApplyOptions: applyOptions, + Revision: f.Revision, + IOStreams: f.IOStreams, + } + + return o, nil +} + +// Validate checks the provided options for the `kusion release rollback` command. +func (o *RollbackOptions) Validate(cmd *cobra.Command, args []string) error { + if len(args) != 0 { + return cmdutil.UsageErrorf(cmd, "Unexpected args: %v", args) + } + + if o.PortForward < 0 || o.PortForward > 65535 { + return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.PortForward) + } + return nil +} + +func (o *RollbackOptions) prepareRollback() (state *applystate.State, err error) { + defer cmdutil.RecoverErr(&err) + + // init apply state + state = &applystate.State{ + Metadata: &applystate.Metadata{ + Project: o.RefProject.Name, + Workspace: o.RefWorkspace.Name, + Stack: o.RefStack.Name, + }, + RelLock: &sync.Mutex{}, + PortForward: o.PortForward, + DryRun: o.DryRun, + Watch: o.Watch, + CallbackRevision: o.Revision, + Ls: &applystate.LineSummary{}, + } + + // create release + state.ReleaseStorage, err = o.Backend.ReleaseStorage(o.RefProject.Name, o.RefWorkspace.Name) + if err != nil { + return + } + + state.TargetRel, err = release.NewRollbackRelease(state.ReleaseStorage, o.RefProject.Name, o.RefStack.Name, o.RefWorkspace.Name, o.Revision) + if err != nil { + return + } + + if !o.DryRun { + if err = state.CreateStorageRelease(state.TargetRel); err != nil { + return + } + } + return +} + +// Run executes the `release rollback` command. +func (o *RollbackOptions) Run() (err error) { + // prepare apply + applyState, err := o.prepareRollback() + + if err != nil && applyState != nil { + updateErr := applyState.UpdateReleasePhaseFailed() + if updateErr != nil { + err = errors.Join(err, updateErr) + } + } + + if err != nil { + return + } + + // apply action + err = applyaction.Apply(o.ApplyOptions, applyState) + return +} diff --git a/pkg/cmd/release/rollback_test.go b/pkg/cmd/release/rollback_test.go new file mode 100644 index 000000000..85e63570b --- /dev/null +++ b/pkg/cmd/release/rollback_test.go @@ -0,0 +1 @@ +package rel diff --git a/pkg/engine/apply/apply.go b/pkg/engine/apply/apply.go new file mode 100644 index 000000000..0fe61146a --- /dev/null +++ b/pkg/engine/apply/apply.go @@ -0,0 +1,443 @@ +package apply + +import ( + "bytes" + "errors" + "fmt" + "os" + "time" + + "github.com/liu-hm19/pterm" + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + v1 "kusionstack.io/kusion/pkg/apis/status/v1" + cmdutil "kusionstack.io/kusion/pkg/cmd/util" + "kusionstack.io/kusion/pkg/engine/apply/options" + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "kusionstack.io/kusion/pkg/engine/operation" + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/resource/graph" + "kusionstack.io/kusion/pkg/log" + "kusionstack.io/kusion/pkg/util/signal" + "kusionstack.io/kusion/pkg/util/terminal" +) + +var errExit = errors.New("receive SIGTERM or SIGINT, exit cmd") + +// The Apply function will apply the resources changes through the execution kusion engine. +// You can customize the runtime of engine and the release releaseStorage through `runtime` and `releaseStorage` parameters. +func Apply( + o options.ApplyOptions, + state *applystate.State, +) (err error) { + // update release to succeeded or failed + defer func() { + if err != nil { + errUpdate := state.UpdateReleasePhaseFailed() + if errUpdate != nil { + // Join the errors if update apply release failed. + err = errors.Join(err, errUpdate) + } + log.Error(err) + } else { + err = state.UpdateReleasePhaseSucceeded() + if err != nil { + log.Error(err) + } + } + state.ExitClear() + }() + defer cmdutil.RecoverErr(&err) + + // set no style + if o.GetNoStyle() { + pterm.DisableStyling() + } + // Prepare for the timeout timer and error channel. + var timer <-chan time.Time + stopCh := signal.SetupSignalHandler() + + // Start the main task in a goroutine. + taskResult := make(chan error) + go task(o, state, taskResult) + + // If timeout is set, initialize the timer. + if o.GetTimeout() > 0 { + timer = time.After(time.Second * time.Duration(o.GetTimeout())) + } + + // Centralized event handling. + for { + select { + case <-stopCh: + // Handle SIGINT or SIGTERM + err = errExit + if state.PortForwarded { + return nil + } + return err + + case err = <-taskResult: + // Handle task completion + if errors.Is(err, errExit) && state.PortForwarded { + return nil + } + return err + + case <-timer: + // Handle timeout + err = fmt.Errorf("failed to execute kusion apply as: timeout for %d seconds", o.GetTimeout()) + return err + } + } +} + +func task(o options.ApplyOptions, state *applystate.State, taskResult chan<- error) { + var err error + defer func() { + if err != nil { + err = WrappedErr(err, "Apply task err") + } + taskResult <- err + close(taskResult) + }() + + defer cmdutil.RecoverErr(&err) + + // start preview + if err = state.UpdateReleasePhasePreviewing(); err != nil { + return + } + + var changes *models.Changes + + // compute changes for preview + changes, err = Preview(o, state.ReleaseStorage, state.TargetRel.Spec, state.TargetRel.State, o.GetRefProject(), o.GetRefStack()) + if err != nil { + return + } + + if allUnChange(changes) { + fmt.Println("All resources are reconciled. No diff found") + return + } + + // summary preview table + changes.Summary(state.IO.Out, o.GetNoStyle()) + + // detail detection + if o.GetDetail() && o.GetAll() { + changes.OutputDiff("all") + if !o.GetYes() { + return + } + } + + // prompt + if !o.GetYes() { + for { + var input string + input, err = prompt(o.GetUI(), state) + if err != nil { + return + } + if input == "yes" { + break + } else if input == "details" { + var target string + target, err = changes.PromptDetails(o.GetUI()) + if err != nil { + return + } + changes.OutputDiff(target) + } else { + fmt.Println("Operation apply canceled") + return + } + } + } + + // update release phase to applying + err = state.UpdateReleasePhaseApplying() + if err != nil { + return + } + + // Get graph storage directory, create if not exist + state.GraphStorage, err = o.GetBackend().GraphStorage(o.GetRefProject().Name, o.GetRefWorkspace().Name) + if err != nil { + return + } + + // Try to get existing graph, use the graph if exists + if state.GraphStorage.CheckGraphStorageExistence() { + state.Gph, err = state.GraphStorage.Get() + if err != nil { + return + } + err = graph.ValidateGraph(state.Gph) + if err != nil { + return + } + // Put new resources from the generated spec to graph + state.Gph, err = graph.GenerateGraph(state.TargetRel.Spec.Resources, state.Gph) + } else { + // Create a new graph to be used globally if no graph is stored in the storage + state.Gph = &apiv1.Graph{ + Project: o.GetRefProject().Name, + Workspace: o.GetRefWorkspace().Name, + } + state.Gph, err = graph.GenerateGraph(state.TargetRel.Spec.Resources, state.Gph) + } + if err != nil { + return + } + + // start applying + fmt.Printf("\nStart applying diffs ...\n") + + err = apply(o, state, changes) + if err != nil { + return + } + + // if dry run, print the hint + if o.GetDryRun() { + fmt.Printf("\nNOTE: Currently running in the --dry-run mode, the above configuration does not really take effect\n") + return + } + + if state.PortForward > 0 { + fmt.Printf("\nStart port-forwarding ...\n") + state.PortForwarded = true + if err = PortForward(state); err != nil { + return + } + } +} + +func apply(o options.ApplyOptions, state *applystate.State, changes *models.Changes) (err error) { + // Update the release before exit. + defer func() { + var finishErr error + // Update graph and write to storage if not dry run. + if !state.DryRun { + // Use resources in the state to get resource Cloud ID. + for _, resource := range state.TargetRel.State.Resources { + var info *graph.ResourceInfo + // Get information of each of the resources + info, finishErr = graph.GetResourceInfo(&resource) + if finishErr != nil { + err = errors.Join(err, finishErr) + return + } + // Update information of each of the resources. + graphResource := graph.FindGraphResourceByID(state.Gph.Resources, resource.ID) + if graphResource != nil { + graphResource.CloudResourceID = info.CloudResourceID + graphResource.Type = info.ResourceType + graphResource.Name = info.ResourceName + } + } + + // Update graph if exists, otherwise create a new graph file. + if state.GraphStorage.CheckGraphStorageExistence() { + // No need to store resource index + graph.RemoveResourceIndex(state.Gph) + finishErr = state.GraphStorage.Update(state.Gph) + if finishErr != nil { + err = errors.Join(err, finishErr) + return + } + } else { + graph.RemoveResourceIndex(state.Gph) + finishErr = state.GraphStorage.Create(state.Gph) + if finishErr != nil { + err = errors.Join(err, finishErr) + return + } + } + } + }() + + defer cmdutil.RecoverErr(&err) + + // construct the apply operation + ac := &operation.ApplyOperation{ + Operation: models.Operation{ + Stack: changes.Stack(), + ReleaseStorage: state.ReleaseStorage, + MsgCh: make(chan models.Message), + IgnoreFields: o.GetIgnoreFields(), + }, + } + + // Init a watch channel with a sufficient buffer when it is necessary to perform watching. + isWatch := state.Watch && !state.DryRun + if isWatch { + ac.WatchCh = make(chan string, 100) + } + + // Get the multi printer from UI option. + multi := o.GetUI().MultiPrinter + // Max length of resource ID for progressbar width. + maxLen := 0 + + // Prepare the writer to print the operation progress and results. + changesWriterMap := make(map[string]*pterm.SpinnerPrinter) + for _, key := range changes.Values() { + // Get the maximum length of the resource ID. + if len(key.ID) > maxLen { + maxLen = len(key.ID) + } + // Init a spinner printer for the resource to print the apply status. + changesWriterMap[key.ID], err = o.GetUI().SpinnerPrinter. + WithWriter(multi.NewWriter()). + Start(fmt.Sprintf("Pending %s", pterm.Bold.Sprint(key.ID))) + if err != nil { + return fmt.Errorf("failed to init change step spinner printer: %v", err) + } + } + + // Init a writer for progressbar. + pbWriter := multi.NewWriter() + // progress bar, print dag walk detail + progressbar, err := o.GetUI().ProgressbarPrinter. + WithTotal(len(changes.StepKeys)). + WithWriter(pbWriter). + WithRemoveWhenDone(). + WithShowCount(false). + WithMaxWidth(maxLen + 32). + Start() + if err != nil { + return err + } + + // The writer below is for operation error printing. + errWriter := multi.NewWriter() + + multi.WithUpdateDelay(time.Millisecond * 100) + multi.Start() + defer multi.Stop() + + // apply result + applyResult := make(chan error) + // receive msg and print detail + go PrintApplyDetails(state, ac.MsgCh, applyResult, changesWriterMap, progressbar, changes) + + var watchResult chan error + // Apply while watching the resources. + if isWatch { + watchResult = make(chan error) + go Watch(state, watchResult, ac.WatchCh, multi, changesWriterMap, changes) + } + + var updatedRel *apiv1.Release + if state.DryRun { + for _, r := range state.TargetRel.Spec.Resources { + ac.MsgCh <- models.Message{ + ResourceID: r.ResourceKey(), + OpResult: models.Success, + OpErr: nil, + } + } + close(ac.MsgCh) + } else { + // parse cluster in arguments + rsp, st := ac.Apply(&operation.ApplyRequest{ + Request: models.Request{ + Project: changes.Project(), + Stack: changes.Stack(), + }, + Release: state.TargetRel, + Graph: state.Gph, + }) + if v1.IsErr(st) { + errWriter.(*bytes.Buffer).Reset() + err = fmt.Errorf("apply failed, status:\n%v", st) + return err + } + // Update the release with that in the apply response if not dryrun. + updatedRel = rsp.Release + if updatedRel != nil { + *state.TargetRel = *updatedRel + } + state.Gph = rsp.Graph + } + + // wait for apply result ( msgCh closed ) + err = <-applyResult + if err != nil { + return + } + + // Wait for watchWg closed if need to perform watching. + if isWatch { + shouldBreak := false + for !shouldBreak { + select { + case watchErr := <-watchResult: + if watchErr != nil { + err = watchErr + return + } + shouldBreak = true + default: + continue + } + } + } + + // print summary + pterm.Fprintln(pbWriter, fmt.Sprintf("\nApply complete! Resources: %d created, %d updated, %d deleted.", state.Ls.GetCreated(), state.Ls.GetUpdated(), state.Ls.GetDeleted())) + return nil +} + +// PortForward function will forward the specified port from local to the project Kubernetes Service. +// +// Example: +// +// o := newApplyOptions() +// spec, err := generate.GenerateSpecWithSpinner(o.RefProject, o.RefStack, o.RefWorkspace, nil, o.NoStyle) +// +// if err != nil { +// return err +// } +// +// err = PortForward(o, spec) +// +// if err != nil { +// return err +// } +// + +func allUnChange(changes *models.Changes) bool { + for _, v := range changes.ChangeSteps { + if v.Action != models.UnChanged { + return false + } + } + + return true +} + +func prompt(ui *terminal.UI, state *applystate.State) (string, error) { + // don`t display yes item when only preview + ops := []string{"yes", "details", "no"} + input, err := ui.InteractiveSelectPrinter. + WithFilter(false). + WithDefaultText(`Do you want to apply these diffs?`). + WithOptions(ops). + WithDefaultOption("details"). + // To gracefully exit if interrupted by SIGINT or SIGTERM. + WithOnInterruptFunc(func() { + state.InterruptFunc() + os.Exit(1) + }). + Show() + if err != nil { + fmt.Printf("Prompt failed: %v\n", err) + return "", err + } + + return input, nil +} diff --git a/pkg/engine/apply/apply_test.go b/pkg/engine/apply/apply_test.go new file mode 100644 index 000000000..535e7a1b9 --- /dev/null +++ b/pkg/engine/apply/apply_test.go @@ -0,0 +1,537 @@ +package apply + +import ( + "bytes" + "errors" + "io" + "sync" + "testing" + "time" + + "github.com/liu-hm19/pterm" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + v1 "kusionstack.io/kusion/pkg/apis/status/v1" + "kusionstack.io/kusion/pkg/backend" + "kusionstack.io/kusion/pkg/backend/storages" + "kusionstack.io/kusion/pkg/engine" + "kusionstack.io/kusion/pkg/engine/operation" + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/printers" + releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + "kusionstack.io/kusion/pkg/engine/resource/graph" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" + "kusionstack.io/kusion/pkg/engine/runtime" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "kusionstack.io/kusion/pkg/util/terminal" +) + +var ( + proj = &apiv1.Project{ + Name: "fake-proj", + } + stack = &apiv1.Stack{ + Name: "fake-stack", + } + workspace = &apiv1.Workspace{ + Name: "fake-workspace", + } +) + +type MockApplyOptions struct{} + +func (m *MockApplyOptions) GetRefProject() *apiv1.Project { return proj } +func (m *MockApplyOptions) GetRefStack() *apiv1.Stack { return stack } +func (m *MockApplyOptions) GetRefWorkspace() *apiv1.Workspace { return workspace } +func (m *MockApplyOptions) GetBackend() backend.Backend { return &storages.LocalStorage{} } +func (m *MockApplyOptions) GetNoStyle() bool { return true } +func (m *MockApplyOptions) GetYes() bool { return true } +func (m *MockApplyOptions) GetDryRun() bool { return false } +func (m *MockApplyOptions) GetWatch() bool { return false } +func (m *MockApplyOptions) GetTimeout() int { return 0 } +func (m *MockApplyOptions) GetPortForward() int { return 0 } +func (m *MockApplyOptions) GetDetail() bool { return false } +func (m *MockApplyOptions) GetAll() bool { return false } +func (m *MockApplyOptions) GetOutput() string { return "" } +func (m *MockApplyOptions) GetSpecFile() string { return "" } +func (m *MockApplyOptions) GetIgnoreFields() []string { return nil } +func (m *MockApplyOptions) GetValues() []string { return nil } +func (m *MockApplyOptions) GetUI() *terminal.UI { return terminal.DefaultUI() } + +const ( + apiVersion = "v1" + kind = "ServiceAccount" + namespace = "test-ns" +) + +var ( + sa1 = newSA("sa1") + sa2 = newSA("sa2") +) + +var state = &applystate.State{ + DryRun: true, + Gph: &apiv1.Graph{}, + TargetRel: &apiv1.Release{}, + ReleaseStorage: &releasestorages.LocalStorage{}, + GraphStorage: &graphstorages.LocalStorage{}, + RelLock: &sync.Mutex{}, + Ls: &applystate.LineSummary{}, +} + +func newSA(name string) apiv1.Resource { + return apiv1.Resource{ + ID: engine.BuildID(apiVersion, kind, namespace, name), + Type: "Kubernetes", + Attributes: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + }, + }, + } +} + +func mockGraphStorage() { + mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Delete).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Update).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Get).Return(&apiv1.Graph{ + Project: "", + Workspace: "", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + }, nil).Build() +} + +func TestApply(t *testing.T) { + loc, _ := time.LoadLocation("Asia/Shanghai") + o := &MockApplyOptions{} + mockey.PatchConvey("dry run", t, func() { + mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() + rel := &apiv1.Release{ + Project: "fake-project", + Workspace: "fake-workspace", + Revision: 1, + Stack: "fake-stack", + Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1}}, + State: &apiv1.State{}, + Phase: apiv1.ReleasePhaseApplying, + CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + } + order := &models.ChangeOrder{ + StepKeys: []string{sa1.ID}, + ChangeSteps: map[string]*models.ChangeStep{ + sa1.ID: { + ID: sa1.ID, + Action: models.Create, + From: sa1, + }, + }, + } + changes := models.NewChanges(proj, stack, order) + state.TargetRel = rel + state.DryRun = true + err := apply(o, state, changes) + assert.Nil(t, err) + }) + mockey.PatchConvey("apply success", t, func() { + mockOperationApply(models.Success) + mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() + mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() + // mockGraphStorage() + rel := &apiv1.Release{ + Project: "fake-project", + Workspace: "fake-workspace", + Revision: 1, + Stack: "fake-stack", + Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1, sa2}}, + State: &apiv1.State{}, + Phase: apiv1.ReleasePhaseApplying, + CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + } + order := &models.ChangeOrder{ + StepKeys: []string{sa1.ID, sa2.ID}, + ChangeSteps: map[string]*models.ChangeStep{ + sa1.ID: { + ID: sa1.ID, + Action: models.Create, + From: &sa1, + }, + sa2.ID: { + ID: sa2.ID, + Action: models.UnChanged, + From: &sa2, + }, + }, + } + + changes := models.NewChanges(proj, stack, order) + state.TargetRel = rel + state.Gph = &apiv1.Graph{ + Project: rel.Project, + Workspace: rel.Workspace, + } + state.DryRun = false + graph.GenerateGraph(rel.Spec.Resources, state.Gph) + err := apply(o, state, changes) + assert.Nil(t, err) + }) + mockey.PatchConvey("apply failed", t, func() { + mockOperationApply(models.Failed) + mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() + mockGraphStorage() + rel := &apiv1.Release{ + Project: "fake-project", + Workspace: "fake-workspace", + Revision: 1, + Stack: "fake-stack", + Spec: &apiv1.Spec{Resources: []apiv1.Resource{sa1}}, + State: &apiv1.State{}, + Phase: apiv1.ReleasePhaseApplying, + CreateTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + ModifiedTime: time.Date(2024, 5, 20, 19, 39, 0, 0, loc), + } + order := &models.ChangeOrder{ + StepKeys: []string{sa1.ID}, + ChangeSteps: map[string]*models.ChangeStep{ + sa1.ID: { + ID: sa1.ID, + Action: models.Create, + From: &sa1, + }, + }, + } + changes := models.NewChanges(proj, stack, order) + state.Gph = &apiv1.Graph{} + state.TargetRel = rel + state.DryRun = false + graph.GenerateGraph(rel.Spec.Resources, state.Gph) + err := apply(o, state, changes) + assert.NotNil(t, err) + }) +} + +func mockOperationApply(res models.OpResult) { + mockey.Mock((*operation.ApplyOperation).Apply).To( + func(o *operation.ApplyOperation, request *operation.ApplyRequest) (*operation.ApplyResponse, v1.Status) { + var err error + if res == models.Failed { + err = errors.New("mock error") + } + for _, r := range request.Release.Spec.Resources { + // ing -> $res + o.MsgCh <- models.Message{ + ResourceID: r.ResourceKey(), + OpResult: "", + OpErr: nil, + } + o.MsgCh <- models.Message{ + ResourceID: r.ResourceKey(), + OpResult: res, + OpErr: err, + } + } + close(o.MsgCh) + if res == models.Failed { + return nil, v1.NewErrorStatus(err) + } + return &operation.ApplyResponse{}, nil + }).Build() +} + +func mockPromptOutput(res string) { + mockey.Mock((*pterm.InteractiveSelectPrinter).Show).Return(res, nil).Build() +} + +func TestPrompt(t *testing.T) { + mockey.PatchConvey("prompt error", t, func() { + mockey.Mock((*pterm.InteractiveSelectPrinter).Show).Return("", errors.New("mock error")).Build() + _, err := prompt(terminal.DefaultUI(), &applystate.State{}) + assert.NotNil(t, err) + }) + + mockey.PatchConvey("prompt yes", t, func() { + mockPromptOutput("yes") + _, err := prompt(terminal.DefaultUI(), &applystate.State{}) + assert.Nil(t, err) + }) +} + +func TestWatchK8sResources(t *testing.T) { + t.Run("successfully apply default K8s resources", func(t *testing.T) { + id := "v1:Namespace:example" + chs := make([]<-chan watch.Event, 1) + events := []watch.Event{ + { + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]interface{}{ + "name": "example", + }, + "spec": map[string]interface{}{}, + }, + }, + }, + { + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]interface{}{ + "name": "example", + }, + "spec": map[string]interface{}{}, + "status": map[string]interface{}{ + "phase": corev1.NamespaceActive, + }, + }, + }, + }, + } + + out := make(chan watch.Event, 10) + for _, e := range events { + out <- e + } + chs[0] = out + table := &printers.Table{ + IDs: []string{id}, + Rows: map[string]*printers.Row{}, + } + tables := map[string]*printers.Table{ + id: table, + } + resource := &apiv1.GraphResource{ + ID: id, + Type: "", + Name: "", + CloudResourceID: "", + Status: "", + Dependents: []string{}, + Dependencies: []string{}, + } + gph := &apiv1.Graph{ + Project: "example project", + Workspace: "example workspace", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + } + + healthPolicy := map[string]interface{}{ + "health.kcl": "assert res.metadata.generation == 1", + } + + graph.UpdateResourceIndex(gph.Resources) + watchResult := make(chan error) + watchK8sResources(id, "", chs, table, tables, healthPolicy, state.Gph.Resources, watchResult) + + assert.Equal(t, true, table.AllCompleted()) + }) + t.Run("successfully apply customized K8s resources", func(t *testing.T) { + id := "v1:Deployment:example" + chs := make([]<-chan watch.Event, 1) + events := []watch.Event{ + { + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "example", + "generation": 1, + }, + "spec": map[string]interface{}{}, + }, + }, + }, + } + + out := make(chan watch.Event, 10) + for _, e := range events { + out <- e + } + chs[0] = out + table := &printers.Table{ + IDs: []string{id}, + Rows: map[string]*printers.Row{}, + } + tables := map[string]*printers.Table{ + id: table, + } + var policyInterface interface{} + healthPolicy := map[string]interface{}{ + "health.kcl": "assert res.metadata.generation == 1", + } + policyInterface = healthPolicy + resource := &apiv1.GraphResource{ + ID: id, + Type: "", + Name: "", + CloudResourceID: "", + Status: "", + Dependents: []string{}, + Dependencies: []string{}, + } + gph := &apiv1.Graph{ + Project: "example project", + Workspace: "example workspace", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + } + graph.UpdateResourceIndex(gph.Resources) + watchResult := make(chan error) + watchK8sResources(id, "Deployment", chs, table, tables, policyInterface, state.Gph.Resources, watchResult) + + assert.Equal(t, true, table.AllCompleted()) + }) +} + +func TestWatchTFResources(t *testing.T) { + t.Run("successfully apply TF resources", func(t *testing.T) { + eventCh := make(chan runtime.TFEvent, 10) + events := []runtime.TFEvent{ + runtime.TFApplying, + runtime.TFApplying, + runtime.TFSucceeded, + } + for _, e := range events { + eventCh <- e + } + + id := "hashicorp:random:random_password:example-dev-kawesome" + table := &printers.Table{ + IDs: []string{id}, + Rows: map[string]*printers.Row{ + "hashicorp:random:random_password:example-dev-kawesome": {}, + }, + } + + watchResult := make(chan error) + watchTFResources(id, eventCh, table, watchResult) + + assert.Equal(t, true, table.AllCompleted()) + }) +} + +func TestPrintTable(t *testing.T) { + w := io.Writer(bytes.NewBufferString("")) + id := "fake-resource-id" + tables := map[string]*printers.Table{ + "fake-resource-id": printers.NewTable([]string{ + "fake-resource-id", + }), + } + + t.Run("skip unsupported resources", func(t *testing.T) { + printTable(&w, "fake-fake-resource-id", tables) + assert.Contains(t, w.(*bytes.Buffer).String(), "Skip monitoring unsupported resources") + }) + + t.Run("update table", func(t *testing.T) { + printTable(&w, id, tables) + tableStr, err := pterm.DefaultTable. + WithStyle(pterm.NewStyle(pterm.FgDefault)). + WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). + WithHasHeader().WithSeparator(" ").WithData(tables[id].Print()).Srender() + + assert.Nil(t, err) + assert.Contains(t, w.(*bytes.Buffer).String(), tableStr) + }) +} + +func TestGetResourceInfo(t *testing.T) { + tests := []struct { + name string + resource *apiv1.Resource + expectedKind string + expectPanic bool + }{ + { + name: "with valid resource", + resource: &apiv1.Resource{ + Attributes: map[string]interface{}{ + apiv1.FieldKind: "Service", + }, + Extensions: map[string]interface{}{ + apiv1.FieldHealthPolicy: "policyValue", + }, + }, + expectedKind: "Service", + expectPanic: false, + }, + { + name: "with nil Attributes", + resource: &apiv1.Resource{ + Attributes: nil, + Extensions: map[string]interface{}{ + apiv1.FieldHealthPolicy: "policyValue", + }, + }, + expectPanic: true, + }, + { + name: "with non-string kind", + resource: &apiv1.Resource{ + Attributes: map[string]interface{}{ + apiv1.FieldKind: 123, + }, + Extensions: map[string]interface{}{ + apiv1.FieldHealthPolicy: "policyValue", + }, + }, + expectPanic: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.expectPanic { + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic for test case '%s', but got none", tt.name) + } + }() + } + + healthPolicy, kind := getResourceInfo(tt.resource) + if !tt.expectPanic { + if kind != tt.expectedKind { + t.Errorf("expected kind '%s', but got '%s'", tt.expectedKind, kind) + } + if healthPolicy != "policyValue" && !tt.expectPanic { + t.Errorf("expected healthPolicy to be 'policyValue', but got '%v'", healthPolicy) + } + } + }) + } +} diff --git a/pkg/engine/apply/options/apply.go b/pkg/engine/apply/options/apply.go new file mode 100644 index 000000000..c92b6a9bf --- /dev/null +++ b/pkg/engine/apply/options/apply.go @@ -0,0 +1,11 @@ +package options + +type ApplyOptions interface { + PreviewOptions + + GetYes() bool + GetDryRun() bool + GetWatch() bool + GetTimeout() int + GetPortForward() int +} diff --git a/pkg/engine/apply/options/meta.go b/pkg/engine/apply/options/meta.go new file mode 100644 index 000000000..f1b756ce0 --- /dev/null +++ b/pkg/engine/apply/options/meta.go @@ -0,0 +1,13 @@ +package options + +import ( + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/backend" +) + +type Meta interface { + GetRefProject() *v1.Project + GetRefStack() *v1.Stack + GetRefWorkspace() *v1.Workspace + GetBackend() backend.Backend +} diff --git a/pkg/engine/apply/options/preview.go b/pkg/engine/apply/options/preview.go new file mode 100644 index 000000000..29cfeaede --- /dev/null +++ b/pkg/engine/apply/options/preview.go @@ -0,0 +1,16 @@ +package options + +import "kusionstack.io/kusion/pkg/util/terminal" + +type PreviewOptions interface { + Meta + + GetDetail() bool + GetAll() bool + GetNoStyle() bool + GetOutput() string + GetSpecFile() string + GetIgnoreFields() []string + GetValues() []string + GetUI() *terminal.UI +} diff --git a/pkg/engine/apply/port_forward.go b/pkg/engine/apply/port_forward.go new file mode 100644 index 000000000..f6367cff9 --- /dev/null +++ b/pkg/engine/apply/port_forward.go @@ -0,0 +1,31 @@ +package apply + +import ( + "fmt" + + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "kusionstack.io/kusion/pkg/engine/operation" +) + +func PortForward( + state *applystate.State, +) error { + if state.DryRun { + fmt.Println("NOTE: Portforward doesn't work in DryRun mode") + return nil + } + + state.PortForwardReady, state.PortForwardStop = make(chan struct{}, 1), make(chan struct{}, 1) + + // portforward operation + wo := &operation.PortForwardOperation{} + if err := wo.PortForward(&operation.PortForwardRequest{ + Spec: state.TargetRel.Spec, + Port: state.PortForward, + }, state.PortForwardStop, state.PortForwardReady); err != nil { + return err + } + + fmt.Println("Portforward has been completed!") + return nil +} diff --git a/pkg/engine/apply/preview.go b/pkg/engine/apply/preview.go new file mode 100644 index 000000000..bbffc3336 --- /dev/null +++ b/pkg/engine/apply/preview.go @@ -0,0 +1,83 @@ +package apply + +import ( + "fmt" + + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + v1 "kusionstack.io/kusion/pkg/apis/status/v1" + "kusionstack.io/kusion/pkg/engine/apply/options" + "kusionstack.io/kusion/pkg/engine/operation" + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/runtime/terraform" + "kusionstack.io/kusion/pkg/log" +) + +// The Preview function calculates the upcoming actions of each resource +// through the execution Kusion Engine, and you can customize the +// runtime of engine and the state storage through `runtime` and +// `storage` parameters. +// +// Example: +// +// o := newPreviewOptions() +// stateStorage := &states.FileSystemState{ +// Path: filepath.Join(o.WorkDir, states.KusionState) +// } +// kubernetesRuntime, err := runtime.NewKubernetesRuntime() +// if err != nil { +// return err +// } +// +// changes, err := Preview(o, kubernetesRuntime, stateStorage, +// planResources, project, stack, os.Stdout) +// if err != nil { +// return err +// } +func Preview( + o options.PreviewOptions, + storage release.Storage, + planResources *apiv1.Spec, + priorResources *apiv1.State, + project *apiv1.Project, + stack *apiv1.Stack, +) (*models.Changes, error) { + log.Info("Start compute preview changes ...") + + // check and install terraform executable binary for + // resources with the type of Terraform. + tfInstaller := terraform.CLIInstaller{ + Intent: planResources, + } + if err := tfInstaller.CheckAndInstall(); err != nil { + return nil, err + } + + // construct the preview operation + pc := &operation.PreviewOperation{ + Operation: models.Operation{ + OperationType: models.ApplyPreview, + Stack: stack, + ReleaseStorage: storage, + IgnoreFields: o.GetIgnoreFields(), + ChangeOrder: &models.ChangeOrder{StepKeys: []string{}, ChangeSteps: map[string]*models.ChangeStep{}}, + }, + } + + log.Info("Start call pc.Preview() ...") + + // parse cluster in arguments + rsp, s := pc.Preview(&operation.PreviewRequest{ + Request: models.Request{ + Project: project, + Stack: stack, + }, + Spec: planResources, + State: priorResources, + }) + if v1.IsErr(s) { + return nil, fmt.Errorf("preview failed.\n%s", s.String()) + } + + return models.NewChanges(project, stack, rsp.Order), nil +} diff --git a/pkg/engine/apply/print_details.go b/pkg/engine/apply/print_details.go new file mode 100644 index 000000000..75801530e --- /dev/null +++ b/pkg/engine/apply/print_details.go @@ -0,0 +1,96 @@ +package apply + +import ( + "errors" + "fmt" + + "github.com/liu-hm19/pterm" + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + cmdutil "kusionstack.io/kusion/pkg/cmd/util" + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/resource/graph" + "kusionstack.io/kusion/pkg/util/pretty" +) + +// PrintApplyDetails function will receive the messages of the apply operation and print the details. +func PrintApplyDetails( + state *applystate.State, + msgChan chan models.Message, + applyResult chan error, + changesWriterMap map[string]*pterm.SpinnerPrinter, + progressbar *pterm.ProgressbarPrinter, + changes *models.Changes, +) { + var err error + defer func() { + applyResult <- err + close(applyResult) + }() + defer cmdutil.RecoverErr(&err) + + for { + select { + // Get operation results from the message channel. + case msg, ok := <-msgChan: + if !ok { + return + } + changeStep := changes.Get(msg.ResourceID) + + // Update the progressbar and spinner printer according to the operation result. + switch msg.OpResult { + case models.Success, models.Skip: + var title string + if changeStep.Action == models.UnChanged { + title = fmt.Sprintf("Skipped %s", pterm.Bold.Sprint(changeStep.ID)) + changesWriterMap[msg.ResourceID].Success(title) + } else { + if state.Watch && !state.DryRun { + title = fmt.Sprintf("%s %s", + changeStep.Action.Ing(), + pterm.Bold.Sprint(changeStep.ID), + ) + changesWriterMap[msg.ResourceID].UpdateText(title) + } else { + changesWriterMap[msg.ResourceID].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(msg.ResourceID))) + } + } + + // Update resource status + if !state.DryRun && changeStep.Action != models.UnChanged { + gphResource := graph.FindGraphResourceByID(state.Gph.Resources, msg.ResourceID) + if gphResource != nil { + // Delete resource from the graph if it's deleted during apply + if changeStep.Action == models.Delete { + graph.RemoveResource(state.Gph, gphResource) + } else { + gphResource.Status = apiv1.ApplySucceed + } + } + } + + progressbar.Increment() + state.Ls.Count(changeStep.Action) + case models.Failed: + title := fmt.Sprintf("Failed %s", pterm.Bold.Sprint(changeStep.ID)) + changesWriterMap[msg.ResourceID].Fail(title) + errStr := pretty.ErrorT.Sprintf("apply %s failed as: %s\n", msg.ResourceID, msg.OpErr.Error()) + err = errors.Join(err, errors.New(errStr)) + if !state.DryRun { + // Update resource status, in case anything like update fail happened + gphResource := graph.FindGraphResourceByID(state.Gph.Resources, msg.ResourceID) + if gphResource != nil { + gphResource.Status = apiv1.ApplyFail + } + } + default: + title := fmt.Sprintf("%s %s", + changeStep.Action.Ing(), + pterm.Bold.Sprint(changeStep.ID), + ) + changesWriterMap[msg.ResourceID].UpdateText(title) + } + } + } +} diff --git a/pkg/engine/apply/state/state.go b/pkg/engine/apply/state/state.go new file mode 100644 index 000000000..51bddd739 --- /dev/null +++ b/pkg/engine/apply/state/state.go @@ -0,0 +1,136 @@ +package state + +import ( + "sync" + "time" + + "k8s.io/cli-runtime/pkg/genericiooptions" + "kusionstack.io/kusion/pkg/engine/release" + + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/engine/resource/graph" +) + +type Metadata struct { + Project string + Stack string + Workspace string +} + +// State release state +type State struct { + *Metadata + + // apply state options + DryRun bool + PortForward int + Watch bool + IO genericiooptions.IOStreams + + // release status data + CurrentRel *apiv1.Release + TargetRel *apiv1.Release + Gph *apiv1.Graph + RelLock *sync.Mutex + + // port forwarded + PortForwarded bool + PortForwardStop chan struct{} + PortForwardReady chan struct{} + + GraphStorage graph.Storage + + // release storage + ReleaseStorage release.Storage + ReleaseCreated bool + + // summary + Ls *LineSummary + + // callback revision + CallbackRevision uint64 +} + +func (s *State) CreateStorageRelease(rel *apiv1.Release) error { + err := s.ReleaseStorage.Create(rel) + if err != nil { + return err + } + s.ReleaseCreated = true + return nil +} + +func (s *State) UpdateReleasePhaseFailed() (err error) { + if !s.ReleaseCreated { + return + } + if s.TargetRel == nil || s.ReleaseStorage == nil { + return nil + } + release.UpdateReleasePhase(s.TargetRel, apiv1.ReleasePhaseFailed, s.RelLock) + if err = release.UpdateApplyRelease(s.ReleaseStorage, s.TargetRel, s.DryRun, s.RelLock); err != nil { + return + } + return nil +} + +func (s *State) UpdateReleasePhaseSucceeded() (err error) { + if !s.ReleaseCreated { + return + } + release.UpdateReleasePhase(s.TargetRel, apiv1.ReleasePhaseSucceeded, s.RelLock) + if err = release.UpdateApplyRelease(s.ReleaseStorage, s.TargetRel, s.DryRun, s.RelLock); err != nil { + return + } + return nil +} + +func (s *State) UpdateReleasePhasePreviewing() (err error) { + if !s.ReleaseCreated { + return + } + release.UpdateReleasePhase(s.TargetRel, apiv1.ReleasePhasePreviewing, s.RelLock) + if err = release.UpdateApplyRelease(s.ReleaseStorage, s.TargetRel, s.DryRun, s.RelLock); err != nil { + return + } + return +} + +func (s *State) UpdateReleasePhaseApplying() (err error) { + if !s.ReleaseCreated { + return + } + release.UpdateReleasePhase(s.TargetRel, apiv1.ReleasePhaseApplying, s.RelLock) + if err = release.UpdateApplyRelease(s.ReleaseStorage, s.TargetRel, s.DryRun, s.RelLock); err != nil { + return + } + return +} + +func (s *State) InterruptFunc() { + if !s.ReleaseCreated { + return + } + release.UpdateReleasePhase(s.TargetRel, apiv1.ReleasePhaseFailed, s.RelLock) + _ = release.UpdateApplyRelease(s.ReleaseStorage, s.TargetRel, false, s.RelLock) +} + +func (s *State) ExitClear() { + finish := make(chan struct{}) + + // clear port forward + timeOut := time.NewTimer(5 * time.Second) + + go func() { + if s.PortForwarded { + s.PortForwardStop <- struct{}{} + } + finish <- struct{}{} + }() + select { + case <-timeOut.C: + return + case <-finish: + return + } +} diff --git a/pkg/engine/apply/state/summary.go b/pkg/engine/apply/state/summary.go new file mode 100644 index 000000000..c529247ea --- /dev/null +++ b/pkg/engine/apply/state/summary.go @@ -0,0 +1,30 @@ +package state + +import "kusionstack.io/kusion/pkg/engine/operation/models" + +type LineSummary struct { + created, updated, deleted int +} + +func (ls *LineSummary) Count(op models.ActionType) { + switch op { + case models.Create: + ls.created++ + case models.Update: + ls.updated++ + case models.Delete: + ls.deleted++ + } +} + +func (ls *LineSummary) GetCreated() int { + return ls.created +} + +func (ls *LineSummary) GetUpdated() int { + return ls.updated +} + +func (ls *LineSummary) GetDeleted() int { + return ls.deleted +} diff --git a/pkg/engine/apply/utils.go b/pkg/engine/apply/utils.go new file mode 100644 index 000000000..52e9dec39 --- /dev/null +++ b/pkg/engine/apply/utils.go @@ -0,0 +1,7 @@ +package apply + +import "fmt" + +func WrappedErr(err error, context string) error { + return fmt.Errorf("%s: %w", context, err) +} diff --git a/pkg/engine/apply/watch_resource.go b/pkg/engine/apply/watch_resource.go new file mode 100644 index 000000000..67ea8eab4 --- /dev/null +++ b/pkg/engine/apply/watch_resource.go @@ -0,0 +1,335 @@ +package apply + +import ( + "bytes" + "context" + "fmt" + "io" + "reflect" + "strings" + "time" + + "github.com/liu-hm19/pterm" + "gopkg.in/yaml.v3" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + v1 "kusionstack.io/kusion/pkg/apis/status/v1" + cmdutil "kusionstack.io/kusion/pkg/cmd/util" + "kusionstack.io/kusion/pkg/engine" + applystate "kusionstack.io/kusion/pkg/engine/apply/state" + "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/printers" + "kusionstack.io/kusion/pkg/engine/resource/graph" + "kusionstack.io/kusion/pkg/engine/runtime" + runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" + "kusionstack.io/kusion/pkg/log" + "kusionstack.io/kusion/pkg/util/kcl" + "kusionstack.io/kusion/pkg/util/pretty" +) + +// Watch function will watch the changed Kubernetes and Terraform resources. +func Watch( + state *applystate.State, + watchResult chan error, + watchChan chan string, + multi *pterm.MultiPrinter, + changesWriterMap map[string]*pterm.SpinnerPrinter, + changes *models.Changes, +) { + var err error + defer func() { + watchResult <- err + close(watchResult) + }() + defer cmdutil.RecoverErr(&err) + + resourceMap := make(map[string]apiv1.Resource) + ioWriterMap := make(map[string]io.Writer) + toBeWatched := apiv1.Resources{} + + // Get the resources to be watched. + for _, res := range state.TargetRel.Spec.Resources { + if changes.ChangeOrder.ChangeSteps[res.ResourceKey()].Action != models.UnChanged { + resourceMap[res.ResourceKey()] = res + toBeWatched = append(toBeWatched, res) + } + } + + // Init the runtimes according to the resource types. + runtimes, s := runtimeinit.Runtimes(*state.TargetRel.Spec, *state.TargetRel.State) + if v1.IsErr(s) { + panic(fmt.Errorf("failed to init runtimes: %s", s.String())) + } + + // Prepare the tables for printing the details of the resources. + tables := make(map[string]*printers.Table, len(toBeWatched)) + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + + // Record the watched and finished resources. + watchedIDs := []string{} + finished := make(map[string]bool) + + // Resource error chan + watchErrChan := make(chan error) + defer close(watchErrChan) + + for !(len(finished) == len(toBeWatched)) { + select { + // Get the resource ID to be watched. + case id := <-watchChan: + res := resourceMap[id] + // Set the timeout duration for watch context, here we set an experiential value of 60 minutes. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(60)) + defer cancel() + + // Get the event channel for watching the resource. + rsp := runtimes[res.Type].Watch(ctx, &runtime.WatchRequest{Resource: &res}) + if rsp == nil { + log.Debug("unsupported resource type: %s", res.Type) + continue + } + if v1.IsErr(rsp.Status) { + panic(fmt.Errorf("failed to watch %s as %s", id, rsp.Status.String())) + } + + w := rsp.Watchers + table := printers.NewTable(w.IDs) + tables[id] = table + + // Setup a go-routine to concurrently watch K8s and TF resources. + if res.Type == apiv1.Kubernetes { + healthPolicy, kind := getResourceInfo(&res) + go watchK8sResources(id, kind, w.Watchers, table, tables, healthPolicy, state.Gph.Resources, watchErrChan) + } else if res.Type == apiv1.Terraform { + go watchTFResources(id, w.TFWatcher, table, watchErrChan) + } else { + log.Debug("unsupported resource type to watch: %s", string(res.Type)) + continue + } + + // Record the io writer related to the resource ID. + ioWriterMap[id] = multi.NewWriter() + watchedIDs = append(watchedIDs, id) + + case err = <-watchErrChan: + if err != nil { + return + } + + // Refresh the tables printing details of the resources to be watched. + default: + for _, id := range watchedIDs { + w, ok := ioWriterMap[id] + if !ok { + panic(fmt.Errorf("failed to get io writer while watching %s", id)) + } + printTable(&w, id, tables) + } + for id, table := range tables { + if finished[id] { + continue + } + + if table.AllCompleted() { + finished[id] = true + changesWriterMap[id].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(id))) + + // Update resource status to reconciled. + resource := graph.FindGraphResourceByID(state.Gph.Resources, id) + if resource != nil { + resource.Status = apiv1.Reconciled + } + } + } + <-ticker.C + } + } +} + +func watchTFResources( + id string, + ch <-chan runtime.TFEvent, + table *printers.Table, + errChan chan<- error, +) { + var err error + defer func() { + if err != nil { + errChan <- WrappedErr(err, "watchTFResources err") + close(errChan) + } + }() + + defer cmdutil.RecoverErr(&err) + + for { + parts := strings.Split(id, engine.Separator) + // A valid Terraform resource ID should consist of 4 parts, including the information of the provider type + // and resource name, for example: hashicorp:random:random_password:example-dev-kawesome. + if len(parts) != 4 { + panic(fmt.Errorf("invalid Terraform resource id: %s", id)) + } + + tfEvent := <-ch + if tfEvent == runtime.TFApplying { + table.Update( + id, + printers.NewRow(watch.EventType("Applying"), + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Applying...")) + } else if tfEvent == runtime.TFSucceeded { + table.Update( + id, + printers.NewRow(printers.READY, + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply succeeded")) + } else { + table.Update( + id, + printers.NewRow(watch.EventType("Failed"), + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply failed")) + } + + // Break when all completed. + if table.AllCompleted() { + break + } + } +} + +func watchK8sResources( + id, kind string, + chs []<-chan watch.Event, + table *printers.Table, + tables map[string]*printers.Table, + healthPolicy interface{}, + gphResource *apiv1.GraphResources, + errChan chan<- error, +) { + var err error + defer func() { + if err != nil { + errChan <- WrappedErr(err, "watchK8sResources err") + close(errChan) + } + }() + + defer cmdutil.RecoverErr(&err) + + // Set resource status to `reconcile failed` before reconcile successfully. + resource := graph.FindGraphResourceByID(gphResource, id) + if resource != nil { + resource.Status = apiv1.ReconcileFail + } + + // Resources selects + cases := createSelectCases(chs) + // Default select + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectDefault, + Chan: reflect.Value{}, + Send: reflect.Value{}, + }) + + for { + chosen, recv, recvOK := reflect.Select(cases) + if cases[chosen].Dir == reflect.SelectDefault { + continue + } + if recvOK { + e := recv.Interface().(watch.Event) + o := e.Object.(*unstructured.Unstructured) + var detail string + var ready bool + if e.Type == watch.Deleted { + detail = fmt.Sprintf("%s has beed deleted", o.GetName()) + ready = true + } else { + // Restore to actual type + target := printers.Convert(o) + // Check reconcile status with customized health policy for specific resource + if healthPolicy != nil && kind == o.GetObjectKind().GroupVersionKind().Kind { + if code, ok := kcl.ConvertKCLCode(healthPolicy); ok { + var resByte []byte + resByte, err = yaml.Marshal(o.Object) + if err != nil { + return + } + detail, ready = printers.PrintCustomizedHealthCheck(code, resByte) + } else { + detail, ready = printers.Generate(target) + } + } else { + // Check reconcile status with default setup + detail, ready = printers.Generate(target) + } + } + + // Mark ready for breaking loop + if ready { + e.Type = printers.READY + } + + // Save watched msg + table.Update( + engine.BuildIDForKubernetes(o), + printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail)) + + // Write back + tables[id] = table + } + + // Break when completed + if table.AllCompleted() { + break + } + } +} + +func createSelectCases(chs []<-chan watch.Event) []reflect.SelectCase { + cases := make([]reflect.SelectCase, 0, len(chs)) + for _, ch := range chs { + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + }) + } + return cases +} + +// getResourceInfo get health policy and kind from resource for customized health check purpose +func getResourceInfo(res *apiv1.Resource) (healthPolicy interface{}, kind string) { + var ok bool + if res.Extensions != nil { + healthPolicy = res.Extensions[apiv1.FieldHealthPolicy] + } + if res.Attributes == nil { + panic(fmt.Errorf("resource has no Attributes field in the Spec: %s", res)) + } + if kind, ok = res.Attributes[apiv1.FieldKind].(string); !ok { + panic(fmt.Errorf("failed to get kind from resource attributes: %s", res.Attributes)) + } + return healthPolicy, kind +} + +func printTable(w *io.Writer, id string, tables map[string]*printers.Table) { + // Reset the buffer for live flushing. + (*w).(*bytes.Buffer).Reset() + + // Print resource Key as heading text + _, _ = fmt.Fprintln(*w, pretty.LightCyanBold("[%s]", id)) + + table, ok := tables[id] + if !ok { + // Unsupported resource, leave a hint + _, _ = fmt.Fprintln(*w, "Skip monitoring unsupported resources") + } else { + // Print table + data := table.Print() + _ = pterm.DefaultTable. + WithStyle(pterm.NewStyle(pterm.FgDefault)). + WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). + WithHasHeader().WithSeparator(" ").WithData(data).WithWriter(*w).Render() + } +} diff --git a/pkg/engine/operation/apply.go b/pkg/engine/operation/apply.go index 15eb3a9c3..3fca796db 100644 --- a/pkg/engine/operation/apply.go +++ b/pkg/engine/operation/apply.go @@ -43,7 +43,6 @@ type ApplyResponse struct { func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Status) { log.Infof("engine: Apply start!") o := ao.Operation - defer func() { close(o.MsgCh) diff --git a/pkg/engine/operation/port_forward.go b/pkg/engine/operation/port_forward.go index ba1da1c51..3c994809d 100644 --- a/pkg/engine/operation/port_forward.go +++ b/pkg/engine/operation/port_forward.go @@ -42,10 +42,8 @@ type PortForwardRequest struct { Port int } -func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - +func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest, stopChan chan struct{}, readyChan chan struct{}) error { + ctx := context.Background() if err := validatePortForwardRequest(req); err != nil { return err } @@ -130,11 +128,10 @@ func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { } go func() { - err = ForwardPort(ctx, cfg, clientset, namespace, serviceName, servicePort, servicePort) + err = ForwardPort(ctx, cfg, clientset, namespace, serviceName, servicePort, servicePort, stopChan, readyChan) failed <- err }() } - err := <-failed return err } @@ -145,6 +142,7 @@ func ForwardPort( clientset *kubernetes.Clientset, namespace, serviceName string, servicePort, localPort int, + stopChan chan struct{}, readyChan chan struct{}, ) error { svc, err := clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) if err != nil { @@ -187,11 +185,9 @@ func ForwardPort( ports := []string{fmt.Sprintf("%d:%d", localPort, servicePort)} dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - - stop, ready := make(chan struct{}, 1), make(chan struct{}) out, errOut := os.Stdout, os.Stderr - fw, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, ports, stop, ready, out, errOut) + fw, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut) if err != nil { return err } diff --git a/pkg/engine/operation/port_forward_test.go b/pkg/engine/operation/port_forward_test.go index 6a44719c6..0b38e86d2 100644 --- a/pkg/engine/operation/port_forward_test.go +++ b/pkg/engine/operation/port_forward_test.go @@ -85,7 +85,8 @@ func TestPortForwardOperation_PortForward(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { bpo := &PortForwardOperation{} - err := bpo.PortForward(tc.req) + stopCh, readyCh := make(chan struct{}), make(chan struct{}) + err := bpo.PortForward(tc.req, stopCh, readyCh) if tc.expectedErr { assert.Error(t, err) diff --git a/pkg/engine/release/util.go b/pkg/engine/release/util.go index d0d077fe7..7ae34b5cc 100644 --- a/pkg/engine/release/util.go +++ b/pkg/engine/release/util.go @@ -79,6 +79,51 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel return rel, nil } +// NewRollbackRelease news a release object for rollback operation, but no creation in the storage. +func NewRollbackRelease(storage Storage, project, stack, workspace string, revision uint64) (*v1.Release, error) { + if storage == nil { + return nil, fmt.Errorf("storage cannot be nil") + } + + latestRevision := storage.GetLatestRevision() + if latestRevision <= 0 { + return nil, fmt.Errorf("invalid latest revision: %d", latestRevision) + } + + lastRelease, err := storage.Get(latestRevision) + if err != nil { + return nil, err + } + if lastRelease == nil { + return nil, fmt.Errorf("last release not found for revision: %d", latestRevision) + } + + if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed { + return nil, fmt.Errorf("cannot create a new release of project: %s, workspace: %s. There is a release:%v in progress", + project, workspace, lastRelease.Revision) + } + + if revision <= 0 { + revision = latestRevision - 1 + } + + rollbackRelease, err := storage.Get(revision) + if err != nil { + return nil, err + } + if rollbackRelease == nil { + return nil, fmt.Errorf("rollback release not found for revision: %d", revision) + } + + if rollbackRelease.Phase != v1.ReleasePhaseSucceeded { + return nil, fmt.Errorf("cannot create a new rollback release of project: %s, workspace: %s. There is a release:%v not succeeded", + project, workspace, rollbackRelease.Revision) + } + + rollbackRelease.Revision = latestRevision + 1 + return rollbackRelease, nil +} + // UpdateApplyRelease updates the release in the storage if dryRun is false. If release phase is failed, // only logging with no error return. func UpdateApplyRelease(storage Storage, rel *v1.Release, dryRun bool, relLock *sync.Mutex) error { diff --git a/pkg/engine/resource/graph/util.go b/pkg/engine/resource/graph/util.go index e1338b790..1e2c091f6 100644 --- a/pkg/engine/resource/graph/util.go +++ b/pkg/engine/resource/graph/util.go @@ -148,6 +148,9 @@ func RemoveResource(gph *v1.Graph, resource *v1.GraphResource) { // RemoveResourceIndex clears the entire resource index of the Graph. func RemoveResourceIndex(gph *v1.Graph) { + if gph == nil || gph.Resources == nil { + return + } gph.Resources.ResourceIndex = nil } diff --git a/pkg/util/signal/signal_posix.go b/pkg/util/signal/signal_posix.go index 0f76763aa..fd4c5fbc4 100644 --- a/pkg/util/signal/signal_posix.go +++ b/pkg/util/signal/signal_posix.go @@ -23,4 +23,4 @@ import ( "syscall" ) -var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGTSTP, syscall.SIGINT}