Skip to content

Commit

Permalink
Update by Kevin's advice
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Jan 17, 2025
1 parent c992eae commit 0b91b5c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
30 changes: 30 additions & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,36 @@ func TestAddTerminalState_Error(t *testing.T) {
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
}

func TestAddTerminalState_DeckURIInFailedExecution(t *testing.T) {
error := &core.ExecutionError{
Code: "foo",
}
request := admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_FAILED,
OutputResult: &event.NodeExecutionEvent_Error{
Error: error,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
startedAtProto, _ := ptypes.TimestampProto(startedAt)
nodeExecutionModel := models.NodeExecution{
StartedAt: &startedAt,
}
closure := admin.NodeExecutionClosure{
StartedAt: startedAtProto,
}
err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)
assert.True(t, proto.Equal(error, closure.GetError()))
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestCreateNodeExecutionModel(t *testing.T) {
parentTaskExecID := uint(8)
request := &admin.NodeExecutionEventRequest{
Expand Down
32 changes: 15 additions & 17 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,21 @@ func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) {
p.execInfo.OutputInfo.DeckURI = deckURI
}

func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error {
func (p *pluginRequestedTransition) RemoveDeckURIIfDeckNotExists(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
if reader == nil {
return nil
}

Check warning on line 98 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L97-L98

Added lines #L97 - L98 were not covered by tests

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
p.execInfo.OutputInfo.DeckURI = nil
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

Check warning on line 105 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L102-L105

Added lines #L102 - L105 were not covered by tests

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

if exists {
deckURIValue := tCtx.ow.GetDeckPath()
p.execInfo.OutputInfo.DeckURI = &deckURIValue
if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
Expand Down Expand Up @@ -548,20 +543,23 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
if err != nil {
return nil, err
}

Check warning on line 545 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L544-L545

Added lines #L544 - L545 were not covered by tests

if deckStatus == DeckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

Check warning on line 549 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L548-L549

Added lines #L548 - L549 were not covered by tests

defer func() {
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
}

Check warning on line 555 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L554-L555

Added lines #L554 - L555 were not covered by tests
}
}()

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
if deckStatus == DeckUnknown {
// This is for backward compatibility with older Flytekit versions.
// Older Flytekit versions did not set the `generates_deck` flag in the task template's metadata.
// So, we need to add deck URI to the event if it exists.
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
return pluginTrns, err
pluginTrns.AddDeckURI(tCtx)
}
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
Expand Down

0 comments on commit 0b91b5c

Please sign in to comment.