Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ integTest(
eventId: expect.stringContaining(':1'),
}),
}),
expect.objectContaining({
counters: expect.objectContaining({
buildAsset_cnt: expect.any(Number),
buildAsset_ms: expect.any(Number),
fileAsset: 1,
publishAsset_cnt: expect.any(Number),
publishAsset_ms: expect.any(Number),
}),
identifiers: expect.objectContaining({
eventId: expect.stringContaining(':2'),
}),
event: expect.objectContaining({
eventType: 'ASSET',
}),
}),
expect.objectContaining({
event: expect.objectContaining({
command: expect.objectContaining({
Expand All @@ -39,7 +54,7 @@ integTest(
eventType: 'DEPLOY',
}),
identifiers: expect.objectContaining({
eventId: expect.stringContaining(':2'),
eventId: expect.stringContaining(':3'),
}),
}),
expect.objectContaining({
Expand All @@ -51,7 +66,15 @@ integTest(
eventType: 'INVOKE',
}),
identifiers: expect.objectContaining({
eventId: expect.stringContaining(':3'),
eventId: expect.stringContaining(':4'),
}),
counters: expect.objectContaining({
init_ms: expect.any(Number),
load_ms: expect.any(Number),
publishAssetST_ms: expect.any(Number),
totalDeployTime_ms: expect.any(Number),
totalDeployedResources: expect.any(Number),
totalDeployedStacks: expect.any(Number),
}),
}),
]);
Expand Down
18 changes: 16 additions & 2 deletions packages/@aws-cdk/toolkit-lib/lib/api/io/private/span.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import '../../../private/dispose-polyfill';
import { randomUUID } from 'node:crypto';
import * as util from 'node:util';
import type { IoDefaultMessages } from './io-default-messages';
Expand Down Expand Up @@ -117,13 +118,19 @@ export interface IMessageSpan<E extends SpanEnd> extends IActionAwareIoHost {
* `<name>_cnt` keys.
*/
startTimer(name: string): ITimer;

/**
* Record a finished time with a given duration
*/
addTimer(name: string, duration: number): void;
}

/**
* A timer to time an operation in a span.
*/
export interface ITimer {
stop(): void;
[Symbol.dispose](): void;
}

