Skip to content

Commit f655009

Browse files
committed
ReceivingReconnect WIP
1 parent 189b2c4 commit f655009

File tree

8 files changed

+323
-61
lines changed

8 files changed

+323
-61
lines changed

src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
7272
receivingEffectHandler.AnnounceStatus = Announce;
7373
receivingEffectHandler.AnnounceMessage = Announce;
7474

75+
var receiveReconnectEffectHandler = new ReceiveReconnectingEffectHandler<object>(eventEmitter);
76+
receiveReconnectEffectHandler.ReconnectionPolicy = config.ReconnectionPolicy;
77+
receiveReconnectEffectHandler.MaxRetries = config.ConnectionMaxRetries;
78+
receiveReconnectEffectHandler.LogCallback = LogCallback;
79+
receiveReconnectEffectHandler.ReceiveReconnectRequested += ReceiveReconnectEffect_ReceiveRequested;
80+
receiveReconnectEffectHandler.CancelReceiveReconnectRequested += ReceiveReconnectEffect_CancelReceiveRequested;
81+
receiveReconnectEffectHandler.AnnounceStatus = Announce;
82+
7583
var effectDispatcher = new EffectDispatcher();
7684
effectDispatcher.PubnubUnitTest = unit;
7785
effectDispatcher.Register(EventType.Handshake,handshakeEffectHandler);
@@ -90,6 +98,11 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
9098
effectDispatcher.Register(EventType.CancelReceiveMessages, receivingEffectHandler);
9199
effectDispatcher.Register(EventType.ReceiveSuccess, receivingEffectHandler);
92100

101+
effectDispatcher.Register(EventType.ReceiveReconnect, receiveReconnectEffectHandler);
102+
effectDispatcher.Register(EventType.CancelReceiveReconnect, receiveReconnectEffectHandler);
103+
effectDispatcher.Register(EventType.ReceiveReconnectSuccess, receiveReconnectEffectHandler);
104+
effectDispatcher.Register(EventType.ReceiveReconnectGiveUp, receiveReconnectEffectHandler);
105+
93106
pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
94107
pnEventEngine.PubnubUnitTest = unit;
95108
pnEventEngine.Setup<T>(config);
@@ -139,6 +152,17 @@ private void ReceivingEffect_CancelReceiveRequested(object sender, CancelReceive
139152
{
140153
manager.ReceiveRequestCancellation();
141154
}
155+
private void ReceiveReconnectEffect_ReceiveRequested(object sender, ReceiveReconnectRequestEventArgs e)
156+
{
157+
Tuple<string, PNStatus> resp = manager.ReceiveRequest<T>(PNOperationType.PNSubscribeOperation, e.ExtendedState.Channels.ToArray(), e.ExtendedState.ChannelGroups.ToArray(), e.ExtendedState.Timetoken, e.ExtendedState.Region, null, null).Result;
158+
159+
string jsonResp = resp.Item1;
160+
e.ReceiveReconnectResponseCallback?.Invoke(jsonResp);
161+
}
162+
private void ReceiveReconnectEffect_CancelReceiveRequested(object sender, CancelReceiveReconnectRequestEventArgs e)
163+
{
164+
manager.ReceiveRequestCancellation();
165+
}
142166

143167
private void JsonCallback(string json, bool zeroTimeTokenRequest, int messageCount)
144168
{

src/Api/PubnubApi/EventEngine/EventEngine.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@ public void Announce()
255255
//Ignore Announce for 200
256256
return;
257257
}
258+
else if (Handler is ReceiveReconnectingEffectHandler<object> && status.StatusCode == 200)
259+
{
260+
//Ignore Announce for 200
261+
return;
262+
}
258263
System.Diagnostics.Debug.WriteLine($"Status Category = {status.Category} to be announced");
259264
AnnounceStatus(status);
260265
}
@@ -642,7 +647,7 @@ public void Setup<T>(PNConfiguration config)
642647

