Skip to content

Commit de9cf8a

Browse files
authored
feat(node): Add amqplibIntegration (#13714)
resolves #13312 Before submitting a pull request, please take a look at our [Contributing](https://github.com/getsentry/sentry-javascript/blob/master/CONTRIBUTING.md) guidelines and verify: - [x] If you've added code that should be tested, please add tests. - [x] Ensure your code lints and the test suite passes (`yarn lint`) & (`yarn test`).
1 parent 021c8c1 commit de9cf8a

File tree

21 files changed

+316
-18
lines changed

21 files changed

+316
-18
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,6 @@ packages/deno/lib.deno.d.ts
5858

5959
# gatsby
6060
packages/gatsby/gatsby-node.d.ts
61+
62+
# intellij
63+
*.iml

dev-packages/node-integration-tests/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"@types/mongodb": "^3.6.20",
3939
"@types/mysql": "^2.15.21",
4040
"@types/pg": "^8.6.5",
41+
"amqplib": "^0.10.4",
4142
"apollo-server": "^3.11.1",
4243
"axios": "^1.6.7",
4344
"connect": "^3.7.0",
@@ -66,6 +67,7 @@
6667
"yargs": "^16.2.0"
6768
},
6869
"devDependencies": {
70+
"@types/amqplib": "^0.10.5",
6971
"@types/node-cron": "^3.0.11",
7072
"@types/node-schedule": "^2.1.7",
7173
"globby": "11"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
const amqpUsername = 'sentry';
2+
const amqpPassword = 'sentry';
3+
4+
export const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`;
5+
export const ACKNOWLEDGEMENT = { noAck: false };
6+
7+
export const QUEUE_OPTIONS = {
8+
durable: true, // Make the queue durable
9+
exclusive: false, // Not exclusive
10+
autoDelete: false, // Don't auto-delete the queue
11+
arguments: {
12+
'x-message-ttl': 30000, // Message TTL of 30 seconds
13+
'x-max-length': 1000, // Maximum queue length of 1000 messages
14+
},
15+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
version: '3'
2+
3+
services:
4+
rabbitmq:
5+
image: rabbitmq:management
6+
container_name: rabbitmq
7+
environment:
8+
- RABBITMQ_DEFAULT_USER=sentry
9+
- RABBITMQ_DEFAULT_PASS=sentry
10+
ports:
11+
- "5672:5672"
12+
- "15672:15672"
13+
14+
networks:
15+
default:
16+
driver: bridge
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { loggingTransport } from '@sentry-internal/node-integration-tests';
2+
import * as Sentry from '@sentry/node';
3+
4+
Sentry.init({
5+
dsn: 'https://[email protected]/1337',
6+
release: '1.0',
7+
tracesSampleRate: 1.0,
8+
transport: loggingTransport,
9+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import * as Sentry from '@sentry/node';
2+
import './init';
3+
import { connectToRabbitMQ, consumeMessageFromQueue, createQueue, sendMessageToQueue } from './utils';
4+
5+
const queueName = 'queue1';
6+
7+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
8+
(async () => {
9+
const { connection, channel } = await connectToRabbitMQ();
10+
await createQueue(queueName, channel);
11+
12+
await Sentry.startSpan({ name: 'root span' }, async () => {
13+
sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' }));
14+
});
15+
16+
await consumeMessageFromQueue(queueName, channel);
17+
await channel.close();
18+
await connection.close();
19+
})();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import type { TransactionEvent } from '@sentry/types';
2+
import { cleanupChildProcesses, createRunner } from '../../../utils/runner';
3+
4+
jest.setTimeout(30_000);
5+
6+
const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({
7+
op: 'message',
8+
data: expect.objectContaining({
9+
'messaging.system': 'rabbitmq',
10+
'otel.kind': 'PRODUCER',
11+
'sentry.op': 'message',
12+
'sentry.origin': 'auto.amqplib.otel.publisher',
13+
}),
14+
status: 'ok',
15+
});
16+
17+
const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({
18+
op: 'message',
19+
data: expect.objectContaining({
20+
'messaging.system': 'rabbitmq',
21+
'otel.kind': 'CONSUMER',
22+
'sentry.op': 'message',
23+
'sentry.origin': 'auto.amqplib.otel.consumer',
24+
}),
25+
status: 'ok',
26+
});
27+
28+
describe('amqplib auto-instrumentation', () => {
29+
afterAll(async () => {
30+
cleanupChildProcesses();
31+
});
32+
33+
test('should be able to send and receive messages', done => {
34+
createRunner(__dirname, 'scenario-message.ts')
35+
.withDockerCompose({
36+
workingDirectory: [__dirname],
37+
readyMatches: ['Time to start RabbitMQ'],
38+
})
39+
.expect({
40+
transaction: (transaction: TransactionEvent) => {
41+
expect(transaction.transaction).toEqual('root span');
42+
expect(transaction.spans?.length).toEqual(1);
43+
expect(transaction.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER);
44+
},
45+
})
46+
.expect({
47+
transaction: (transaction: TransactionEvent) => {
48+
expect(transaction.transaction).toEqual('queue1 process');
49+
expect(transaction.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER);
50+
},
51+
})
52+
.start(done);
53+
});
54+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import amqp from 'amqplib';
2+
import type { Channel, Connection } from 'amqplib';
3+
import { ACKNOWLEDGEMENT, AMQP_URL, QUEUE_OPTIONS } from './constants';
4+
5+
export type RabbitMQData = {
6+
connection: Connection;
7+
channel: Channel;
8+
};
9+
10+
export async function connectToRabbitMQ(): Promise<RabbitMQData> {
11+
const connection = await amqp.connect(AMQP_URL);
12+
const channel = await connection.createChannel();
13+
return { connection, channel };
14+
}
15+
16+
export async function createQueue(queueName: string, channel: Channel): Promise<void> {
17+
await channel.assertQueue(queueName, QUEUE_OPTIONS);
18+
}
19+
20+
export function sendMessageToQueue(queueName: string, channel: Channel, message: string): void {
21+
channel.sendToQueue(queueName, Buffer.from(message));
22+
}
23+
24+
async function consumer(queueName: string, channel: Channel): Promise<void> {
25+
await channel.consume(
26+
queueName,
27+
message => {
28+
if (message) {
29+
channel.ack(message);
30+
}
31+
},
32+
ACKNOWLEDGEMENT,
33+
);
34+
}
35+
36+
export async function consumeMessageFromQueue(queueName: string, channel: Channel): Promise<void> {
37+
await consumer(queueName, channel);
38+
}

dev-packages/node-integration-tests/utils/runner.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type {
1111
SerializedCheckIn,
1212
SerializedSession,
1313
SessionAggregates,
14+
TransactionEvent,
1415
} from '@sentry/types';
1516
import axios from 'axios';
1617
import { createBasicSentryServer } from './server';
@@ -151,7 +152,7 @@ type Expected =
151152
event: Partial<Event> | ((event: Event) => void);
152153
}
153154
| {
154-
transaction: Partial<Event> | ((event: Event) => void);
155+
transaction: Partial<TransactionEvent> | ((event: TransactionEvent) => void);
155156
}
156157
| {
157158
session: Partial<SerializedSession> | ((event: SerializedSession) => void);
@@ -317,7 +318,7 @@ export function createRunner(...paths: string[]) {
317318
}
318319

319320
if ('transaction' in expected) {
320-
const event = item[1] as Event;
321+
const event = item[1] as TransactionEvent;
321322
if (typeof expected.transaction === 'function') {
322323
expected.transaction(event);
323324
} else {
@@ -483,6 +484,7 @@ export function createRunner(...paths: string[]) {
483484
method: 'get' | 'post',
484485
path: string,
485486
headers: Record<string, string> = {},
487+
data?: any, // axios accept any as data
486488
): Promise<T | undefined> {
487489
try {
488490
await waitFor(() => scenarioServerPort !== undefined);
@@ -497,7 +499,7 @@ export function createRunner(...paths: string[]) {
497499
if (method === 'get') {
498500
await axios.get(url, { headers });
499501
} else {
500-
await axios.post(url, { headers });
502+
await axios.post(url, data, { headers });
501503
}
502504
} catch (e) {
503505
return;
@@ -506,7 +508,7 @@ export function createRunner(...paths: string[]) {
506508
} else if (method === 'get') {
507509
return (await axios.get(url, { headers })).data;
508510
} else {
509-
return (await axios.post(url, { headers })).data;
511+
return (await axios.post(url, data, { headers })).data;
510512
}
511513
},
512514
};

packages/astro/src/index.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export {
1313
addIntegration,
1414
addOpenTelemetryInstrumentation,
1515
addRequestDataToEvent,
16+
amqplibIntegration,
1617
anrIntegration,
1718
captureCheckIn,
1819
captureConsoleIntegration,

packages/aws-serverless/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ export {
111111
addOpenTelemetryInstrumentation,
112112
zodErrorsIntegration,
113113
profiler,
114+
amqplibIntegration,
114115
} from '@sentry/node';
115116

116117
export {

packages/bun/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ export {
132132
addOpenTelemetryInstrumentation,
133133
zodErrorsIntegration,
134134
profiler,
135+
amqplibIntegration,
135136
} from '@sentry/node';
136137

137138
export {

packages/google-cloud-serverless/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ export {
111111
addOpenTelemetryInstrumentation,
112112
zodErrorsIntegration,
113113
profiler,
114+
amqplibIntegration,
114115
} from '@sentry/node';
115116

116117
export {

packages/node/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"@opentelemetry/context-async-hooks": "^1.25.1",
7070
"@opentelemetry/core": "^1.25.1",
7171
"@opentelemetry/instrumentation": "^0.53.0",
72+
"@opentelemetry/instrumentation-amqplib": "^0.42.0",
7273
"@opentelemetry/instrumentation-connect": "0.39.0",
7374
"@opentelemetry/instrumentation-dataloader": "0.12.0",
7475
"@opentelemetry/instrumentation-express": "0.42.0",

packages/node/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export { connectIntegration, setupConnectErrorHandler } from './integrations/tra
2929
export { spotlightIntegration } from './integrations/spotlight';
3030
export { genericPoolIntegration } from './integrations/tracing/genericPool';
3131
export { dataloaderIntegration } from './integrations/tracing/dataloader';
32+
export { amqplibIntegration } from './integrations/tracing/amqplib';
3233

3334
export { SentryContextManager } from './otel/contextManager';
3435
export { generateInstrumentOnce } from './otel/instrument';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Span } from '@opentelemetry/api';
2+
import { AmqplibInstrumentation, type AmqplibInstrumentationConfig } from '@opentelemetry/instrumentation-amqplib';
3+
import { defineIntegration } from '@sentry/core';
4+
import type { IntegrationFn } from '@sentry/types';
5+
import { generateInstrumentOnce } from '../../otel/instrument';
6+
import { addOriginToSpan } from '../../utils/addOriginToSpan';
7+
8+
const INTEGRATION_NAME = 'Amqplib';
9+
10+
const config: AmqplibInstrumentationConfig = {
11+
consumeEndHook: (span: Span) => {
12+
addOriginToSpan(span, 'auto.amqplib.otel.consumer');
13+
},
14+
publishHook: (span: Span) => {
15+
addOriginToSpan(span, 'auto.amqplib.otel.publisher');
16+
},
17+
};
18+
19+
export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation(config));
20+
21+
const _amqplibIntegration = (() => {
22+
return {
23+
name: INTEGRATION_NAME,
24+
setupOnce() {
25+
instrumentAmqplib();
26+
},
27+
};
28+
}) satisfies IntegrationFn;
29+
30+
export const amqplibIntegration = defineIntegration(_amqplibIntegration);

packages/node/src/integrations/tracing/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Integration } from '@sentry/types';
22
import { instrumentHttp } from '../http';
33

4+
import { amqplibIntegration, instrumentAmqplib } from './amqplib';
45
import { connectIntegration, instrumentConnect } from './connect';
56
import { dataloaderIntegration, instrumentDataloader } from './dataloader';
67
import { expressIntegration, instrumentExpress } from './express';
@@ -43,6 +44,7 @@ export function getAutoPerformanceIntegrations(): Integration[] {
4344
genericPoolIntegration(),
4445
kafkaIntegration(),
4546
dataloaderIntegration(),
47+
amqplibIntegration(),
4648
];
4749
}
4850

@@ -70,5 +72,6 @@ export function getOpenTelemetryInstrumentationToPreload(): (((options?: any) =>
7072
instrumentRedis,
7173
instrumentGenericPool,
7274
instrumentDataloader,
75+
instrumentAmqplib,
7376
];
7477
}

packages/remix/src/index.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export {
1818
addIntegration,
1919
addOpenTelemetryInstrumentation,
2020
addRequestDataToEvent,
21+
amqplibIntegration,
2122
anrIntegration,
2223
captureCheckIn,
2324
captureConsoleIntegration,

packages/solidstart/src/server/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export {
99
addIntegration,
1010
addOpenTelemetryInstrumentation,
1111
addRequestDataToEvent,
12+
amqplibIntegration,
1213
anrIntegration,
1314
captureCheckIn,
1415
captureConsoleIntegration,

packages/sveltekit/src/server/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export {
99
addIntegration,
1010
addOpenTelemetryInstrumentation,
1111
addRequestDataToEvent,
12+
amqplibIntegration,
1213
anrIntegration,
1314
captureCheckIn,
1415
captureConsoleIntegration,

0 commit comments

Comments
 (0)