/**
Expand Down Expand Up @@ -235,14 +242,21 @@ class MessageSpan<S extends object, E extends SpanEnd> implements IMessageSpan<E
const t: ITimer = {
stop: () => {
this.openTimers.delete(t);
this.incCounter(`${name}_ms`, Math.floor(Date.now() - start) / 1000);
this.incCounter(`${name}_cnt`, 1);
this.addTimer(name, Math.floor(Date.now() - start));
},
[Symbol.dispose]() {
this.stop();
},
};
this.openTimers.add(t);
return t;
}

public addTimer(name: string, durationMs: number): void {
this.incCounter(`${name}_ms`, Math.floor(durationMs));
this.incCounter(`${name}_cnt`, 1);
}

private time() {
const elapsedTime = new Date().getTime() - this.startTime;
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AssetManifest, type IManifestEntry } from '@aws-cdk/cdk-assets-lib';
import * as cxapi from '@aws-cdk/cloud-assembly-api';
import { WorkGraph } from './work-graph';
import type { AssetBuildNode, WorkNode } from './work-graph-types';
import type { AssetBuildNode, MarkerNode, WorkNode } from './work-graph-types';
import { DeploymentState } from './work-graph-types';
import { ToolkitError } from '../../toolkit/toolkit-error';
import { contentHashAny } from '../../util';
Expand All @@ -11,6 +11,9 @@ export class WorkGraphBuilder {
/**
* Default priorities for nodes
*
* Messages have the highest priority to ensure they are emitted as soon as
* possible (because they will be used for measuring timings).
*
* Assets builds have higher priority than the other two operations, to make good on our promise that
* '--prebuild-assets' will actually do assets before stacks (if it can). Unfortunately it is the
* default :(
Expand All @@ -21,6 +24,7 @@ export class WorkGraphBuilder {
'asset-build': 10,
'asset-publish': 0,
'stack': 5,
'marker': 100,
};
private readonly graph: WorkGraph;
private readonly ioHelper: IoHelper;
Expand Down Expand Up @@ -53,8 +57,37 @@ export class WorkGraphBuilder {
// Just the artifact identifier
const assetId = asset.id.assetId;

const buildId = `build-${assetId}-${contentHashAny([assetId, asset.genericSource]).substring(0, 10)}`;
const publishId = `publish-${assetId}-${contentHashAny([assetId, asset.genericDestination]).substring(0, 10)}`;
// Build node, contains hash of source (build only once)
const sourceHash = contentHashAny([assetId, asset.genericSource]).substring(0, 10);
const buildId = `build-${assetId}-${sourceHash}`;
// Publish node, contains hash of both source and dest
const publishId = `publish-${assetId}-${sourceHash}${contentHashAny([assetId, asset.genericDestination]).substring(0, 10)}`;

// Message to emit when we start on an asset
const startId = `start-${assetId}`;
if (!this.graph.tryGetNode(startId)) {
this.graph.addNodes({
type: 'marker',
id: startId,
dependencies: new Set(),
deploymentState: DeploymentState.PENDING,
priority: WorkGraphBuilder.PRIORITIES.marker,
marker: { type: 'start-asset', asset },
} satisfies MarkerNode);
}

// Message to emit when we end an asset
const endId = `end-${assetId}`;
if (!this.graph.tryGetNode(endId)) {
this.graph.addNodes({
type: 'marker',
id: endId,
dependencies: new Set(),
deploymentState: DeploymentState.PENDING,
priority: WorkGraphBuilder.PRIORITIES.marker,
marker: { type: 'end-asset', asset },
} satisfies MarkerNode);
}

// Build node only gets added once because they are all the same
if (!this.graph.tryGetNode(buildId)) {
Expand All @@ -63,6 +96,8 @@ export class WorkGraphBuilder {
id: buildId,
note: asset.displayName(false),
dependencies: new Set([
// Build depends on the start message having been emitted.
startId,
...this.stackArtifactIds(assetManifestArtifact.dependencies),
// If we disable prebuild, then assets inherit (stack) dependencies from their parent stack
...!this.prebuildAssets ? this.stackArtifactIds(onlyStacks(parentStack.dependencies)) : [],
Expand Down Expand Up @@ -93,6 +128,9 @@ export class WorkGraphBuilder {
deploymentState: DeploymentState.PENDING,
priority: WorkGraphBuilder.PRIORITIES['asset-publish'],
});

// Every publish node that we add must complete before we fire the "done with publishing this asset" message.
this.graph.addDependency(endId, publishId);
}

for (const inheritedDep of this.stackArtifactIds(onlyStacks(parentStack.dependencies))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ export enum DeploymentState {
SKIPPED = 'skipped',
}

export type WorkNode = StackNode | AssetBuildNode | AssetPublishNode;
export type WorkNode = StackNode | AssetBuildNode | AssetPublishNode | MarkerNode;

export interface WorkNodeCommon {
readonly id: string;
readonly dependencies: Set<string>;
deploymentState: DeploymentState;
/** Some readable information to attach to the id, which may be unreadable */
readonly note?: string;
/** Sort by priority when picking up work, higher is earlier */
readonly priority?: number;
}

export interface StackNode extends WorkNodeCommon {
readonly type: 'stack';
readonly stack: cxapi.CloudFormationStackArtifact;
/** Sort by priority when picking up work, higher is earlier */
readonly priority?: number;
}

