Skip to content

Commit 8a9222a

Browse files
authored
feat: ability to configure multiple tx broadcast endpoints #765
1 parent 101922b commit 8a9222a

File tree

5 files changed

+251
-1
lines changed

5 files changed

+251
-1
lines changed

.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ STACKS_CORE_RPC_PORT=20443
2929
# STACKS_CORE_PROXY_HOST=127.0.0.1
3030
# STACKS_CORE_PROXY_PORT=20443
3131

32+
# Configure a path to a file containing additional stacks-node `POST /v2/tranascation` URLs for the /v2 proxy to mutlicast.
33+
# The file should be a newline-delimited list of URLs.
34+
# STACKS_API_EXTRA_TX_ENDPOINTS_FILE=./config/extra-tx-post-endpoints.txt
35+
3236
# STACKS_FAUCET_NODE_HOST=<IP or hostname>
3337
# STACKS_FAUCET_NODE_PORT=<port number>
3438

config/extra-tx-post-endpoints.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
http://another-node-a/v2/transactions
2+
http://another-node-b/v2/transactions
3+
# http://another-node-c/v2/transactions
4+
# http://another-node-d/v2/transactions

src/api/routes/core-node-rpc-proxy.ts

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import * as express from 'express';
22
import * as cors from 'cors';
33
import { createProxyMiddleware, Options } from 'http-proxy-middleware';
4-
import { logger, parsePort } from '../../helpers';
4+
import { logError, logger, parsePort, pipelineAsync, REPO_DIR } from '../../helpers';
55
import { Agent } from 'http';
66
import * as fs from 'fs';
7+
import * as path from 'path';
78
import { addAsync } from '@awaitjs/express';
89
import * as chokidar from 'chokidar';
910
import * as jsoncParser from 'jsonc-parser';
11+
import fetch, { RequestInit } from 'node-fetch';
1012

