Skip to content

Commit 4931812

Browse files
authored
lukas/gill websocket sub (#1781)
* websockets gill temp * feat: feature parity between gill version ws acct sub and reg one + optional passing into driftClient * fix: post rebase bugs and cleanup * chore: websocket account subscriber export * feat: logging string update on ws acct v2 * rm: useless logging * chore: cleanup ws subscriber v2 docs * chore: specific name on custom ws acct sub param * fix: post rebase again cleanup * fix: prettier fixed
1 parent 91dd935 commit 4931812

9 files changed

+903
-12
lines changed

sdk/bun.lock

Lines changed: 96 additions & 5 deletions
Large diffs are not rendered by default.

sdk/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
"@switchboard-xyz/on-demand": "2.4.1",
5454
"@triton-one/yellowstone-grpc": "1.3.0",
5555
"anchor-bankrun": "0.3.0",
56+
"gill": "^0.10.2",
5657
"nanoid": "3.3.4",
5758
"node-cache": "5.1.2",
5859
"rpc-websockets": "7.5.1",
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# WebSocketAccountSubscriberV2
2+
3+
This is a new implementation of the WebSocket account subscriber that utilizes the [gill](https://www.npmjs.com/package/gill) library for improved RPC and WebSocket functionality.
4+
5+
## Overview
6+
7+
The `WebSocketAccountSubscriberV2` class provides the same interface as the original `WebSocketAccountSubscriber` but uses gill's modern TypeScript client library for Solana blockchain interactions.
8+
9+
## Usage
10+
11+
The usage is identical to the original `WebSocketAccountSubscriber`:
12+
13+
```typescript
14+
import { WebSocketAccountSubscriberV2 } from './accounts/webSocketAccountSubscriberV2';
15+
16+
const subscriber = new WebSocketAccountSubscriberV2(
17+
'userAccount', // account name
18+
program, // Anchor program instance
19+
userAccountPublicKey, // PublicKey of the account to subscribe to
20+
decodeBuffer, // optional custom decode function
21+
resubOpts, // optional resubscription options
22+
commitment // optional commitment level
23+
);
24+
25+
// Subscribe to account changes
26+
await subscriber.subscribe((data) => {
27+
console.log('Account updated:', data);
28+
});
29+
30+
// Unsubscribe when done
31+
await subscriber.unsubscribe();
32+
```
33+
34+
## Implementation Details
35+
36+
### Gill Integration
37+
38+
The implementation uses gill's `createSolanaClient` function to create RPC and WebSocket clients:
39+
40+
```typescript
41+
import { createSolanaClient } from 'gill';
42+
43+
const { rpc, rpcSubscriptions } = createSolanaClient({
44+
urlOrMoniker: rpcUrl, // or "mainnet", "devnet", etc.
45+
});
46+
```
47+
48+
### Key Differences from Original
49+
50+
1. **RPC Client**: Uses gill's `rpc` client for account fetching
51+
2. **WebSocket Subscriptions**: Uses gill's `rpcSubscriptions` for real-time updates
52+
3. **Address Handling**: Converts `PublicKey` to gill's `Address` type for compatibility
53+
4. **Response Formatting**: Converts gill responses to match the expected `AccountInfo<Buffer>` format
54+
5. **Abort Signal**: Utilizes AbortSignal nodejs/web class to shutdown websocket connection synchronously
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
import {
2+
DataAndSlot,
3+
AccountSubscriber,
4+
ResubOpts,
5+
BufferAndSlot,
6+
} from './types';
7+
import { AnchorProvider, Program } from '@coral-xyz/anchor';
8+
import { capitalize } from './utils';
9+
import {
10+
AccountInfoBase,
11+
AccountInfoWithBase58EncodedData,
12+
AccountInfoWithBase64EncodedData,
13+
createSolanaClient,
14+
isAddress,
15+
type Address,
16+
type Commitment,
17+
} from 'gill';
18+
import { PublicKey } from '@solana/web3.js';
19+
import bs58 from 'bs58';
20+
21+
export class WebSocketAccountSubscriberV2<T> implements AccountSubscriber<T> {
22+
dataAndSlot?: DataAndSlot<T>;
23+
bufferAndSlot?: BufferAndSlot;
24+
accountName: string;
25+
logAccountName: string;
26+
program: Program;
27+
accountPublicKey: PublicKey;
28+
decodeBufferFn: (buffer: Buffer) => T;
29+
onChange: (data: T) => void;
30+
listenerId?: number;
31+
32+
resubOpts?: ResubOpts;
33+
34+
commitment?: Commitment;
35+
isUnsubscribing = false;
36+
37+
timeoutId?: ReturnType<typeof setTimeout>;
38+
39+
receivingData: boolean;
40+
41+
// Gill client components
42+
private rpc: ReturnType<typeof createSolanaClient>['rpc'];
43+
private rpcSubscriptions: ReturnType<
44+
typeof createSolanaClient
45+
>['rpcSubscriptions'];
46+
private abortController?: AbortController;
47+
48+
public constructor(
49+
accountName: string,
50+
program: Program,
51+
accountPublicKey: PublicKey,
52+
decodeBuffer?: (buffer: Buffer) => T,
53+
resubOpts?: ResubOpts,
54+
commitment?: Commitment
55+
) {
56+
this.accountName = accountName;
57+
this.logAccountName = `${accountName}-${accountPublicKey.toBase58()}-ws-acct-subscriber-v2`;
58+
this.program = program;
59+
this.accountPublicKey = accountPublicKey;
60+
this.decodeBufferFn = decodeBuffer;
61+
this.resubOpts = resubOpts;
62+
if (this.resubOpts?.resubTimeoutMs < 1000) {
63+
console.log(
64+
`resubTimeoutMs should be at least 1000ms to avoid spamming resub ${this.logAccountName}`
65+
);
66+
}
67+
this.receivingData = false;
68+
if (
69+
['recent', 'single', 'singleGossip', 'root', 'max'].includes(
70+
(this.program.provider as AnchorProvider).opts.commitment
71+
)
72+
) {
73+
console.warn(
74+
`using commitment ${
75+
(this.program.provider as AnchorProvider).opts.commitment
76+
} that is not supported by gill, this may cause issues`
77+
);
78+
}
79+
this.commitment =
80+
commitment ??
81+
((this.program.provider as AnchorProvider).opts.commitment as Commitment);
82+
83+
// Initialize gill client using the same RPC URL as the program provider
84+
const rpcUrl = (this.program.provider as AnchorProvider).connection
85+
.rpcEndpoint;
86+
const { rpc, rpcSubscriptions } = createSolanaClient({
87+
urlOrMoniker: rpcUrl,
88+
});
89+
this.rpc = rpc;
90+
this.rpcSubscriptions = rpcSubscriptions;
91+
}
92+
93+
async subscribe(onChange: (data: T) => void): Promise<void> {
94+
if (this.listenerId != null || this.isUnsubscribing) {
95+
if (this.resubOpts?.logResubMessages) {
96+
console.log(
97+
`[${this.logAccountName}] Subscribe returning early - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
98+
);
99+
}
100+
return;
101+
}
102+
103+
this.onChange = onChange;
104+
if (!this.dataAndSlot) {
105+
await this.fetch();
106+
}
107+
108+
// Create abort controller for proper cleanup
109+
const abortController = new AbortController();
110+
this.abortController = abortController;
111+
112+
// Subscribe to account changes using gill's rpcSubscriptions
113+
const pubkey = this.accountPublicKey.toBase58();
114+
if (isAddress(pubkey)) {
115+
const subscription = await this.rpcSubscriptions
116+
.accountNotifications(pubkey, {
117+
commitment: this.commitment,
118+
encoding: 'base64',
119+
})
120+
.subscribe({
121+
abortSignal: abortController.signal,
122+
});
123+
124+
for await (const notification of subscription) {
125+
if (this.resubOpts?.resubTimeoutMs) {
126+
this.receivingData = true;
127+
clearTimeout(this.timeoutId);
128+
this.handleRpcResponse(notification.context, notification.value);
129+
this.setTimeout();
130+
} else {
131+
this.handleRpcResponse(notification.context, notification.value);
132+
}
133+
}
134+
}
135+
136+
this.listenerId = Math.random(); // Unique ID for logging purposes
137+
138+
if (this.resubOpts?.resubTimeoutMs) {
139+
this.receivingData = true;
140+
this.setTimeout();
141+
}
142+
}
143+
144+
setData(data: T, slot?: number): void {
145+
const newSlot = slot || 0;
146+
if (this.dataAndSlot && this.dataAndSlot.slot > newSlot) {
147+
return;
148+
}
149+
150+
this.dataAndSlot = {
151+
data,
152+
slot,
153+
};
154+
}
155+
156+
protected setTimeout(): void {
157+
if (!this.onChange) {
158+
throw new Error('onChange callback function must be set');
159+
}
160+
this.timeoutId = setTimeout(
161+
async () => {
162+
if (this.isUnsubscribing) {
163+
// If we are in the process of unsubscribing, do not attempt to resubscribe
164+
if (this.resubOpts?.logResubMessages) {
165+
console.log(
166+
`[${this.logAccountName}] Timeout fired but isUnsubscribing=true, skipping resubscribe`
167+
);
168+
}
169+
return;
170+
}
171+
172+
if (this.receivingData) {
173+
if (this.resubOpts?.logResubMessages) {
174+
console.log(
175+
`No ws data from ${this.logAccountName} in ${this.resubOpts.resubTimeoutMs}ms, resubscribing - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
176+
);
177+
}
178+
await this.unsubscribe(true);
179+
this.receivingData = false;
180+
await this.subscribe(this.onChange);
181+
if (this.resubOpts?.logResubMessages) {
182+
console.log(
183+
`[${this.logAccountName}] Resubscribe completed - receivingData=${this.receivingData}, listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`
184+
);
185+
}
186+
} else {
187+
if (this.resubOpts?.logResubMessages) {
188+
console.log(
189+
`[${this.logAccountName}] Timeout fired but receivingData=false, skipping resubscribe`
190+
);
191+
}
192+
}
193+
},
194+
this.resubOpts?.resubTimeoutMs
195+
);
196+
}
197+
198+
async fetch(): Promise<void> {
199+
// Use gill's rpc for fetching account info
200+
const accountAddress = this.accountPublicKey.toBase58() as Address;
201+
const rpcResponse = await this.rpc
202+
.getAccountInfo(accountAddress, {
203+
commitment: this.commitment,
204+
encoding: 'base64',
205+
})
206+
.send();
207+
208+
// Convert gill response to match the expected format
209+
const context = {
210+
slot: Number(rpcResponse.context.slot),
211+
};
212+
213+
const accountInfo = rpcResponse.value;
214+
215+
this.handleRpcResponse({ slot: BigInt(context.slot) }, accountInfo);
216+
}
217+
218+
handleRpcResponse(
219+
context: { slot: bigint },
220+
accountInfo?: AccountInfoBase &
221+
(AccountInfoWithBase58EncodedData | AccountInfoWithBase64EncodedData)
222+
): void {
223+
const newSlot = context.slot;
224+
let newBuffer: Buffer | undefined = undefined;
225+
226+
if (accountInfo) {
227+
// Extract data from gill response
228+
if (accountInfo.data) {
229+
// Handle different data formats from gill
230+
if (Array.isArray(accountInfo.data)) {
231+
// If it's a tuple [data, encoding]
232+
const [data, encoding] = accountInfo.data;
233+
234+
if (encoding === 'base58') {
235+
// we know encoding will be base58
236+
// Convert base58 to buffer using bs58
237+
newBuffer = Buffer.from(bs58.decode(data));
238+
} else {
239+
newBuffer = Buffer.from(data, 'base64');
240+
}
241+
}
242+
}
243+
}
244+
245+
if (!this.bufferAndSlot) {
246+
this.bufferAndSlot = {
247+
buffer: newBuffer,
248+
slot: Number(newSlot),
249+
};
250+
if (newBuffer) {
251+
const account = this.decodeBuffer(newBuffer);
252+
this.dataAndSlot = {
253+
data: account,
254+
slot: Number(newSlot),
255+
};
256+
this.onChange(account);
257+
}
258+
return;
259+
}
260+
261+
if (Number(newSlot) < this.bufferAndSlot.slot) {
262+
return;
263+
}
264+
265+
const oldBuffer = this.bufferAndSlot.buffer;
266+
if (newBuffer && (!oldBuffer || !newBuffer.equals(oldBuffer))) {
267+
this.bufferAndSlot = {
268+
buffer: newBuffer,
269+
slot: Number(newSlot),
270+
};
271+
const account = this.decodeBuffer(newBuffer);
272+
this.dataAndSlot = {
273+
data: account,
274+
slot: Number(newSlot),
275+
};
276+
this.onChange(account);
277+
}
278+
}
279+
280+
decodeBuffer(buffer: Buffer): T {
281+
if (this.decodeBufferFn) {
282+
return this.decodeBufferFn(buffer);
283+
} else {
284+
return this.program.account[this.accountName].coder.accounts.decode(
285+
capitalize(this.accountName),
286+
buffer
287+
);
288+
}
289+
}
290+
291+
unsubscribe(onResub = false): Promise<void> {
292+
if (!onResub && this.resubOpts) {
293+
this.resubOpts.resubTimeoutMs = undefined;
294+
}
295+
this.isUnsubscribing = true;
296+
clearTimeout(this.timeoutId);
297+
this.timeoutId = undefined;
298+
299+
// Abort the WebSocket subscription
300+
if (this.abortController) {
301+
this.abortController.abort('unsubscribing');
302+
this.abortController = undefined;
303+
}
304+
305+
this.listenerId = undefined;
306+
this.isUnsubscribing = false;
307+
308+
return Promise.resolve();
309+
}
310+
}

0 commit comments

Comments
 (0)