643648
EmitMessages<T> receiveEmitMessages = new EmitMessages<T>(null);
644649
receiveEmitMessages.Name = "EMIT_EVENTS";
645-
receiveEmitMessages.Effectype = EventType.ReceiveSuccess;
650+
receiveEmitMessages.Effectype = EventType.ReceiveMessages;
646651

647652
EmitStatus receiveDisconnectEmitStatus = new EmitStatus();
648653
receiveDisconnectEmitStatus.Name = "EMIT_STATUS";
@@ -662,7 +667,7 @@ public void Setup<T>(PNConfiguration config)
662667
.On(EventType.SubscriptionRestored, StateType.Receiving)
663668
.On(EventType.ReceiveSuccess, StateType.Receiving, new List<EffectInvocation>()
664669
{
665-
receiveEmitStatus,
670+
//receiveEmitStatus,
666671
receiveEmitMessages
667672
}
668673
)
@@ -696,11 +701,11 @@ public void Setup<T>(PNConfiguration config)
696701
#region ReceiveReconnecting Effect Invocations and Emit Status
697702
EmitStatus receiveReconnectEmitStatus = new EmitStatus();
698703
receiveReconnectEmitStatus.Name = "RECONNECT_EMIT_STATUS";
699-
receiveEmitStatus.Effectype = EventType.ReceiveReconnectSuccess;
704+
receiveReconnectEmitStatus.Effectype = EventType.ReceiveReconnectSuccess;
700705

701706
EmitMessages<T> receiveReconnectEmitMessages = new EmitMessages<T>(null);
702707
receiveReconnectEmitMessages.Name = "RECEIVE_RECONNECT_EVENTS";
703-
receiveReconnectEmitMessages.Effectype = EventType.ReceiveReconnectSuccess;
708+
receiveReconnectEmitMessages.Effectype = EventType.ReceiveMessages;
704709

