@@ -8,19 +8,27 @@ const { getArnPartition } = require('../../utils/arn');
8
8
9
9
const logger = require ( '../../utils/logger' ) ;
10
10
11
- function getTaskStates ( states ) {
11
+ function getTaskStates ( states , stateMachineName ) {
12
12
return _ . flatMap ( states , ( state ) => {
13
13
switch ( state . Type ) {
14
14
case 'Task' : {
15
15
return [ state ] ;
16
16
}
17
17
case 'Parallel' : {
18
18
const parallelStates = _ . flatMap ( state . Branches , branch => _ . values ( branch . States ) ) ;
19
- return getTaskStates ( parallelStates ) ;
19
+ return getTaskStates ( parallelStates , stateMachineName ) ;
20
20
}
21
21
case 'Map' : {
22
22
const mapStates = state . ItemProcessor ? state . ItemProcessor . States : state . Iterator . States ;
23
- const taskStates = getTaskStates ( mapStates ) ;
23
+ const taskStates = getTaskStates ( mapStates , stateMachineName ) ;
24
+ if ( state . ItemProcessor && state . ItemProcessor . ProcessorConfig
25
+ && state . ItemProcessor . ProcessorConfig . Mode === 'DISTRIBUTED' ) {
26
+ taskStates . push ( {
27
+ Resource : 'arn:aws:states:::states:startExecution' ,
28
+ Mode : 'DISTRIBUTED' ,
29
+ StateMachineName : stateMachineName ,
30
+ } ) ;
31
+ }
24
32
if ( state . ItemReader ) {
25
33
taskStates . push ( state . ItemReader ) ;
26
34
}
@@ -303,9 +311,16 @@ function getLambdaPermissions(state) {
303
311
}
304
312
305
313
function getStepFunctionsPermissions ( state ) {
306
- const stateMachineArn = state . Parameters [ 'StateMachineArn.$' ]
307
- ? '*'
308
- : state . Parameters . StateMachineArn ;
314
+ let stateMachineArn = state . Mode === 'DISTRIBUTED' ? {
315
+ 'Fn::Sub' : [
316
+ `arn:aws:states:\${AWS::Region}:\${AWS::AccountId}:stateMachine:${ state . StateMachineName } ` ,
317
+ ] ,
318
+ } : null ;
319
+
320
+ if ( ! stateMachineArn ) {
321
+ stateMachineArn = state . Parameters [ 'StateMachineArn.$' ] ? '*'
322
+ : state . Parameters . StateMachineArn ;
323
+ }
309
324
310
325
return [ {
311
326
action : 'states:StartExecution' ,
@@ -591,7 +606,7 @@ module.exports = {
591
606
throw new Error ( `Missing "definition" for state machine ${ stateMachineName } ` ) ;
592
607
}
593
608
594
- const taskStates = getTaskStates ( stateMachineObj . definition . States ) ;
609
+ const taskStates = getTaskStates ( stateMachineObj . definition . States , stateMachineName ) ;
595
610
let iamPermissions = getIamPermissions . bind ( this ) ( taskStates ) ;
596
611
597
612
if ( stateMachineObj . loggingConfig ) {
0 commit comments