@@ -22,7 +22,7 @@ const FORK_DEPTH_SAFETY_MARGIN: u64 = 31; // Max fork depth for processed commit
2222
2323// SDK metadata constants
2424const SDK_NAME : & str = "laserstream-javascript" ;
25- const SDK_VERSION : & str = "0.2.3 " ;
25+ const SDK_VERSION : & str = "0.2.4 " ;
2626
2727/// Custom interceptor that adds SDK metadata headers to all gRPC requests
2828#[ derive( Clone ) ]
@@ -383,34 +383,59 @@ impl StreamInner {
383383 if let Some ( geyser:: subscribe_update:: UpdateOneof :: Pong ( _pong) ) = & message. update_oneof {
384384 continue ;
385385 }
386-
386+
387387 // Track slot updates for reconnection (only decode when necessary)
388388 if let Some ( geyser:: subscribe_update:: UpdateOneof :: Slot ( slot) ) = & message. update_oneof {
389389 tracked_slot. store( slot. slot, Ordering :: SeqCst ) ;
390-
390+
391391 // Check if this slot update is EXCLUSIVELY from our internal subscription
392392 // Only skip if the message contains ONLY the internal filter ID (not mixed with user subscriptions)
393393 if message. filters. len( ) == 1 && message. filters. contains( & internal_slot_sub_id) {
394394 continue ; // Skip forwarding this message
395395 }
396+
397+ // If we reach here, user ALSO has a slot subscription
398+ // Remove internal filter ID so it doesn't leak to user
399+ // OPTIMIZATION: Only clean filters for slot messages (not all messages)
400+ let mut clean_message = message;
401+ if let Some ( pos) = clean_message. filters. iter( ) . position( |id| id == & internal_slot_sub_id) {
402+ clean_message. filters. swap_remove( pos) ;
403+ }
404+
405+ // Serialize the protobuf message to bytes
406+ let mut buf = Vec :: new( ) ;
407+ if let Err ( _e) = clean_message. encode( & mut buf) {
408+ // Failed to encode protobuf message, skip this message
409+ continue ;
410+ }
411+
412+ let bytes_wrapper = crate :: SubscribeUpdateBytes ( buf) ;
413+
414+ // mark that at least one message was forwarded in this session
415+ progress_flag. store( true , Ordering :: SeqCst ) ;
416+
417+ // Use Blocking mode to prevent message drops and handle errors
418+ let status = ts_callback. call( Ok ( bytes_wrapper) , ThreadsafeFunctionCallMode :: Blocking ) ;
419+ if status != napi:: Status :: Ok {
420+ // Failed to deliver bytes to JavaScript, continue processing
421+ // Continue processing other messages instead of breaking the stream
422+ }
423+ continue ; // Skip to next message
396424 }
397-
398- // Clean up internal filter ID from ALL message types
399- let mut clean_message = message;
400- clean_message. filters. retain( |filter_id| filter_id != & internal_slot_sub_id) ;
401-
402- // Serialize the protobuf message to bytes
425+
426+ // For all non-slot messages, forward as-is (no filter cleanup needed)
427+ // Internal slot ID will never appear in account/transaction/block messages
403428 let mut buf = Vec :: new( ) ;
404- if let Err ( _e) = clean_message . encode( & mut buf) {
429+ if let Err ( _e) = message . encode( & mut buf) {
405430 // Failed to encode protobuf message, skip this message
406431 continue ;
407432 }
408-
433+
409434 let bytes_wrapper = crate :: SubscribeUpdateBytes ( buf) ;
410-
435+
411436 // mark that at least one message was forwarded in this session
412437 progress_flag. store( true , Ordering :: SeqCst ) ;
413-
438+
414439 // Use Blocking mode to prevent message drops and handle errors
415440 let status = ts_callback. call( Ok ( bytes_wrapper) , ThreadsafeFunctionCallMode :: Blocking ) ;
416441 if status != napi:: Status :: Ok {
0 commit comments