From 21d84adfcc0dc0e882b60eef406749a553e3ecfe Mon Sep 17 00:00:00 2001 From: Elliot Winkler Date: Thu, 30 Jan 2025 15:30:42 -0700 Subject: [PATCH] Include endpointUrl in RpcService event listener data Modify RpcService so that when it emits the `onRetry`, `onBreak`, or `onDegraded` event, it passes the URL of the endpoint it is requesting to the respective event listener. This will be useful when we add the ability to create a chain of RPC service objects, where we start out hitting a primary node and then keep failing over to successive nodes in the chain until the request succeeds. By including the endpoint URL in the event listener data, we are able to identify which RPC service was active when the event occurred. --- .../src/create-service-policy.ts | 20 ++-- packages/controller-utils/src/index.ts | 1 + .../src/rpc-service/abstract-rpc-service.ts | 53 ++++++++- .../src/rpc-service/rpc-service.test.ts | 104 +++++++++++------- .../src/rpc-service/rpc-service.ts | 50 ++++++--- .../src/rpc-service/shared.ts | 12 ++ 6 files changed, 175 insertions(+), 65 deletions(-) diff --git a/packages/controller-utils/src/create-service-policy.ts b/packages/controller-utils/src/create-service-policy.ts index 2747418d480..f0943e20591 100644 --- a/packages/controller-utils/src/create-service-policy.ts +++ b/packages/controller-utils/src/create-service-policy.ts @@ -1,6 +1,7 @@ import { BrokenCircuitError, CircuitState, + EventEmitter as CockatielEventEmitter, ConsecutiveBreaker, ExponentialBackoff, circuitBreaker, @@ -11,6 +12,7 @@ import { } from 'cockatiel'; import type { CircuitBreakerPolicy, + Event as CockatielEvent, IPolicy, Policy, RetryPolicy, @@ -18,6 +20,8 @@ import type { export { CircuitState, BrokenCircuitError, handleAll, handleWhen }; +export type { CockatielEvent }; + /** * The options for `createServicePolicy`. */ @@ -76,7 +80,7 @@ export type ServicePolicy = IPolicy & { * never succeeds before the retry policy gives up and before the maximum * number of consecutive failures has been reached. */ - onDegraded: (fn: () => void) => void; + onDegraded: CockatielEvent; /** * A function which will be called by the retry policy each time the service * fails and the policy kicks off a timer to re-run the service. This is @@ -203,12 +207,10 @@ export function createServicePolicy({ }); const onBreak = circuitBreakerPolicy.onBreak.bind(circuitBreakerPolicy); - const onDegradedListeners: (() => void)[] = []; + const onDegradedEventEmitter = new CockatielEventEmitter(); retryPolicy.onGiveUp(() => { if (circuitBreakerPolicy.state === CircuitState.Closed) { - for (const listener of onDegradedListeners) { - listener(); - } + onDegradedEventEmitter.emit(); } }); retryPolicy.onSuccess(({ duration }) => { @@ -216,14 +218,10 @@ export function createServicePolicy({ circuitBreakerPolicy.state === CircuitState.Closed && duration > degradedThreshold ) { - for (const listener of onDegradedListeners) { - listener(); - } + onDegradedEventEmitter.emit(); } }); - const onDegraded = (listener: () => void) => { - onDegradedListeners.push(listener); - }; + const onDegraded = onDegradedEventEmitter.addListener; // Every time the retry policy makes an attempt, it executes the circuit // breaker policy, which executes the service. diff --git a/packages/controller-utils/src/index.ts b/packages/controller-utils/src/index.ts index f0f65a9b1da..155c269217c 100644 --- a/packages/controller-utils/src/index.ts +++ b/packages/controller-utils/src/index.ts @@ -10,6 +10,7 @@ export { handleWhen, } from './create-service-policy'; export type { + CockatielEvent, CreateServicePolicyOptions, ServicePolicy, } from './create-service-policy'; diff --git a/packages/network-controller/src/rpc-service/abstract-rpc-service.ts b/packages/network-controller/src/rpc-service/abstract-rpc-service.ts index bf7c3002ecf..d3bc82dc400 100644 --- a/packages/network-controller/src/rpc-service/abstract-rpc-service.ts +++ b/packages/network-controller/src/rpc-service/abstract-rpc-service.ts @@ -6,15 +6,60 @@ import type { JsonRpcResponse, } from '@metamask/utils'; -import type { FetchOptions } from './shared'; +import type { AddToCockatielEventData, FetchOptions } from './shared'; /** * The interface for a service class responsible for making a request to an RPC * endpoint. */ -export type AbstractRpcService = Partial< - Pick -> & { +export type AbstractRpcService = { + /** + * Listens for when the RPC service retries the request. + * + * @param listener - The callback to be called when the retry occurs. + * @returns What {@link ServicePolicy.onRetry} returns. + * @see {@link createServicePolicy} + */ + onRetry( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ): ReturnType; + + /** + * Listens for when the RPC service retries the request too many times in a + * row. + * + * @param listener - The callback to be called when the circuit is broken. + * @returns What {@link ServicePolicy.onBreak} returns. + * @see {@link createServicePolicy} + */ + onBreak( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ): ReturnType; + + /** + * Listens for when the policy underlying this RPC service detects a slow + * request. + * + * @param listener - The callback to be called when the request is slow. + * @returns What {@link ServicePolicy.onDegraded} returns. + * @see {@link createServicePolicy} + */ + onDegraded( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ): ReturnType; + + /** + * Makes a request to the RPC endpoint. + */ request( jsonRpcRequest: JsonRpcRequest, fetchOptions?: FetchOptions, diff --git a/packages/network-controller/src/rpc-service/rpc-service.test.ts b/packages/network-controller/src/rpc-service/rpc-service.test.ts index fe69e397fcf..ffc148c4a4a 100644 --- a/packages/network-controller/src/rpc-service/rpc-service.test.ts +++ b/packages/network-controller/src/rpc-service/rpc-service.test.ts @@ -144,7 +144,8 @@ describe('RpcService', () => { describe('if the endpoint has a 405 response', () => { it('throws a non-existent method error without retrying the request', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -155,7 +156,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const promise = service.request({ @@ -170,7 +171,8 @@ describe('RpcService', () => { }); it('does not forward the request to a failover service if given one', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -182,7 +184,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, failoverService, }); @@ -197,7 +199,8 @@ describe('RpcService', () => { }); it('does not call onBreak', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -209,7 +212,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onBreak(onBreakListener); @@ -226,7 +229,8 @@ describe('RpcService', () => { describe('if the endpoint has a 429 response', () => { it('throws a rate-limiting error without retrying the request', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -237,7 +241,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const promise = service.request({ @@ -250,7 +254,8 @@ describe('RpcService', () => { }); it('does not forward the request to a failover service if given one', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -262,7 +267,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, failoverService, }); @@ -277,7 +282,8 @@ describe('RpcService', () => { }); it('does not call onBreak', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -289,7 +295,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onBreak(onBreakListener); @@ -306,7 +312,8 @@ describe('RpcService', () => { describe('when the endpoint has a response that is neither 2xx, nor 405, 429, 503, or 504', () => { it('throws a generic error without retrying the request', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -321,7 +328,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const promise = service.request({ @@ -343,7 +350,8 @@ describe('RpcService', () => { }); it('does not forward the request to a failover service if given one', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -359,7 +367,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, failoverService, }); @@ -374,7 +382,8 @@ describe('RpcService', () => { }); it('does not call onBreak', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -390,7 +399,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onBreak(onBreakListener); @@ -417,7 +426,8 @@ describe('RpcService', () => { }); it('removes non-JSON-RPC-compliant properties from the request body before sending it to the endpoint', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -432,7 +442,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); // @ts-expect-error Intentionally passing bad input. @@ -566,7 +576,8 @@ describe('RpcService', () => { }); it('returns the JSON-decoded response if the request succeeds', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -593,7 +604,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const response = await service.request({ @@ -623,7 +634,8 @@ describe('RpcService', () => { }); it('does not throw if the endpoint returns an unsuccessful JSON-RPC response', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -641,7 +653,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const response = await service.request({ @@ -662,7 +674,8 @@ describe('RpcService', () => { }); it('interprets a "Not Found" response for eth_getBlockByNumber as an empty result', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -673,7 +686,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); const response = await service.request({ @@ -691,7 +704,8 @@ describe('RpcService', () => { }); it('calls the onDegraded callback if the endpoint takes more than 5 seconds to respond', async () => { - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -710,7 +724,7 @@ describe('RpcService', () => { const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onDegraded(onDegradedListener); @@ -827,11 +841,12 @@ function testsForRetriableFetchErrors({ const mockFetch = jest.fn(() => { throw producedError; }); + const endpointUrl = 'https://rpc.example.chain'; const onBreakListener = jest.fn(); const service = new RpcService({ fetch: mockFetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onRetry(() => { // We don't need to await this promise; adding it to the promise @@ -853,7 +868,10 @@ function testsForRetriableFetchErrors({ await ignoreRejection(service.request(jsonRpcRequest)); expect(onBreakListener).toHaveBeenCalledTimes(1); - expect(onBreakListener).toHaveBeenCalledWith({ error: expectedError }); + expect(onBreakListener).toHaveBeenCalledWith({ + error: expectedError, + endpointUrl: `${endpointUrl}/`, + }); }); }); @@ -1012,12 +1030,13 @@ function testsForRetriableFetchErrors({ const mockFetch = jest.fn(() => { throw producedError; }); + const endpointUrl = 'https://rpc.example.chain'; const failoverService = buildMockRpcService(); const onBreakListener = jest.fn(); const service = new RpcService({ fetch: mockFetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, failoverService, }); service.onRetry(() => { @@ -1043,7 +1062,10 @@ function testsForRetriableFetchErrors({ await service.request(jsonRpcRequest); expect(onBreakListener).toHaveBeenCalledTimes(2); - expect(onBreakListener).toHaveBeenCalledWith({ error: expectedError }); + expect(onBreakListener).toHaveBeenCalledWith({ + error: expectedError, + endpointUrl: `${endpointUrl}/`, + }); }); }); } @@ -1150,7 +1172,8 @@ function testsForRetriableResponses({ it('calls the onBreak callback once after the circuit breaks', async () => { const clock = getClock(); - nock('https://rpc.example.chain') + const endpointUrl = 'https://rpc.example.chain'; + nock(endpointUrl) .post('/', { id: 1, jsonrpc: '2.0', @@ -1163,7 +1186,7 @@ function testsForRetriableResponses({ const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, }); service.onRetry(() => { // We don't need to await this promise; adding it to the promise @@ -1185,7 +1208,10 @@ function testsForRetriableResponses({ await ignoreRejection(service.request(jsonRpcRequest)); expect(onBreakListener).toHaveBeenCalledTimes(1); - expect(onBreakListener).toHaveBeenCalledWith({ error: expectedError }); + expect(onBreakListener).toHaveBeenCalledWith({ + error: expectedError, + endpointUrl: `${endpointUrl}/`, + }); }); }); @@ -1365,12 +1391,13 @@ function testsForRetriableResponses({ }) .times(16) .reply(httpStatus, responseBody); + const endpointUrl = 'https://rpc.example.chain'; const failoverService = buildMockRpcService(); const onBreakListener = jest.fn(); const service = new RpcService({ fetch, btoa, - endpointUrl: 'https://rpc.example.chain', + endpointUrl, failoverService, }); service.onRetry(() => { @@ -1396,7 +1423,10 @@ function testsForRetriableResponses({ await service.request(jsonRpcRequest); expect(onBreakListener).toHaveBeenCalledTimes(2); - expect(onBreakListener).toHaveBeenCalledWith({ error: expectedError }); + expect(onBreakListener).toHaveBeenCalledWith({ + error: expectedError, + endpointUrl: `${endpointUrl}/`, + }); }); }); diff --git a/packages/network-controller/src/rpc-service/rpc-service.ts b/packages/network-controller/src/rpc-service/rpc-service.ts index a063c8aa18f..fc2c18fa9c9 100644 --- a/packages/network-controller/src/rpc-service/rpc-service.ts +++ b/packages/network-controller/src/rpc-service/rpc-service.ts @@ -15,7 +15,7 @@ import { import deepmerge from 'deepmerge'; import type { AbstractRpcService } from './abstract-rpc-service'; -import type { FetchOptions } from './shared'; +import type { AddToCockatielEventData, FetchOptions } from './shared'; /** * The list of error messages that represent a failure to reach the network. @@ -107,8 +107,8 @@ export class RpcService implements AbstractRpcService { * If your JavaScript environment supports `fetch` natively, you'll probably * want to pass that; otherwise you can pass an equivalent (such as `fetch` * via `node-fetch`). - * @param args.btoa - A function that can be used to encode a binary string - * into base 64. Used to encode authorization credentials. + * @param args.btoa - A function that can be used to convert a binary string + * into base-64. Used to encode authorization credentials. * @param args.endpointUrl - The URL of the RPC endpoint. * @param args.fetchOptions - A common set of options that will be used to * make every request. Can be overridden on the request level (e.g. to add @@ -158,28 +158,45 @@ export class RpcService implements AbstractRpcService { this.#policy = policy; } + getEndpointUrl() { + return this.#endpointUrl; + } + /** - * Listens for when the retry policy underlying this RPC service retries the - * request. + * Listens for when the RPC service retries the request. * * @param listener - The callback to be called when the retry occurs. * @returns What {@link ServicePolicy.onRetry} returns. * @see {@link createServicePolicy} */ - onRetry(listener: Parameters[0]) { - return this.#policy.onRetry(listener); + onRetry( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ) { + return this.#policy.onRetry((data) => { + listener({ ...data, endpointUrl: this.#endpointUrl.toString() }); + }); } /** - * Listens for when the circuit breaker policy underlying this RPC service - * detects a broken circuit. + * Listens for when the RPC service retries the request too many times in a + * row. * * @param listener - The callback to be called when the circuit is broken. * @returns What {@link ServicePolicy.onBreak} returns. * @see {@link createServicePolicy} */ - onBreak(listener: Parameters[0]) { - return this.#policy.onBreak(listener); + onBreak( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ) { + return this.#policy.onBreak((data) => { + listener({ ...data, endpointUrl: this.#endpointUrl.toString() }); + }); } /** @@ -190,8 +207,15 @@ export class RpcService implements AbstractRpcService { * @returns What {@link ServicePolicy.onDegraded} returns. * @see {@link createServicePolicy} */ - onDegraded(listener: Parameters[0]) { - return this.#policy.onDegraded(listener); + onDegraded( + listener: AddToCockatielEventData< + Parameters[0], + { endpointUrl: string } + >, + ) { + return this.#policy.onDegraded(() => { + listener({ endpointUrl: this.#endpointUrl.toString() }); + }); } /** diff --git a/packages/network-controller/src/rpc-service/shared.ts b/packages/network-controller/src/rpc-service/shared.ts index e45ec187e09..68e4c78b250 100644 --- a/packages/network-controller/src/rpc-service/shared.ts +++ b/packages/network-controller/src/rpc-service/shared.ts @@ -2,3 +2,15 @@ * Equivalent to the built-in `FetchOptions` type, but renamed for clarity. */ export type FetchOptions = RequestInit; + +/** + * Extends an event listener that Cockatiel uses so that when it is called, more + * data can be supplied in the event object. + */ +export type AddToCockatielEventData = + EventListener extends (data: infer Data) => void + ? // Prevent Data from being split if it's a type union + [Data] extends [void] + ? (data: AdditionalData) => void + : (data: Data & AdditionalData) => void + : never;