Skip to content

Commit e01f7de

Browse files
authored
Merge pull request #61 from helius-labs/sub-request-fix
Preserve subscription modifications across reconnections
2 parents d4480e9 + cab7b9d commit e01f7de

File tree

5 files changed

+173
-18
lines changed

5 files changed

+173
-18
lines changed

javascript/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

javascript/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "laserstream-napi"
3-
version = "0.2.0"
3+
version = "0.2.2"
44
edition = "2021"
55

66
[lib]

javascript/napi-src/stream.rs

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,18 @@ impl StreamInner {
5858
}
5959

6060
let id_for_cleanup = id.clone();
61+
62+
// Wrap current_request in Arc<Mutex> so it can be updated from write() calls
63+
let current_request = Arc::new(parking_lot::Mutex::new(initial_request));
64+
6165
tokio::spawn(async move {
62-
let mut current_request = initial_request;
6366
let mut reconnect_attempts = 0u32;
64-
67+
6568
// Determine effective max attempts
6669
let effective_max_attempts = max_reconnect_attempts.min(HARD_CAP_RECONNECT_ATTEMPTS);
67-
70+
6871
// Extract commitment level for reconnection logic
69-
let commitment_level = current_request.commitment.unwrap_or(0); // 0 = Processed, 1 = Confirmed, 2 = Finalized
72+
let commitment_level = current_request.lock().commitment.unwrap_or(0); // 0 = Processed, 1 = Confirmed, 2 = Finalized
7073

7174
loop {
7275
let tracked_slot_clone = tracked_slot.clone();
@@ -77,6 +80,9 @@ impl StreamInner {
7780
// Reset progress flag for this connection attempt
7881
made_progress.store(false, Ordering::SeqCst);
7982

83+
// Clone the current request for this connection attempt
84+
let request_snapshot = current_request.lock().clone();
85+
8086
tokio::select! {
8187
_ = &mut cancel_rx => {
8288
break;
@@ -85,13 +91,14 @@ impl StreamInner {
8591
result = Self::connect_and_stream_bytes(
8692
&endpoint,
8793
&token,
88-
&current_request,
94+
&request_snapshot,
8995
ts_callback_clone,
9096
tracked_slot_clone,
9197
internal_slot_id_clone,
9298
progress_flag_clone,
9399
&channel_options,
94100
&mut write_rx,
101+
current_request.clone(),
95102
) => {
96103
match result {
97104
Ok(()) => {
@@ -121,7 +128,7 @@ impl StreamInner {
121128
break;
122129
}
123130

124-
// Determine where to resume based on commitment level.
131+
// Determine where to resume based on commitment level.
125132
let last_tracked_slot = tracked_slot.load(Ordering::SeqCst);
126133

127134
// Only use from_slot when replay is enabled
@@ -141,9 +148,9 @@ impl StreamInner {
141148
}
142149
};
143150

144-
current_request.from_slot = Some(from_slot);
151+
current_request.lock().from_slot = Some(from_slot);
145152
} else {
146-
current_request.from_slot = None;
153+
current_request.lock().from_slot = None;
147154
}
148155

149156
// Fixed interval delay between reconnections
@@ -172,6 +179,7 @@ impl StreamInner {
172179
progress_flag: Arc<std::sync::atomic::AtomicBool>,
173180
channel_options: &Option<ChannelOptions>,
174181
write_rx: &mut mpsc::UnboundedReceiver<geyser::SubscribeRequest>,
182+
current_request: Arc<parking_lot::Mutex<geyser::SubscribeRequest>>,
175183
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
176184

177185
let mut builder = GeyserGrpcClient::build_from_shared(endpoint.to_string())?;
@@ -384,6 +392,13 @@ impl StreamInner {
384392

385393
// Handle write requests from the JavaScript client
386394
Some(write_request) = write_rx.recv() => {
395+
// IMPORTANT: Merge the write_request into current_request so it persists across reconnections
396+
{
397+
let mut req = current_request.lock();
398+
Self::merge_subscribe_requests(&mut req, &write_request);
399+
}
400+
401+
// Send the modification to the active stream
387402
if let Err(e) = sender.send(write_request).await {
388403
return Err(Box::new(e));
389404
}
@@ -399,13 +414,37 @@ impl StreamInner {
399414
Ok(())
400415
}
401416

417+
/// Merges a subscription modification request into the current request.
418+
/// This ensures that modifications made via write() are preserved across reconnections.
419+
fn merge_subscribe_requests(
420+
current: &mut geyser::SubscribeRequest,
421+
modification: &geyser::SubscribeRequest,
422+
) {
423+
// Merge all subscription types
424+
current.accounts.extend(modification.accounts.clone());
425+
current.slots.extend(modification.slots.clone());
426+
current.transactions.extend(modification.transactions.clone());
427+
current.transactions_status.extend(modification.transactions_status.clone());
428+
current.blocks.extend(modification.blocks.clone());
429+
current.blocks_meta.extend(modification.blocks_meta.clone());
430+
current.entry.extend(modification.entry.clone());
431+
current.accounts_data_slice.extend(modification.accounts_data_slice.clone());
432+
433+
// Update commitment if specified
434+
if modification.commitment.is_some() {
435+
current.commitment = modification.commitment;
436+
}
437+
438+
// Note: from_slot and ping are not merged as they are connection-specific
439+
}
440+
402441
pub fn cancel(&self) -> Result<()> {
403442
if let Some(tx) = self.cancel_tx.lock().take() {
404443
let _ = tx.send(());
405444
}
406445
Ok(())
407446
}
408-
447+
409448
pub fn write(&self, request: geyser::SubscribeRequest) -> Result<()> {
410449
let tx_guard = self.write_tx.lock();
411450
if let Some(ref tx) = *tx_guard {

javascript/package.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "helius-laserstream",
3-
"version": "0.2.1",
3+
"version": "0.2.2",
44
"description": "High-performance Laserstream gRPC client with automatic reconnection",
55
"main": "client.js",
66
"types": "client.d.ts",
@@ -83,12 +83,12 @@
8383
}
8484
},
8585
"optionalDependencies": {
86-
"helius-laserstream-darwin-arm64": "0.2.1",
87-
"helius-laserstream-darwin-x64": "0.2.1",
88-
"helius-laserstream-linux-arm64-gnu": "0.2.1",
89-
"helius-laserstream-linux-arm64-musl": "0.2.1",
90-
"helius-laserstream-linux-x64-gnu": "0.2.1",
91-
"helius-laserstream-linux-x64-musl": "0.2.1"
86+
"helius-laserstream-darwin-arm64": "0.2.2",
87+
"helius-laserstream-darwin-x64": "0.2.2",
88+
"helius-laserstream-linux-arm64-gnu": "0.2.2",
89+
"helius-laserstream-linux-arm64-musl": "0.2.2",
90+
"helius-laserstream-linux-x64-gnu": "0.2.2",
91+
"helius-laserstream-linux-x64-musl": "0.2.2"
9292
},
9393
"dependencies": {
9494
"@types/protobufjs": "^6.0.0",
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { subscribe, CommitmentLevel, SubscribeUpdate, LaserstreamConfig } from '../client';
2+
3+
const credentials = require('../test-config');
4+
5+
const USDC_MINT = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v';
6+
const USDT_MINT = 'Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB';
7+
8+
async function testSubscriptionModificationPersistence() {
9+
const config: LaserstreamConfig = {
10+
apiKey: credentials.laserstream.apiKey,
11+
endpoint: credentials.laserstream.endpoint,
12+
replay: true,
13+
};
14+
15+
const seenFilters = new Set<string>();
16+
let reconnected = false;
17+
let reconnectTime = 0;
18+
const filtersAfterReconnect = new Set<string>();
19+
20+
const stream = await subscribe(
21+
config,
22+
{
23+
transactions: {
24+
"usdc-filter": {
25+
vote: false,
26+
failed: false,
27+
accountInclude: [USDC_MINT],
28+
accountExclude: [],
29+
accountRequired: []
30+
}
31+
},
32+
commitment: CommitmentLevel.PROCESSED,
33+
accounts: {},
34+
slots: {},
35+
transactionsStatus: {},
36+
blocks: {},
37+
blocksMeta: {},
38+
entry: {},
39+
accountsDataSlice: [],
40+
},
41+
async (update: SubscribeUpdate) => {
42+
if (update.transaction) {
43+
const filters = update.filters || [];
44+
filters.forEach(f => {
45+
seenFilters.add(f);
46+
// Track filters seen after reconnection occurred
47+
if (reconnected && Date.now() - reconnectTime > 1000) {
48+
filtersAfterReconnect.add(f);
49+
}
50+
});
51+
}
52+
},
53+
(error) => {
54+
if (error.message.includes('Connection error') && error.message.includes('attempt 1')) {
55+
if (!reconnected) {
56+
reconnected = true;
57+
reconnectTime = Date.now();
58+
}
59+
}
60+
}
61+
);
62+
63+
// Wait for initial data
64+
await new Promise(resolve => setTimeout(resolve, 5000));
65+
66+
// Add USDT filter
67+
await stream.write({
68+
transactions: {
69+
"usdt-filter": {
70+
vote: false,
71+
failed: false,
72+
accountInclude: [USDT_MINT],
73+
accountExclude: [],
74+
accountRequired: []
75+
}
76+
},
77+
commitment: CommitmentLevel.PROCESSED,
78+
accounts: {},
79+
slots: {},
80+
transactionsStatus: {},
81+
blocks: {},
82+
blocksMeta: {},
83+
entry: {},
84+
accountsDataSlice: [],
85+
});
86+
87+
// Wait for modification to take effect and potential reconnection
88+
await new Promise(resolve => setTimeout(resolve, 60000));
89+
90+
stream.cancel();
91+
92+
// Verify results
93+
const hasUSBC = seenFilters.has('usdc-filter');
94+
const hasUSDT = seenFilters.has('usdt-filter');
95+
const usdtAfterReconnect = filtersAfterReconnect.has('usdt-filter');
96+
97+
console.log(`USDC filter present: ${hasUSBC}`);
98+
console.log(`USDT filter present: ${hasUSDT}`);
99+
console.log(`Reconnection occurred: ${reconnected}`);
100+
console.log(`USDT seen after reconnect: ${usdtAfterReconnect}`);
101+
102+
if (!hasUSBC || !hasUSDT) {
103+
throw new Error('Test failed: Not all filters received data');
104+
}
105+
106+
if (reconnected && !usdtAfterReconnect) {
107+
throw new Error('Test failed: USDT filter lost after reconnection');
108+
}
109+
110+
console.log('✅ Test passed: Subscription modifications persist across reconnections');
111+
}
112+
113+
testSubscriptionModificationPersistence().catch(err => {
114+
console.error('❌ Test failed:', err.message);
115+
process.exit(1);
116+
});

0 commit comments

Comments
 (0)