7
7
using System . Net ;
8
8
using System . Globalization ;
9
9
using PubnubApi . PubnubEventEngine ;
10
-
10
+ using PubnubApi . EventEngine . Subscribe ;
11
+ using PubnubApi . EventEngine . Subscribe . Events ;
12
+ using PubnubApi ;
11
13
12
14
namespace PubnubApi . EndPoint
13
15
{
@@ -28,13 +30,16 @@ public class SubscribeOperation2<T>: ISubscribeOperation<T>
28
30
private Dictionary < string , object > queryParam ;
29
31
private PubnubEventEngine . EventEngine pnEventEngine ;
30
32
private Pubnub PubnubInstance ;
31
- public List < SubscribeCallback > SubscribeListenerList
33
+ private SubscribeEventEngine subscribeEventEngine ;
34
+ public SubscribeEventEngineFactory subscribeEventEngineFactory ;
35
+ public string instanceId ;
36
+ public List < SubscribeCallback > SubscribeListenerList
32
37
{
33
38
get ;
34
39
set ;
35
40
} = new List < SubscribeCallback > ( ) ;
36
41
37
- public SubscribeOperation2 ( PNConfiguration pubnubConfig , IJsonPluggableLibrary jsonPluggableLibrary , IPubnubUnitTest pubnubUnit , IPubnubLog log , EndPoint . TelemetryManager telemetryManager , EndPoint . TokenManager tokenManager , Pubnub instance )
42
+ public SubscribeOperation2 ( PNConfiguration pubnubConfig , IJsonPluggableLibrary jsonPluggableLibrary , IPubnubUnitTest pubnubUnit , IPubnubLog log , EndPoint . TelemetryManager telemetryManager , EndPoint . TokenManager tokenManager , SubscribeEventEngineFactory subscribeEventEngineFactory , string instanceId , Pubnub instance )
38
43
{
39
44
PubnubInstance = instance ;
40
45
config = pubnubConfig ;
@@ -43,6 +48,8 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
43
48
pubnubLog = log ;
44
49
pubnubTelemetryMgr = telemetryManager ;
45
50
pubnubTokenMgr = tokenManager ;
51
+ this . subscribeEventEngineFactory = subscribeEventEngineFactory ;
52
+ this . instanceId = instanceId ;
46
53
47
54
var eventEmitter = new EventEmitter ( ) ;
48
55
eventEmitter . RegisterJsonListener ( JsonCallback ) ;
@@ -772,56 +779,29 @@ public void Execute()
772
779
773
780
private void Subscribe ( string [ ] channels , string [ ] channelGroups , Dictionary < string , object > externalQueryParam )
774
781
{
775
- if ( ( channels == null || channels . Length == 0 ) && ( channelGroups == null || channelGroups . Length == 0 ) )
782
+ Action < Pubnub , PNStatus > statusListener = null ;
783
+ Action < Pubnub , PNMessageResult < T > > messageListener = null ;
784
+ if ( ( channels == null || channels . Length == 0 ) && ( channelGroups == null || channelGroups . Length == 0 ) )
776
785
{
777
- throw new ArgumentException ( "Either Channel Or Channel Group or Both should be provided." ) ;
778
- }
779
-
780
- string channel = ( channels != null ) ? string . Join ( "," , channels . OrderBy ( x => x ) . ToArray ( ) ) : "" ;
781
- string channelGroup = ( channelGroups != null ) ? string . Join ( "," , channelGroups . OrderBy ( x => x ) . ToArray ( ) ) : "" ;
782
-
783
- PNPlatform . Print ( config , pubnubLog ) ;
786
+ throw new ArgumentException ( "Either Channel Or Channel Group or Both should be provided." ) ;
787
+ }
784
788
785
- LoggingMethod . WriteToLog ( pubnubLog , string . Format ( CultureInfo . InvariantCulture , "DateTime {0}, requested subscribe for channel(s)={1} and channel group(s)={2}" , DateTime . Now . ToString ( CultureInfo . InvariantCulture ) , channel , channelGroup ) , config . LogVerbosity ) ;
786
-
787
- Dictionary < string , string > initialSubscribeUrlParams = new Dictionary < string , string > ( ) ;
788
- if ( this . subscribeTimetoken >= 0 )
789
+ if ( this . subscribeEventEngineFactory . hasEventEngine ( instanceId ) )
789
790
{
790
- initialSubscribeUrlParams . Add ( "tt" , this . subscribeTimetoken . ToString ( CultureInfo . InvariantCulture ) ) ;
791
- }
792
- if ( ! string . IsNullOrEmpty ( config . FilterExpression ) && config . FilterExpression . Trim ( ) . Length > 0 )
791
+ subscribeEventEngine = subscribeEventEngineFactory . getEventEngine ( instanceId ) ;
792
+ }
793
+ else
793
794
{
794
- initialSubscribeUrlParams . Add ( "filter-expr" , UriUtil . EncodeUriComponent ( config . FilterExpression , PNOperationType . PNSubscribeOperation , false , false , false ) ) ;
795
- }
796
-
795
+ if ( SubscribeListenerList != null && SubscribeListenerList . Count > 0 ) {
796
+ messageListener = MessageEmitter ;
797
+ statusListener = StatusEmitter ;
798
+ }
799
+ var subscribeManager = new SubscribeManager2 ( config , jsonLibrary , unit , pubnubLog , pubnubTelemetryMgr , pubnubTokenMgr , PubnubInstance ) ;
800
+ subscribeEventEngine = subscribeEventEngineFactory . initializeEventEngine ( instanceId , PubnubInstance , config , subscribeManager , statusListener , messageListener ) ;
797
801
798
- #if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12
799
- Task . Factory . StartNew ( ( ) =>
800
- {
801
- manager = new SubscribeManager2 ( config , jsonLibrary , unit , pubnubLog , pubnubTelemetryMgr , pubnubTokenMgr , PubnubInstance ) ;
802
- //manager.CurrentPubnubInstance(PubnubInstance);
803
- pnEventEngine . Subscribe ( channels . ToList < string > ( ) , channelGroups . ToList < string > ( ) ) ;
804
- //manager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
805
- //manager.CurrentPubnubInstance(PubnubInstance);
806
- //manager.MultiChannelSubscribeInit<T>(PNOperationType.PNSubscribeOperation, channels, channelGroups, initialSubscribeUrlParams, externalQueryParam);
807
- } , CancellationToken . None , TaskCreationOptions . PreferFairness , TaskScheduler . Default ) . ConfigureAwait ( false ) ;
808
- #else
809
- new Thread ( ( ) =>
810
- {
811
- manager = new SubscribeManager2 ( config , jsonLibrary , unit , pubnubLog , pubnubTelemetryMgr , pubnubTokenMgr , PubnubInstance ) ;
812
- //manager.CurrentPubnubInstance(PubnubInstance);
813
- if ( pnEventEngine . CurrentState == null )
814
- {
815
- pnEventEngine . InitialState ( new State ( StateType . Unsubscribed ) { EventType = EventType . SubscriptionChanged } ) ;
816
- }
817
- pnEventEngine . Subscribe ( channels . ToList < string > ( ) , channelGroups . ToList < string > ( ) ) ;
818
- //manager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
819
- //manager.CurrentPubnubInstance(PubnubInstance);
820
- //manager.MultiChannelSubscribeInit<T>(PNOperationType.PNSubscribeOperation, channels, channelGroups, initialSubscribeUrlParams, externalQueryParam);
821
- } )
822
- { IsBackground = true } . Start ( ) ;
823
- #endif
824
- }
802
+ }
803
+ subscribeEventEngine . eventQueue . Enqueue ( new SubscriptionChangedEvent ( ) { Channels = channels , ChannelGroups = channelGroups } ) ;
804
+ }
825
805
826
806
internal bool Retry ( bool reconnect )
827
807
{
@@ -863,5 +843,19 @@ internal void CurrentPubnubInstance(Pubnub instance)
863
843
{
864
844
PubnubInstance = instance ;
865
845
}
866
- }
846
+ private void MessageEmitter < T > ( Pubnub pubnubInstance , PNMessageResult < T > messageResult )
847
+ {
848
+ foreach ( var listener in SubscribeListenerList ) {
849
+ listener . Message ( pubnubInstance , messageResult ) ;
850
+ }
851
+ }
852
+
853
+ private void StatusEmitter ( Pubnub pubnubInstance , PNStatus status )
854
+ {
855
+ foreach ( var listener in SubscribeListenerList ) {
856
+ listener . Status ( pubnubInstance , status ) ;
857
+ }
858
+ }
859
+
860
+ }
867
861
}
0 commit comments