1113
export function GetStacksNodeProxyEndpoint() {
1214
// Use STACKS_CORE_PROXY env vars if available, otherwise fallback to `STACKS_CORE_RPC
@@ -79,6 +81,132 @@ export function createCoreNodeRpcProxyRouter(): express.Router {
7981
return null;
8082
};
8183

84+
/**
85+
* Check for any extra endpoints that have been configured for performing a "multicast" for a tx submission.
86+
*/
87+
async function getExtraTxPostEndpoints(): Promise<string[] | false> {
88+
const STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR = 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE';
89+
const extraEndpointsEnvVar = process.env[STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR];
90+
if (!extraEndpointsEnvVar) {
91+
return false;
92+
}
93+
const filePath = path.resolve(REPO_DIR, extraEndpointsEnvVar);
94+
let fileContents: string;
95+
try {
96+
fileContents = await fs.promises.readFile(filePath, { encoding: 'utf8' });
97+
} catch (error) {
98+
logError(`Error reading ${STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR}: ${error}`, error);
99+
return false;
100+
}
101+
const endpoints = fileContents
102+
.split(/\r?\n/)
103+
.map(r => r.trim())
104+
.filter(r => !r.startsWith('#') && r.length !== 0);
105+
if (endpoints.length === 0) {
106+
return false;
107+
}
108+
return endpoints;
109+
}
110+
111+
/**
112+
* Reads an http request stream into a Buffer.
113+
*/
114+
async function readRequestBody(req: express.Request, maxSizeBytes = Infinity): Promise<Buffer> {
115+
return new Promise((resolve, reject) => {
116+
let resultBuffer: Buffer = Buffer.alloc(0);
117+
req.on('data', chunk => {
118+
if (!Buffer.isBuffer(chunk)) {
119+
reject(
120+
new Error(
121+
`Expected request body chunks to be Buffer, received ${chunk.constructor.name}`
122+
)
123+
);
124+
req.destroy();
125+
return;
126+
}
127+
resultBuffer = resultBuffer.length === 0 ? chunk : Buffer.concat([resultBuffer, chunk]);
128+
if (resultBuffer.byteLength >= maxSizeBytes) {
129+
reject(new Error(`Request body exceeded max byte size`));
130+
req.destroy();
131+
return;
132+
}
133+
});
134+
req.on('end', () => {
135+
if (!req.complete) {
136+
return reject(
137+
new Error('The connection was terminated while the message was still being sent')
138+
);
139+
}
140+
resolve(resultBuffer);
141+
});
142+
req.on('error', error => reject(error));
143+
});
144+
}
145+
146+
router.postAsync('/transactions', async (req, res, next) => {
147+
const extraEndpoints = await getExtraTxPostEndpoints();
148+
if (!extraEndpoints) {
149+
next();
150+
return;
151+
}
152+
const endpoints = [
153+
// The primary proxy endpoint (the http response from this one will be returned to the client)
154+
`http://${stacksNodeRpcEndpoint}/v2/transactions`,
155+
];
156+
endpoints.push(...extraEndpoints);
157+
logger.info(`Overriding POST /v2/transactions to multicast to ${endpoints.join(',')}}`);
158+
const maxBodySize = 10_000_000; // 10 MB max POST body size
159+
const reqBody = await readRequestBody(req, maxBodySize);
160+
const reqHeaders: string[][] = [];
161+
for (let i = 0; i < req.rawHeaders.length; i += 2) {
162+
reqHeaders.push([req.rawHeaders[i], req.rawHeaders[i + 1]]);
163+
}
164+
const postFn = async (endpoint: string) => {
165+
const reqOpts: RequestInit = {
166+
method: 'POST',
167+
agent: httpAgent,
168+
body: reqBody,
169+
headers: reqHeaders,
170+
};
171+
const proxyResult = await fetch(endpoint, reqOpts);
172+
return proxyResult;
173+
};
174+
175+
// Here's were we "multicast" the `/v2/transaction` POST, by concurrently sending the http request to all configured endpoints.
176+
const results = await Promise.allSettled(endpoints.map(endpoint => postFn(endpoint)));
177+
178+
// Only the first (non-extra) endpoint http response is proxied back through to the client, so ensure any errors from requests
179+
// to the extra endpoints are logged.
180+
results.slice(1).forEach(p => {
181+
if (p.status === 'rejected') {
182+
logError(`Error during POST /v2/transaction to extra endpoint: ${p.reason}`, p.reason);
183+
} else {
184+
if (!p.value.ok) {
185+
logError(
186+
`Response ${p.value.status} during POST /v2/transaction to extra endpoint ${p.value.url}`
187+
);
188+
}
189+
}
190+
});
191+
192+
// Proxy the result of the (non-extra) http response back to the client.
193+
const mainResult = results[0];
194+
if (mainResult.status === 'rejected') {
195+
logError(
196+
`Error in primary POST /v2/transaction proxy: ${mainResult.reason}`,
197+
mainResult.reason
198+
);
199+
res.status(500).json({ error: mainResult.reason });
200+
} else {
201+
const proxyResp = mainResult.value;
202+
res.status(proxyResp.status);
203+
proxyResp.headers.forEach((value, name) => {
204+
res.setHeader(name, value);
205+
});
206+
await pipelineAsync(proxyResp.body, res);
207+
}
208+
});
209+
82210
const proxyOptions: Options = {
83211
agent: httpAgent,
84212
target: `http://${stacksNodeRpcEndpoint}`,

src/tests/test-helpers.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,32 @@ export async function useWithCleanup<T extends [...Disposable<any>[]]>(
3737
}
3838
}
3939
}
40+
41+
export type TestEnvVar = [EnvVarKey: string, EnvVarValue: string];
42+
43+
/**
44+
* Helper function for tests.
45+
* Sets local process environment variables, and returns a function that restores them to the original values.
46+
*/
47+
export function withEnvVars(...envVars: TestEnvVar[]) {
48+
const original: { exists: boolean; key: string; value: string | undefined }[] = [];
49+
envVars.forEach(([k, v]) => {
50+
original.push({
51+
exists: k in process.env,
52+
key: k,
53+
value: v,
54+
});
55+
});
56+
envVars.forEach(([k, v]) => {
57+
process.env[k] = v;
58+
});
59+
return () => {
60+
original.forEach(orig => {
61+
if (!orig.exists) {
62+
delete process.env[orig.key];
63+
} else {
64+
process.env[orig.key] = orig.value;
65+
}
66+
});
67+
};
68+
}