export interface AssetBuildNode extends WorkNodeCommon {
Expand All @@ -37,8 +37,6 @@ export interface AssetBuildNode extends WorkNodeCommon {
readonly parentStack: cxapi.CloudFormationStackArtifact;
/** The asset that needs to be built */
readonly asset: IManifestEntry;
/** Sort by priority when picking up work, higher is earlier */
readonly priority?: number;
}

export interface AssetPublishNode extends WorkNodeCommon {
Expand All @@ -51,6 +49,18 @@ export interface AssetPublishNode extends WorkNodeCommon {
readonly parentStack: cxapi.CloudFormationStackArtifact;
/** The asset that needs to be published */
readonly asset: IManifestEntry;
/** Sort by priority when picking up work, higher is earlier */
readonly priority?: number;
}

/**
* A node that represents a message to the IoHost to be emitted at a certain point in the graph execution.
*
* These are used to bookend certain parts of the graph.
*/
export interface MarkerNode extends WorkNodeCommon {
readonly type: 'marker';

/** The event to emit when this node is reached. */
readonly marker:
| { type: 'start-asset'; asset: IManifestEntry }
| { type: 'end-asset'; asset: IManifestEntry };
}
10 changes: 9 additions & 1 deletion packages/@aws-cdk/toolkit-lib/lib/api/work-graph/work-graph.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { WorkNode, StackNode, AssetBuildNode, AssetPublishNode } from './work-graph-types';
import type { WorkNode, StackNode, AssetBuildNode, AssetPublishNode, MarkerNode } from './work-graph-types';
import { DeploymentState } from './work-graph-types';
import { ToolkitError } from '../../toolkit/toolkit-error';
import { parallelPromises } from '../../util';
Expand Down Expand Up @@ -30,6 +30,8 @@ export class WorkGraph {
throw new ToolkitError('DuplicateNodeId', `Duplicate use of node id: ${node.id}`);
}

// If there are dependencies added before the node itself is added, copy them onto the target
// object and remove them from the lazy table.
const ld = this.lazyDependencies.get(node.id);
if (ld) {
for (const x of ld) {
Expand Down Expand Up @@ -120,6 +122,9 @@ export class WorkGraph {
case 'asset-publish':
await actions.publishAsset(x);
break;
case 'marker':
await actions.marker(x);
break;
}
});
}
Expand Down Expand Up @@ -157,6 +162,7 @@ export class WorkGraph {
'asset-build': n,
'asset-publish': n,
'stack': n,
'marker': 1,
} : n;
const totalMax = typeof n === 'number' ? n : sum(Object.values(n));

Expand All @@ -165,6 +171,7 @@ export class WorkGraph {
'asset-build': 0,
'asset-publish': 0,
'stack': 0,
'marker': 0,
};
function totalActive() {
return sum(Object.values(active));
Expand Down Expand Up @@ -406,6 +413,7 @@ export interface WorkGraphActions {
deployStack: (stackNode: StackNode) => Promise<void>;
buildAsset: (assetNode: AssetBuildNode) => Promise<void>;
publishAsset: (assetNode: AssetPublishNode) => Promise<void>;
marker: (markerNode: MarkerNode) => Promise<void>;
}

function sum(xs: number[]) {
Expand Down
5 changes: 5 additions & 0 deletions packages/@aws-cdk/toolkit-lib/lib/toolkit/toolkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,15 @@ export class Toolkit extends CloudAssemblySourceBuilder {
'stack': 1,
'asset-build': concurrency,
'asset-publish': concurrency,
'marker': 1,
};

await workGraph.doParallel(graphConcurrency, {
// No-op: we're only publishing assets, not deploying
deployStack: WorkGraph.NOOP,
buildAsset: this.createBuildAssetFunction(ioHelper, deployments, undefined),
publishAsset: this.createPublishAssetFunction(ioHelper, deployments, undefined, options.force),
marker: WorkGraph.NOOP,
});

await ioHelper.notify(IO.CDK_TOOLKIT_I9402.msg(chalk.green('\n✨ Assets published successfully\n'), { assets }));
Expand Down Expand Up @@ -1049,12 +1051,15 @@ export class Toolkit extends CloudAssemblySourceBuilder {
'stack': concurrency,
'asset-build': (options.assetParallelism ?? true) ? options.assetBuildConcurrency ?? 1 : 1, // This will be CPU-bound/memory bound, mostly matters for Docker builds
'asset-publish': (options.assetParallelism ?? true) ? 8 : 1, // This will be I/O-bound, 8 in parallel seems reasonable
'marker': 1,
};

await workGraph.doParallel(graphConcurrency, {
deployStack,
buildAsset,
publishAsset,
// Markers are only used for telemetry, and the toolkit-lib isn't currently collecting any, so NOOP is fine.
marker: WorkGraph.NOOP,
});

return ret;
Expand Down
Loading
Loading