705710
EmitStatus receiveReconnectDisconnectEmitStatus = new EmitStatus();
706711
receiveReconnectDisconnectEmitStatus.Name = "RECONNECT_DISCONNECT_STATUS";
@@ -724,15 +729,11 @@ public void Setup<T>(PNConfiguration config)
724729
.On(EventType.SubscriptionRestored, StateType.Receiving)
725730
.On(EventType.ReceiveReconnectSuccess, StateType.Receiving, new List<EffectInvocation>()
726731
{
727-
receiveReconnectEmitStatus,
732+
//receiveReconnectEmitStatus,
728733
receiveReconnectEmitMessages
729734
}
730735
)
731-
.On(EventType.Disconnect, StateType.ReceiveStopped, new List<EffectInvocation>()
732-
{
733-
receiveReconnectDisconnectEmitStatus
734-
}
735-
)
736+
.On(EventType.Disconnect, StateType.ReceiveStopped)
736737
.On(EventType.ReceiveReconnectGiveUp, StateType.ReceiveFailed, new List<EffectInvocation>()
737738
{
738739
receiveReconnectGiveupEmitStatus

src/Api/PubnubApi/EventEngine/HandshakeReconnectEffectHandler.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ private void PrepareAndEmitHandshakeReconnectGiveupEvent(Exception ex)
233233

234234
emitter.emit(handshakeReconnectGiveupEvent);
235235
}
236+
236237
//private void PrepareAndEmitHandshakeFailedEvent(Exception ex)
237238
//{
238239
// HandshakeFailure handshakeFailureEvent = new HandshakeFailure();
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
using Newtonsoft.Json;
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace PubnubApi.PubnubEventEngine
7+
{
8+
public class ReceiveReconnectRequestEventArgs : EventArgs
9+
{
10+
public ExtendedState ExtendedState { get; set; }
11+
public Action<string> ReceiveReconnectResponseCallback { get; set; }
12+
}
13+
public class CancelReceiveReconnectRequestEventArgs : EventArgs
14+
{
15+
}
16+
public class ReceiveReconnectingEffectHandler<T> : IEffectInvocationHandler
17+
{
18+
EventEmitter eventEmitter;
19+
private ExtendedState extendedState { get; set;}
20+
public Action<string> LogCallback { get; set; }
21+
public Action<PNStatus> AnnounceStatus { get; set; }
22+
public PNReconnectionPolicy ReconnectionPolicy { get; set; }
23+
public int MaxRetries { get; set; }
24+
private Message<object>[] receiveMessages { get; set; }
25+
private PNStatus pnStatus { get; set; }
26+
private int timerInterval;
27+
const int MINEXPONENTIALBACKOFF = 1;
28+
const int MAXEXPONENTIALBACKOFF = 25;
29+
const int INTERVAL = 3;
30+
31+
public event EventHandler<ReceiveReconnectRequestEventArgs> ReceiveReconnectRequested;
32+
public event EventHandler<CancelReceiveReconnectRequestEventArgs> CancelReceiveReconnectRequested;
33+
System.Threading.Timer timer;
34+
protected virtual void OnReceiveReconnectRequested(ReceiveReconnectRequestEventArgs e)
35+
{
36+
EventHandler<ReceiveReconnectRequestEventArgs> handler = ReceiveReconnectRequested;
37+
if (handler != null)
38+
{
39+
handler(this, e);
40+
}
41+
}
42+
protected virtual void OnCancelReceiveReconnectRequested(CancelReceiveReconnectRequestEventArgs e)
43+
{
44+
EventHandler<CancelReceiveReconnectRequestEventArgs> handler = CancelReceiveReconnectRequested;
45+
if (handler != null)
46+
{
47+
handler(this, e);
48+
}
49+
}
50+
51+
CancellationTokenSource cancellationTokenSource;
52+
public ReceiveReconnectingEffectHandler(EventEmitter emitter)
53+
{
54+
this.eventEmitter = emitter;
55+
cancellationTokenSource = new CancellationTokenSource();
56+
}
57+
58+
public async void Start(ExtendedState context, EventType eventType)
59+
{
60+
extendedState = context;
61+
await Task.Factory.StartNew(() => { });
62+
if (cancellationTokenSource != null && cancellationTokenSource.Token.CanBeCanceled) {
63+
Cancel();
64+
}
65+
66+
if (ReconnectionPolicy == PNReconnectionPolicy.EXPONENTIAL)
67+
{
68+
double numberForMath = extendedState.Attempts % 6;
69+
timerInterval = (int)(Math.Pow(2, numberForMath) - 1);
70+
if (timerInterval > MAXEXPONENTIALBACKOFF)
71+
{
72+
timerInterval = MINEXPONENTIALBACKOFF;
73+
}
74+
else if (timerInterval < 1)
75+
{
76+
timerInterval = MINEXPONENTIALBACKOFF;
77+
}
78+
}
79+
else if (ReconnectionPolicy == PNReconnectionPolicy.LINEAR)
80+
{
81+
timerInterval = INTERVAL;
82+
}
83+
else
84+
{
85+
timerInterval = -1;
86+
}
87+
LogCallback?.Invoke($"ReceiveReconnectingEffectHandler ReconnectionPolicy = {ReconnectionPolicy}; Interval = {timerInterval}");
88+
89+
if (timer != null)
90+
{
91+
timer.Change(Timeout.Infinite, Timeout.Infinite);
92+
}
93+
if (timerInterval != -1)
94+
{
95+
timer = new Timer(new TimerCallback(ReceiveReconnectTimerCallback), null,
96+
(-1 == timerInterval) ? Timeout.Infinite : timerInterval * 1000, Timeout.Infinite);
97+
}
98+
else
99+
{
100+
PrepareFailurePNStatus(new ReceiveError() { Status = 400 });
101+
PrepareAndEmitReceiveReconnectGiveupEvent(null);
102+
}
103+
}
104+
105+
private void ReceiveReconnectTimerCallback(object state)
106+
{
107+
LogCallback?.Invoke("ReceiveReconnectingEffectHandler Timer interval invoke");
108+
ReceiveReconnectRequestEventArgs args = new ReceiveReconnectRequestEventArgs();
109+
args.ExtendedState = extendedState;
110+
args.ReceiveReconnectResponseCallback = OnReceiveReconnectEffectResponseReceived;
111+
OnReceiveReconnectRequested(args);
112+
}
113+
114+
public void OnReceiveReconnectEffectResponseReceived(string json)
115+
{
116+
try
117+
{
118+
LogCallback?.Invoke($"OnReceiveReconnectEffectResponseReceived Json Response = {json}");
119+
var receivedResponse = JsonConvert.DeserializeObject<ReceiveingResponse<object>>(json);
120+
if (receivedResponse != null && receivedResponse.Timetoken != null)
121+
{
122+
receiveMessages = receivedResponse.Messages;
123+
124+
ReceiveReconnectSuccess receiveReconnectSuccessEvent = new ReceiveReconnectSuccess();
125+
receiveReconnectSuccessEvent.SubscriptionCursor = new SubscriptionCursor();
126+
receiveReconnectSuccessEvent.SubscriptionCursor.Timetoken = receivedResponse.Timetoken.Timestamp;
127+
receiveReconnectSuccessEvent.SubscriptionCursor.Region = receivedResponse.Timetoken.Region;
128+
receiveReconnectSuccessEvent.EventPayload.Timetoken = receivedResponse.Timetoken.Timestamp;
129+
receiveReconnectSuccessEvent.EventPayload.Region = receivedResponse.Timetoken.Region;
130+
receiveReconnectSuccessEvent.EventType = EventType.ReceiveReconnectSuccess;
131+
receiveReconnectSuccessEvent.Name = "RECEIVE_RECONNECT_SUCCESS";
132+
LogCallback?.Invoke("OnReceiveReconnectEffectResponseReceived - EventType.ReceiveReconnectSuccess");
133+
134+
pnStatus = new PNStatus();
135+
pnStatus.StatusCode = 200;
136+
pnStatus.Operation = PNOperationType.PNSubscribeOperation;
137+
pnStatus.AffectedChannels = extendedState.Channels;
138+
pnStatus.AffectedChannelGroups = extendedState.ChannelGroups;
139+
pnStatus.Category = PNStatusCategory.PNConnectedCategory;
140+
pnStatus.Error = false;
141+
142+
extendedState.Attempts = 0;
143+
144+
eventEmitter.emit(receiveReconnectSuccessEvent);
145+
}
146+
else
147+
{
148+
ReceiveReconnectFailure receiveReconnectFailureEvent = new ReceiveReconnectFailure();
149+
receiveReconnectFailureEvent.Name = "RECEIVE_RECONNECT_FAILURE";
150+
receiveReconnectFailureEvent.EventType = EventType.ReceiveReconnectFailure;
151+
LogCallback?.Invoke("OnReceivingReconnectEffectResponseReceived - EventType.ReceiveReconnectFailure");
152+
153+
pnStatus = new PNStatus();
154+
pnStatus.Operation = PNOperationType.PNSubscribeOperation;
155+
pnStatus.AffectedChannels = extendedState.Channels;
156+
pnStatus.AffectedChannelGroups = extendedState.ChannelGroups;
157+
pnStatus.Error = true;
158+
159+
160+
var receiveReconnectError = JsonConvert.DeserializeObject<ReceiveError>(json);
161+
extendedState.Attempts++;
162+
PrepareFailurePNStatus(receiveReconnectError);
163+
164+
if (MaxRetries != -1 && extendedState.Attempts > MaxRetries)
165+
{
166+
LogCallback?.Invoke($"Attempt: {extendedState.Attempts}; OnReceivingReconnectEffectResponseReceived - EventType.ReceiveReconnectGiveUp");
167+
PrepareAndEmitReceiveReconnectGiveupEvent(null);
168+
}
169+
else
170+
{
171+
LogCallback?.Invoke($"Attempt: {extendedState.Attempts}; OnReceivingReconnectEffectResponseReceived - EventType.ReceiveReconnectFailure");
172+
PrepareAndEmitReceiveReconnectFailureEvent(null);
173+
}
174+
}
175+
}
176+
catch (Exception ex)
177+
{
178+
extendedState.Attempts++;
179+
PrepareFailurePNStatus(new ReceiveError() { Status = 400 });
180+
181+
if (MaxRetries != -1 && extendedState.Attempts > MaxRetries)
182+
{
183+
LogCallback?.Invoke($"Attempt: {extendedState.Attempts}; OnHandshakeReconnectEffectResponseReceived - EventType.ReceiveReconnectGiveUp");
184+
PrepareAndEmitReceiveReconnectGiveupEvent(null);
185+
186+
}
187+
else
188+
{
189+
LogCallback?.Invoke($"Attempt: {extendedState.Attempts}; OnHandshakeReconnectEffectResponseReceived - EventType.ReceiveReconnectFailure");
190+
PrepareAndEmitReceiveReconnectFailureEvent(ex);
191+
}
192+
}
193+
finally
194+
{
195+
if (timer != null)
196+
{
197+
try
198+
{
199+
timer.Change(Timeout.Infinite, Timeout.Infinite);
200+
}
201+
catch { }
202+
}
203+
}
204+
}
205+
206+
private void PrepareFailurePNStatus(ReceiveError error)
207+
{
208+
pnStatus = new PNStatus();
209+
pnStatus.StatusCode = (error != null && error.Status != 0) ? error.Status : 504;
210+
pnStatus.Operation = PNOperationType.PNSubscribeOperation;
211+
pnStatus.AffectedChannels = extendedState.Channels;
212+
pnStatus.AffectedChannelGroups = extendedState.ChannelGroups;
213+
pnStatus.Category = PNStatusCategory.PNNetworkIssuesCategory;
214+
pnStatus.Error = true;
215+
}
216+
217+
private void PrepareAndEmitReceiveReconnectFailureEvent(Exception ex)
218+
{
219+
ReceiveReconnectFailure receiveReconnectFailureEvent = new ReceiveReconnectFailure();
220+
receiveReconnectFailureEvent.Name = "RECEIVE_RECONNECT_FAILURE";
221+
receiveReconnectFailureEvent.EventType = EventType.ReceiveReconnectFailure;
222+
receiveReconnectFailureEvent.Attempts = extendedState.Attempts;
223+
if (ex != null)
224+
{
225+
receiveReconnectFailureEvent.EventPayload.exception = ex;
226+
}
227+
228+
eventEmitter.emit(receiveReconnectFailureEvent);
229+
}
230+
231+
private void PrepareAndEmitReceiveReconnectGiveupEvent(Exception ex)
232+
{
233+
ReceiveReconnectGiveUp receiveReconnectGiveupEvent = new ReceiveReconnectGiveUp();
234+
receiveReconnectGiveupEvent.Name = "RECEIVE_RECONNECT_GIVEUP";
235+
receiveReconnectGiveupEvent.EventType = EventType.ReceiveReconnectGiveUp;
236+
receiveReconnectGiveupEvent.Attempts = extendedState.Attempts;
237+
if (ex != null)
238+
{
239+
receiveReconnectGiveupEvent.EventPayload.exception = ex;
240+
}
241+
242+
eventEmitter.emit(receiveReconnectGiveupEvent);
243+
}
244+
245+
public void Cancel()
246+
{
247+
if (cancellationTokenSource != null)
248+
{
249+
LogCallback?.Invoke($"ReceiveReconnectEffectHandler - cancellationTokenSource - cancellion attempted.");
250+
cancellationTokenSource.Cancel();
251+
}
252+
LogCallback?.Invoke($"ReceiveReconnectEffectHandler - invoking OnCancelReceiveReconnectRequested.");
253+
CancelReceiveReconnectRequestEventArgs args = new CancelReceiveReconnectRequestEventArgs();
254+
OnCancelReceiveReconnectRequested(args);
255+
}
256+
public void Run(ExtendedState context)
257+
{
258+
if (AnnounceStatus != null && pnStatus != null)
259+
{
260+
AnnounceStatus(pnStatus);
261+
}
262+
}
263+
public PNStatus GetPNStatus()
264+
{
265+
return pnStatus;
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)