Skip to content

Commit

Permalink
Include endpointUrl in RpcService event listener data
Browse files Browse the repository at this point in the history
Modify RpcService so that when it emits the `onRetry`, `onBreak`, or
`onDegraded` event, it passes the URL of the endpoint it is requesting
to the respective event listener.

This will be useful when we add the ability to create a chain of RPC
service objects, where we start out hitting a primary node and then keep
failing over to successive nodes in the chain until the request
succeeds. By including the endpoint URL in the event listener data, we
are able to identify which RPC service was active when the event
occurred.
  • Loading branch information
mcmire committed Jan 30, 2025
1 parent 846253a commit 0fb406d
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 65 deletions.
20 changes: 9 additions & 11 deletions packages/controller-utils/src/create-service-policy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BrokenCircuitError,
CircuitState,
EventEmitter as CockatielEventEmitter,
ConsecutiveBreaker,
ExponentialBackoff,
circuitBreaker,
Expand All @@ -11,13 +12,16 @@ import {
} from 'cockatiel';
import type {
CircuitBreakerPolicy,
Event as CockatielEvent,
IPolicy,
Policy,
RetryPolicy,
} from 'cockatiel';

export { CircuitState, BrokenCircuitError, handleAll, handleWhen };

export type { CockatielEvent };

/**
* The options for `createServicePolicy`.
*/
Expand Down Expand Up @@ -76,7 +80,7 @@ export type ServicePolicy = IPolicy & {
* never succeeds before the retry policy gives up and before the maximum
* number of consecutive failures has been reached.
*/
onDegraded: (fn: () => void) => void;
onDegraded: CockatielEvent<void>;
/**
* A function which will be called by the retry policy each time the service
* fails and the policy kicks off a timer to re-run the service. This is
Expand Down Expand Up @@ -203,27 +207,21 @@ export function createServicePolicy({
});
const onBreak = circuitBreakerPolicy.onBreak.bind(circuitBreakerPolicy);

const onDegradedListeners: (() => void)[] = [];
const onDegradedEventEmitter = new CockatielEventEmitter<void>();
retryPolicy.onGiveUp(() => {
if (circuitBreakerPolicy.state === CircuitState.Closed) {
for (const listener of onDegradedListeners) {
listener();
}
onDegradedEventEmitter.emit();
}
});
retryPolicy.onSuccess(({ duration }) => {
if (
circuitBreakerPolicy.state === CircuitState.Closed &&
duration > degradedThreshold
) {
for (const listener of onDegradedListeners) {
listener();
}
onDegradedEventEmitter.emit();
}
});
const onDegraded = (listener: () => void) => {
onDegradedListeners.push(listener);
};
const onDegraded = onDegradedEventEmitter.addListener;

// Every time the retry policy makes an attempt, it executes the circuit
// breaker policy, which executes the service.
Expand Down
1 change: 1 addition & 0 deletions packages/controller-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
handleWhen,
} from './create-service-policy';
export type {
CockatielEvent,
CreateServicePolicyOptions,
ServicePolicy,
} from './create-service-policy';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,60 @@ import type {
JsonRpcResponse,
} from '@metamask/utils';

import type { FetchOptions } from './shared';
import type { AddToCockatielEventData, FetchOptions } from './shared';

/**
* The interface for a service class responsible for making a request to an RPC
* endpoint.
*/
export type AbstractRpcService = Partial<
Pick<ServicePolicy, 'onBreak' | 'onRetry' | 'onDegraded'>
> & {
export type AbstractRpcService = {
/**
* Listens for when the RPC service retries the request.
*
* @param listener - The callback to be called when the retry occurs.
* @returns What {@link ServicePolicy.onRetry} returns.
* @see {@link createServicePolicy}
*/
onRetry(
listener: AddToCockatielEventData<
Parameters<ServicePolicy['onRetry']>[0],
{ endpointUrl: string }
>,
): ReturnType<ServicePolicy['onRetry']>;

/**
* Listens for when the RPC service retries the request too many times in a
* row.
*
* @param listener - The callback to be called when the circuit is broken.
* @returns What {@link ServicePolicy.onBreak} returns.
* @see {@link createServicePolicy}
*/
onBreak(
listener: AddToCockatielEventData<
Parameters<ServicePolicy['onBreak']>[0],
{ endpointUrl: string }
>,
): ReturnType<ServicePolicy['onBreak']>;

/**
* Listens for when the policy underlying this RPC service detects a slow
* request.
*
* @param listener - The callback to be called when the request is slow.
* @returns What {@link ServicePolicy.onDegraded} returns.
* @see {@link createServicePolicy}
*/
onDegraded(
listener: AddToCockatielEventData<
Parameters<ServicePolicy['onDegraded']>[0],
{ endpointUrl: string }
>,
): ReturnType<ServicePolicy['onDegraded']>;

/**
* Makes a request to the RPC endpoint.
*/
request<Params extends JsonRpcParams, Result extends Json>(
jsonRpcRequest: JsonRpcRequest<Params>,
fetchOptions?: FetchOptions,
Expand Down
Loading

0 comments on commit 0fb406d

Please sign in to comment.