src/tests/v2-proxy-tests.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import * as supertest from 'supertest';
2+
import { ChainID } from '@stacks/transactions';
3+
import { startApiServer } from '../api/init';
4+
import { PgDataStore, cycleMigrations, runMigrations } from '../datastore/postgres-store';
5+
import { PoolClient } from 'pg';
6+
import { useWithCleanup, withEnvVars } from './test-helpers';
7+
import * as fs from 'fs';
8+
import * as path from 'path';
9+
import * as os from 'os';
10+
import * as nock from 'nock';
11+
12+
describe('v2-proxy tests', () => {
13+
let db: PgDataStore;
14+
let client: PoolClient;
15+
16+
beforeEach(async () => {
17+
process.env.PG_DATABASE = 'postgres';
18+
await cycleMigrations();
19+
db = await PgDataStore.connect();
20+
client = await db.pool.connect();
21+
});
22+
23+
test('tx post multicast', async () => {
24+
const primaryProxyEndpoint = 'proxy-stacks-node:12345';
25+
const extraTxEndpoint = 'http://extra-tx-endpoint-a/test';
26+
await useWithCleanup(
27+
() => {
28+
const restoreEnvVars = withEnvVars(
29+
['STACKS_CORE_PROXY_HOST', primaryProxyEndpoint.split(':')[0]],
30+
['STACKS_CORE_PROXY_PORT', primaryProxyEndpoint.split(':')[1]]
31+
);
32+
return [, () => restoreEnvVars()] as const;
33+
},
34+
() => {
35+
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stacks-api-unit-test-'));
36+
const extraEndpointsFilePath = path.join(tempDir, 'extra-tx-endpoints.txt');
37+
fs.writeFileSync(extraEndpointsFilePath, extraTxEndpoint, { flag: 'w' });
38+
const restoreEnvVars = withEnvVars([
39+
'STACKS_API_EXTRA_TX_ENDPOINTS_FILE',
40+
extraEndpointsFilePath,
41+
]);
42+
return [, () => restoreEnvVars()] as const;
43+
},
44+
async () => {
45+
const apiServer = await startApiServer({
46+
datastore: db,
47+
chainId: ChainID.Mainnet,
48+
httpLogLevel: 'debug',
49+
});
50+
return [apiServer, apiServer.terminate] as const;
51+
},
52+
async (_, __, api) => {
53+
const primaryStubbedResponse = 'success stubbed response';
54+
const extraStubbedResponse = 'extra success stubbed response';
55+
const testRequest = 'fake-tx-data';
56+
let mockedRequestBody = 'none';
57+
nock(`http://${primaryProxyEndpoint}`)
58+
.post('/v2/transactions', testRequest)
59+
.once()
60+
.reply(200, primaryStubbedResponse);
61+
nock(extraTxEndpoint)
62+
.post(() => true, testRequest)
63+
.once()
64+
.reply(200, (_url, body, cb) => {
65+
// the "extra" endpoint responses are logged internally and not sent back to the client, so use this mock callback to
66+
// test that this endpoint was called correctly
67+
mockedRequestBody = body as string;
68+
cb(null, extraStubbedResponse);
69+
});
70+
const postTxReq = await supertest(api.server).post(`/v2/transactions`).send(testRequest);
71+
// test that main endpoint response was returned
72+
expect(postTxReq.status).toBe(200);
73+
expect(postTxReq.text).toBe(primaryStubbedResponse);
74+
// test that the extra endpoint was queried
75+
expect(mockedRequestBody).toBe(testRequest);
76+
}
77+
);
78+
});
79+
80+
afterEach(async () => {
81+
client.release();
82+
await db?.close();
83+
await runMigrations(undefined, 'down');
84+
});
85+
});

0 commit comments

Comments
 (0)