Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-0.9 #10

Open
wants to merge 12 commits into
base: release-3.5.2
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
Original file line number Diff line number Diff line change
@@ -57,6 +57,11 @@ func (cwftmpl *ClusterWorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeCluster
}

// GetPodMetadata returns the PodMetadata of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetPodMetadata() *Metadata {
return cwftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &cwftmpl.Spec
1 change: 1 addition & 0 deletions pkg/apis/workflow/v1alpha1/common.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ type TemplateHolder interface {
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
GetResourceScope() ResourceScope
GetPodMetadata() *Metadata
}

// WorkflowSpecHolder is an object that holds a WorkflowSpec; e.g., WorkflowTemplate, and ClusterWorkflowTemplate
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_template_types.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,11 @@ func (wftmpl *WorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeNamespaced
}

// GetPodMetadata returns the PodMetadata of workflow template.
func (wftmpl *WorkflowTemplate) GetPodMetadata() *Metadata {
return wftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of workflow template.
func (wftmpl *WorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &wftmpl.Spec
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
@@ -3353,6 +3353,11 @@ func (wf *Workflow) GetResourceScope() ResourceScope {
return ResourceScopeLocal
}

// GetPodMetadata returns the PodMetadata of a workflow.
func (wf *Workflow) GetPodMetadata() *Metadata {
return wf.Spec.PodMetadata
}

// GetWorkflowSpec returns the Spec of a workflow.
func (wf *Workflow) GetWorkflowSpec() WorkflowSpec {
return wf.Spec
3 changes: 3 additions & 0 deletions workflow/artifacts/azure/azure.go
Original file line number Diff line number Diff line change
@@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
}
isEmptyFile = true
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
_ = os.Remove(path)
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
}

isDir, err := azblobDriver.IsDirectory(artifact)
if err != nil {
_ = os.Remove(path)
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
}

// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
if !isDir && !isEmptyFile {
_ = os.Remove(path)
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
}

2 changes: 1 addition & 1 deletion workflow/controller/agent.go
Original file line number Diff line number Diff line change
@@ -240,7 +240,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
}

tmpl := &wfv1.Template{}
addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl)
woc.addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl, "")
woc.addMetadata(pod, tmpl)

if woc.controller.Config.InstanceID != "" {
4 changes: 2 additions & 2 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
@@ -21,7 +21,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
51 changes: 37 additions & 14 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
@@ -632,13 +632,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
if !ok {
return fmt.Errorf("object %+v is not an unstructured", workflows[0])
}
key := un.GetNamespace() + "/" + un.GetName()
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

obj, ok := wfc.getWorkflowByKey(key)
if !ok {
return fmt.Errorf("failed to get workflow by key after locking")
}
un, ok = obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("object %+v is not an unstructured", obj)
}
wf, err = util.FromUnstructured(un)
if err != nil {
return err
}
key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

// workflow might still be hydrated
if wfc.hydrator.IsHydrated(wf) {
log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
@@ -712,20 +722,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
}
defer wfc.wfQueue.Done(key)

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return true
}

wfc.workflowKeyLock.Lock(key.(string))
defer wfc.workflowKeyLock.Unlock(key.(string))

obj, ok := wfc.getWorkflowByKey(key.(string))
if !ok {
return true
}

// The workflow informer receives unstructured objects to deal with the possibility of invalid
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
@@ -794,6 +798,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return nil, false
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return nil, false
}
return obj, true
}

func reconciliationNeeded(wf metav1.Object) bool {
return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC)
}
@@ -929,6 +947,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac
}
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)
key, err = cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Error("failed to get key for object after locking")
return
}
err = wfc.archiveWorkflowAux(ctx, obj)
if err != nil {
log.WithField("key", key).WithError(err).Error("failed to archive workflow")
Loading