Initial support for AWS metrics
pcholakov committed Jan 8, 2024
1 parent f7c55b3 commit 61e1cf5
Showing 5 changed files with 99 additions and 18 deletions.
32 changes: 25 additions & 7 deletions README.md
@@ -1,19 +1,37 @@
# AccountingDB: A high-performance financial ledger based on DynamoDB

This project implements a financial tracking component that leans heavily on AWS components. At its core, durable state
storage is provided by DynamoDB. Other aspects of the solution rely maximally on a serverless approach in order to
achieve extremely low human operational cost.

## Operations

Transfers are transactional debits/credits between pairs of accounts. You specify a credit account, a debit account, and
an amount, and the system will accept or reject the transfer depending on which logic rules are active.

## Deploying the stack

The stack creates a single on-demand billing DynamoDB table and the benchmark runner Lambda function. These have no idle
cost if left running. Depending on how many other dashboards you have in your account, the benchmark dashboard may
exceed your free tier allowance and attract a charge. Data stored in DynamoDB will be billed according to the standard
on-demand storage and request rates.

```shell
npm run deploy
eval $(./bin/get-benchmark-function-name.sh)
```

### Read benchmark and sparse account balances

Note that for the read workload to provide representative data, you will want to ensure that account entries exist for
all accounts in the range. We don't yet have a mechanism to fill these in. For accounts that do not exist, the cost of
returning "not found" may be different from the cost of performing a balance read; hence be careful when setting up the
read benchmark.
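Until such a mechanism exists, pre-filling the account range is straightforward to sketch. The item shape, helper names, and batching below are assumptions for illustration only, not this project's actual schema; the 25-item chunk size reflects the DynamoDB `BatchWriteItem` per-request limit:

```typescript
// Hypothetical pre-fill sketch: generate zero-balance items for every account
// in the benchmark range and split them into DynamoDB-sized write batches.
type AccountItem = { id: number; balance: number };

function zeroBalanceAccounts(numAccounts: number): AccountItem[] {
  return Array.from({ length: numAccounts }, (_, id) => ({ id, balance: 0 }));
}

// BatchWriteItem accepts at most 25 items per request.
function chunkBatches<T>(items: T[], batchSize: number = 25): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Each batch would then be written with BatchWriteItem (retrying any
// UnprocessedItems); the AWS SDK call is omitted to keep the sketch
// dependency-free.
```

With the range pre-filled, balance reads hit real items and the benchmark avoids measuring the "not found" path.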

## Running load tests

The scripts `invoke-benchmark.sh` and `invoke-parallel.sh` start one or N parallel runs with the configuration defined
in `benchmark-request.json`. When running parallel benchmarks, the aggregations returned by any single runner capture
only that runner's data; look to the CloudWatch dashboard for the aggregated statistics.

Note that with high-resolution metrics, CloudWatch retains the second-level resolution data for only three hours.
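Per-runner aggregations are insufficient because summary statistics such as percentiles do not compose across runners; only merged raw samples (or the CloudWatch metrics each runner emits) yield correct fleet-wide numbers. An illustrative sketch with synthetic latencies, not project code:

```typescript
// Naively averaging per-runner medians disagrees with the median of the
// combined sample, so fleet-wide statistics must come from merged data.
function p50(samples: number[]): number {
  const sorted = [...samples].sort((a, b) => a - b);
  return sorted[Math.floor(sorted.length / 2)];
}

const runnerA = [1, 1, 1, 1]; // a fast runner's latencies (ms)
const runnerB = [9, 9, 9, 9]; // a slow runner's latencies (ms)

const averagedP50 = (p50(runnerA) + p50(runnerB)) / 2; // 5
const mergedP50 = p50([...runnerA, ...runnerB]); // 9
```

The same caveat applies to any non-additive statistic (max, percentiles); simple counts and sums can be added across runners safely.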
14 changes: 7 additions & 7 deletions bin/benchmark-local.ts
@@ -9,16 +9,16 @@ import { CreateTransfersLoadTest, ReadAccountBalancesLoadTest } from "../lib/loa
// Load test parameters

const testDurationSeconds = 10;
const numAccounts = 1_000;
const hotAccounts = 1_000;

const readRate = 500; // Set to 0 to disable
const readConcurrency = 5;
const readBatchSize = 5;

const writeRate = 500; // Set to 0 to disable
const writeConcurrency = 5;
const writeBatchSize = 5;
const writeAccountSelectionStrategy = AccountSelectionStrategy.RANDOM_PEER_TO_PEER;

const requestTimeoutMs = 100;
2 changes: 1 addition & 1 deletion bin/benchmark-request.json
@@ -1,5 +1,5 @@
{
"durationSeconds": 60,
"numAccounts": 1000000,

"writeRate": 10,
57 changes: 54 additions & 3 deletions lib/load-test-runner.ts
@@ -1,7 +1,17 @@
import assert from "assert";
import { createHistogram, performance, RecordableHistogram } from "perf_hooks";
import { Configuration, createMetricsLogger, MetricsLogger, StorageResolution, Unit } from "aws-embedded-metrics";

export interface TestMetadata {
/**
* The test name will be used as a metric dimension for reporting.
*/
name: string;
}

export interface Test {
metadata(): TestMetadata;

setup(): Promise<void>;

teardown(): Promise<void>;
@@ -19,6 +29,12 @@ export interface Test {
}

export abstract class AbstractBaseTest implements Test {
metadata() {
return {
name: this.constructor.name,
};
}

async setup(): Promise<void> {
}

@@ -36,16 +52,25 @@ export abstract class AbstractBaseTest implements Test {
}
}

export const METRIC_NAMESPACE = "AccountingDB";
Configuration.namespace = METRIC_NAMESPACE;

export type MetricNames = "Success" // overall success (1) / failure (0) of a single batch
| "Latency" // elapsed time from batch arrival time to commit
| "ServiceTime" // elapsed time from batch processing start time to commit
| "BatchSize"; // number of successfully processed items in a single batch; not emitted for failed transactions

const METRICS_RESOLUTION = StorageResolution.High;

export class LoadTestDriver {
private readonly concurrency: number;
private readonly targetRps: number;
private readonly workerCycleTimeMs: number;
private readonly arrivalIntervalTimeMs: number;
private readonly test: Test;
private readonly name: string;
private readonly overallDurationMs: number;
private readonly warmupDurationMs: number;
private readonly timeoutValueMs: number;
private completedIterationsCount: number = 0;
private scheduledIterationsCount: number = 0;
@@ -59,6 +84,10 @@ export class LoadTestDriver {
private workerRunTime: number = 0;
private workerBackoffTime: number = 0;
private workerBehindScheduleTime: number = 0;
// We track metrics internally, and optionally post them to CloudWatch. The latter is great for distributed use.
private readonly requestLatencyMicros: RecordableHistogram;
private readonly serviceTimeMicros: RecordableHistogram;
private readonly metrics: MetricsLogger;

constructor(
test: Test,
@@ -85,10 +114,12 @@
this.test = test;
this.requestLatencyMicros = createHistogram();
this.serviceTimeMicros = createHistogram();
this.metrics = createMetricsLogger();
this.name = test.metadata().name;
}

async run(): Promise<any> {
if (this.targetRps <= 0) {
return;
}

@@ -105,11 +136,16 @@

while (nextRequestTime < endTime) {
const cutoffTime = performance.now() - this.timeoutValueMs;
// Prune expired-in-queue requests from the work queue and record timeouts:
while (this.workQueue.length > 0 && this.workQueue[0] < cutoffTime) {
assert(this.workQueue.shift() !== undefined);
this.metrics.putDimensions({ Name: this.name });
this.metrics.putMetric($m("Latency"), this.timeoutValueMs, Unit.Milliseconds, METRICS_RESOLUTION);
this.metrics.putMetric($m("Success"), 0, Unit.None, METRICS_RESOLUTION);
this.missedIterations += 1;
this.recordDuration(this.requestLatencyMicros, this.timeoutValueMs * 1000);
}
await this.metrics.flush();

while (this.workQueue.length < this.concurrency * 2 && nextRequestTime < endTime) {
this.workQueue.push(nextRequestTime);
@@ -129,14 +165,18 @@
// Skip over any scheduled iterations that have already timed out in-queue
const cutoffTime = workerLoopStart - this.timeoutValueMs;
let arrivalTime = this.workQueue.shift();
this.metrics.putDimensions({ Name: this.name });
for (; arrivalTime !== undefined && arrivalTime < cutoffTime; arrivalTime = this.workQueue.shift()) {
// Only record timeouts post-warmup
if (arrivalTime > measurementStartTime) {
this.missedIterations += 1;
this.recordDuration(this.requestLatencyMicros, Math.round(this.timeoutValueMs * 1000));
this.metrics.putMetric($m("Latency"), this.timeoutValueMs, Unit.Milliseconds, METRICS_RESOLUTION);
this.metrics.putMetric($m("Success"), 0, Unit.None, METRICS_RESOLUTION);
arrivalTime = this.workQueue.shift();
}
}
await this.metrics.flush();

// No more work for this worker to do
if (arrivalTime === undefined) {
@@ -181,6 +221,13 @@
this.completedIterationsCount += 1;
this.requestCount += this.requestsPerIteration;
this.workerRunTime += serviceTimeMillis;

this.metrics.putDimensions({ Name: this.name });
this.metrics.putMetric($m("Latency"), iterationDurationMillis, Unit.Milliseconds, METRICS_RESOLUTION);
this.metrics.putMetric($m("ServiceTime"), serviceTimeMillis, Unit.Milliseconds, METRICS_RESOLUTION);
this.metrics.putMetric($m("BatchSize"), this.requestsPerIteration, Unit.Count, METRICS_RESOLUTION);
this.metrics.putMetric($m("Success"), 1, Unit.None, METRICS_RESOLUTION);
await this.metrics.flush();
}
} while (true);
};
@@ -264,3 +311,7 @@ export class LoadTestDriver {
export async function sleep(ms: number) {
await new Promise((resolve) => setTimeout(resolve, ms));
}

function $m(metric: MetricNames) {
return metric;
}
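The `$m` helper above is an identity function whose only job is to constrain its argument to the `MetricNames` union, so a misspelled metric name fails at compile time instead of silently creating a new CloudWatch metric. A self-contained restatement of the pattern:

```typescript
// Identity helper that narrows its argument to the allowed metric names.
type MetricNames = "Success" | "Latency" | "ServiceTime" | "BatchSize";

function $m(metric: MetricNames): string {
  return metric;
}

const latencyName = $m("Latency"); // compiles, evaluates to "Latency"
// $m("Latncy");                   // rejected by the TypeScript compiler
```

At runtime `$m` is a no-op; all of its value is in the type check at each `putMetric` call site.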
12 changes: 12 additions & 0 deletions lib/load-tests.ts
@@ -76,6 +76,12 @@ export class CreateTransfersLoadTest extends AbstractBaseTest {
};
}

metadata(): { name: string } {
return {
name: "CreateTransfers",
}
}

async performIteration() {
const transfers = generateTransfers(this.transferBatchSize, this.accountSelectionStrategy, {
numAccounts: this.numAccounts,
@@ -188,6 +194,12 @@ export class ReadAccountBalancesLoadTest extends AbstractBaseTest {
}
}

metadata(): { name: string } {
return {
name: "ReadAccountBalances",
}
}

requestsPerIteration() {
return this.batchSize;
}
