Skip to content

Commit be568f7

Browse files
committed
Merge remote-tracking branch 'origin/eventengine/handshake-handler' into eventengine/handshake-handler
2 parents 0da6c9a + f8d19ee commit be568f7

12 files changed

+139
-58
lines changed

src/Api/PubnubApi/EventEngine/Subscribe/Effects/ReceivingEffectHandler.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ public ReceivingEffectHandler(SubscribeManager2 manager, EventQueue eventQueue)
3131

3232
public Task Run(ReceiveReconnectInvocation invocation)
3333
{
34-
if (!ReconnectionDelayUtil.shouldRetry(invocation.Policy, invocation.AttemptedRetries, invocation.MaxConnectionRetry))
34+
if (!ReconnectionDelayUtil.shouldRetry(invocation.ReconnectionPolicy, invocation.AttemptedRetries, invocation.MaximumReconnectionRetries))
3535
{
3636
eventQueue.Enqueue(new ReceiveReconnectGiveUpEvent() { Status = new PNStatus(PNStatusCategory.PNCancelledCategory) });
3737
}
3838
else
3939
{
40-
retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.Policy, invocation.AttemptedRetries));
40+
retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.ReconnectionPolicy, invocation.AttemptedRetries));
4141
// Run in the background
4242
retryDelay.Start().ContinueWith((_) => this.Run((ReceiveMessagesInvocation)invocation));
4343
}

src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,17 @@ internal class CancelHandshakeInvocation : HandshakeInvocation, Core.IEffectCanc
5656
internal class HandshakeReconnectInvocation: HandshakeInvocation
5757
{
5858
public int AttemptedRetries;
59-
public int MaxConnectionRetry;
60-
public PNReconnectionPolicy Policy;
59+
public int MaximumReconnectionRetries;
60+
public PNReconnectionPolicy ReconnectionPolicy;
6161
}
6262

6363
internal class CancelHandshakeReconnectInvocation: HandshakeReconnectInvocation, Core.IEffectCancelInvocation { }
6464

6565
internal class ReceiveReconnectInvocation: ReceiveMessagesInvocation
6666
{
6767
public int AttemptedRetries;
68-
public int MaxConnectionRetry;
69-
public PNReconnectionPolicy Policy;
68+
public int MaximumReconnectionRetries;
69+
public PNReconnectionPolicy ReconnectionPolicy;
7070
}
7171

7272
internal class CancelReceiveReconnectInvocation: ReceiveReconnectInvocation, Core.IEffectCancelInvocation { }

src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeFailedState.cs

+17-7
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,37 @@ internal class HandshakeFailedState : Core.State
99
{
1010
public IEnumerable<string> Channels;
1111
public IEnumerable<string> ChannelGroups;
12+
public PNReconnectionPolicy ReconnectionPolicy;
13+
public int MaximumReconnectionRetries;
1214

13-
public override TransitionResult Transition(IEvent e)
15+
public override TransitionResult Transition(IEvent e)
1416
{
1517
return e switch
1618
{
1719
Events.SubscriptionChangedEvent subscriptionChanged => new HandshakingState()
1820
{
19-
Channels = subscriptionChanged.Channels, ChannelGroups = subscriptionChanged.ChannelGroups,
20-
},
21+
Channels = subscriptionChanged.Channels,
22+
ChannelGroups = subscriptionChanged.ChannelGroups,
23+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
24+
ReconnectionPolicy = this.ReconnectionPolicy
25+
},
2126

2227
Events.ReconnectEvent reconnect => new HandshakingState()
2328
{
24-
Channels = reconnect.Channels, ChannelGroups = reconnect.ChannelGroups,
25-
},
29+
Channels = reconnect.Channels,
30+
ChannelGroups = reconnect.ChannelGroups,
31+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
32+
ReconnectionPolicy = this.ReconnectionPolicy
33+
},
2634

2735
Events.SubscriptionRestoredEvent subscriptionRestored => new ReceivingState()
2836
{
2937
Channels = subscriptionRestored.Channels,
3038
ChannelGroups = subscriptionRestored.ChannelGroups,
31-
Cursor = subscriptionRestored.Cursor
32-
},
39+
Cursor = subscriptionRestored.Cursor,
40+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
41+
ReconnectionPolicy = this.ReconnectionPolicy
42+
},
3343

3444
_ => null
3545
};

src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeReconnectingState.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ internal class HandshakeReconnectingState : Core.State
99
{
1010
public IEnumerable<string> Channels;
1111
public IEnumerable<string> ChannelGroups;
12-
public PNReconnectionPolicy RetryPolicy;
13-
public int MaxConnectionRetry;
12+
public PNReconnectionPolicy ReconnectionPolicy;
13+
public int MaximumReconnectionRetries;
1414
public int AttemptedRetries;
1515

1616

1717
public override IEnumerable<IEffectInvocation> OnEntry => new HandshakeReconnectInvocation()
18-
{ Channels = this.Channels, ChannelGroups = this.ChannelGroups, Policy = this.RetryPolicy, MaxConnectionRetry = this.MaxConnectionRetry, AttemptedRetries = this.AttemptedRetries }.AsArray();
18+
{ Channels = this.Channels, ChannelGroups = this.ChannelGroups, ReconnectionPolicy = this.ReconnectionPolicy, MaximumReconnectionRetries = this.MaximumReconnectionRetries, AttemptedRetries = this.AttemptedRetries }.AsArray();
1919
public override IEnumerable<IEffectInvocation> OnExit { get; } = new CancelHandshakeReconnectInvocation().AsArray();
2020

2121
public override TransitionResult Transition(IEvent e)

src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeStoppedState.cs

+17-7
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,36 @@ internal class HandshakeStoppedState : Core.State
99
{
1010
public IEnumerable<string> Channels;
1111
public IEnumerable<string> ChannelGroups;
12-
13-
public override TransitionResult Transition(IEvent e)
12+
public PNReconnectionPolicy ReconnectionPolicy;
13+
public int MaximumReconnectionRetries;
14+
15+
public override TransitionResult Transition(IEvent e)
1416
{
1517
return e switch
1618
{
1719
Events.SubscriptionChangedEvent subscriptionChanged => new HandshakingState()
1820
{
19-
Channels = subscriptionChanged.Channels, ChannelGroups = subscriptionChanged.ChannelGroups,
20-
},
21+
Channels = subscriptionChanged.Channels,
22+
ChannelGroups = subscriptionChanged.ChannelGroups,
23+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
24+
ReconnectionPolicy = this.ReconnectionPolicy
25+
},
2126

2227
Events.ReconnectEvent reconnect => new HandshakingState()
2328
{
24-
Channels = reconnect.Channels, ChannelGroups = reconnect.ChannelGroups,
25-
},
29+
Channels = reconnect.Channels,
30+
ChannelGroups = reconnect.ChannelGroups,
31+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
32+
ReconnectionPolicy = this.ReconnectionPolicy
33+
},
2634

2735
Events.SubscriptionRestoredEvent subscriptionRestored => new ReceivingState()
2836
{
2937
Channels = subscriptionRestored.Channels,
3038
ChannelGroups = subscriptionRestored.ChannelGroups,
31-
Cursor = subscriptionRestored.Cursor
39+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
40+
ReconnectionPolicy = this.ReconnectionPolicy,
41+
Cursor = subscriptionRestored.Cursor
3242
},
3343

3444
_ => null

src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakingState.cs

+29-11
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ internal class HandshakingState : Core.State
99
{
1010
public IEnumerable<string> Channels;
1111
public IEnumerable<string> ChannelGroups;
12+
public PNReconnectionPolicy ReconnectionPolicy;
13+
public int MaximumReconnectionRetries;
1214

13-
public override IEnumerable<IEffectInvocation> OnEntry => new HandshakeInvocation()
15+
public override IEnumerable<IEffectInvocation> OnEntry => new HandshakeInvocation()
1416
{ Channels = this.Channels, ChannelGroups = this.ChannelGroups }.AsArray();
1517

1618
public override IEnumerable<IEffectInvocation> OnExit { get; } = new CancelHandshakeInvocation().AsArray();
@@ -21,30 +23,46 @@ public override TransitionResult Transition(IEvent e)
2123
{
2224
Events.SubscriptionChangedEvent subscriptionChanged => new States.HandshakingState()
2325
{
24-
Channels = subscriptionChanged.Channels, ChannelGroups = subscriptionChanged.ChannelGroups
25-
},
26+
Channels = subscriptionChanged.Channels,
27+
ChannelGroups = subscriptionChanged.ChannelGroups,
28+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
29+
ReconnectionPolicy = this.ReconnectionPolicy
30+
},
2631

2732
Events.SubscriptionRestoredEvent subscriptionRestored => new States.ReceivingState()
2833
{
2934
Channels = subscriptionRestored.Channels,
3035
ChannelGroups = subscriptionRestored.ChannelGroups,
31-
Cursor = subscriptionRestored.Cursor
32-
},
36+
Cursor = subscriptionRestored.Cursor,
37+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
38+
ReconnectionPolicy = this.ReconnectionPolicy
39+
},
3340

3441
Events.HandshakeFailureEvent handshakeFailure => new States.HandshakeReconnectingState()
3542
{
36-
Channels = this.Channels, ChannelGroups = this.ChannelGroups
37-
}.With(new EmitStatusInvocation(handshakeFailure.Status)),
43+
Channels = this.Channels,
44+
ChannelGroups = this.ChannelGroups,
45+
AttemptedRetries = 0,
46+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
47+
ReconnectionPolicy = this.ReconnectionPolicy
48+
}.With(new EmitStatusInvocation(handshakeFailure.Status)),
3849

3950
Events.DisconnectEvent disconnect => new States.HandshakeStoppedState()
4051
{
41-
Channels = disconnect.Channels, ChannelGroups = disconnect.ChannelGroups,
42-
}.With(new EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)),
52+
Channels = disconnect.Channels,
53+
ChannelGroups = disconnect.ChannelGroups,
54+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
55+
ReconnectionPolicy = this.ReconnectionPolicy
56+
}.With(new EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)),
4357

4458
Events.HandshakeSuccessEvent success => new ReceivingState()
4559
{
46-
Channels = this.Channels, ChannelGroups = this.ChannelGroups, Cursor = success.Cursor
47-
}.With(new EmitStatusInvocation(success.Status)),
60+
Channels = this.Channels,
61+
ChannelGroups = this.ChannelGroups,
62+
Cursor = success.Cursor,
63+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
64+
ReconnectionPolicy = this.ReconnectionPolicy
65+
}.With(new EmitStatusInvocation(success.Status)),
4866

4967
_ => null
5068
};

src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveFailedState.cs

+15-7
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ internal class ReceiveFailedState : Core.State
1111
public IEnumerable<string> Channels;
1212
public IEnumerable<string> ChannelGroups;
1313
public SubscriptionCursor Cursor;
14+
public PNReconnectionPolicy ReconnectionPolicy;
15+
public int MaximumReconnectionRetries;
1416

15-
public IEnumerable<IEffectInvocation> OnEntry { get; }
17+
public IEnumerable<IEffectInvocation> OnEntry { get; }
1618
public IEnumerable<IEffectInvocation> OnExit { get; }
1719

1820
public override TransitionResult Transition(IEvent e)
@@ -23,22 +25,28 @@ public override TransitionResult Transition(IEvent e)
2325
{
2426
Channels = subscriptionChanged.Channels,
2527
ChannelGroups = subscriptionChanged.ChannelGroups,
26-
Cursor = this.Cursor
27-
},
28+
Cursor = this.Cursor,
29+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
30+
ReconnectionPolicy = this.ReconnectionPolicy
31+
},
2832

2933
Events.ReconnectEvent reconnect => new ReceivingState()
3034
{
3135
Channels = reconnect.Channels,
3236
ChannelGroups = reconnect.ChannelGroups,
33-
Cursor = reconnect.Cursor
34-
},
37+
Cursor = reconnect.Cursor,
38+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
39+
ReconnectionPolicy = this.ReconnectionPolicy
40+
},
3541

3642
Events.SubscriptionRestoredEvent subscriptionRestored => new ReceivingState()
3743
{
3844
Channels = subscriptionRestored.Channels,
3945
ChannelGroups = subscriptionRestored.ChannelGroups,
40-
Cursor = subscriptionRestored.Cursor
41-
},
46+
Cursor = subscriptionRestored.Cursor,
47+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
48+
ReconnectionPolicy = this.ReconnectionPolicy
49+
},
4250

4351
_ => null
4452
};

src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveReconnectingState.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,20 @@ internal class ReceiveReconnectingState : Core.State
1010
{
1111
public IEnumerable<string> Channels;
1212
public IEnumerable<string> ChannelGroups;
13-
public SubscriptionCursor Cursor;
13+
public PNReconnectionPolicy ReconnectionPolicy;
14+
public int MaximumReconnectionRetries;
15+
public int AttemptedRetries;
16+
public SubscriptionCursor Cursor;
1417

1518
public override IEnumerable<IEffectInvocation> OnEntry => new ReceiveReconnectInvocation()
16-
{ Channels = this.Channels, ChannelGroups = this.ChannelGroups, Cursor = this.Cursor }.AsArray();
19+
{
20+
Channels = this.Channels,
21+
ChannelGroups = this.ChannelGroups,
22+
Cursor = this.Cursor,
23+
ReconnectionPolicy = this.ReconnectionPolicy,
24+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
25+
AttemptedRetries = this.AttemptedRetries
26+
}.AsArray();
1727

1828
public override IEnumerable<IEffectInvocation> OnExit { get; } =
1929
new CancelReceiveReconnectInvocation().AsArray();
@@ -54,7 +64,10 @@ public override TransitionResult Transition(IEvent e)
5464
{
5565
Channels = this.Channels,
5666
ChannelGroups = this.ChannelGroups,
57-
Cursor = this.Cursor
67+
Cursor = this.Cursor,
68+
AttemptedRetries = this.AttemptedRetries,
69+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
70+
ReconnectionPolicy = this.ReconnectionPolicy
5871
}.With(new EmitStatusInvocation(receiveReconnectFailure.Status)),
5972

6073
Events.ReceiveReconnectGiveUpEvent receiveReconnectGiveUp => new ReceiveFailedState()

src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveStoppedState.cs

+14-6
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ internal class ReceiveStoppedState : Core.State
1111
public IEnumerable<string> Channels;
1212
public IEnumerable<string> ChannelGroups;
1313
public SubscriptionCursor Cursor;
14+
public PNReconnectionPolicy ReconnectionPolicy;
15+
public int MaximumReconnectionRetries;
1416

15-
public override TransitionResult Transition(IEvent e)
17+
public override TransitionResult Transition(IEvent e)
1618
{
1719
return e switch
1820
{
@@ -21,21 +23,27 @@ public override TransitionResult Transition(IEvent e)
2123
Channels = subscriptionChanged.Channels,
2224
ChannelGroups = subscriptionChanged.ChannelGroups,
2325
Cursor = this.Cursor,
24-
},
26+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
27+
ReconnectionPolicy = this.ReconnectionPolicy
28+
},
2529

2630
Events.ReconnectEvent reconnect => new ReceivingState()
2731
{
2832
Channels = reconnect.Channels,
2933
ChannelGroups = reconnect.ChannelGroups,
30-
Cursor = reconnect.Cursor
31-
},
34+
Cursor = reconnect.Cursor,
35+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
36+
ReconnectionPolicy = this.ReconnectionPolicy
37+
},
3238

3339
Events.SubscriptionRestoredEvent subscriptionRestored => new ReceivingState()
3440
{
3541
Channels = subscriptionRestored.Channels,
3642
ChannelGroups = subscriptionRestored.ChannelGroups,
37-
Cursor = subscriptionRestored.Cursor
38-
},
43+
Cursor = subscriptionRestored.Cursor,
44+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
45+
ReconnectionPolicy = this.ReconnectionPolicy
46+
},
3947

4048
_ => null
4149
};

src/Api/PubnubApi/EventEngine/Subscribe/States/ReceivingState.cs

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ internal class ReceivingState : Core.State
1010
{
1111
public IEnumerable<string> Channels;
1212
public IEnumerable<string> ChannelGroups;
13-
public SubscriptionCursor Cursor;
13+
public PNReconnectionPolicy ReconnectionPolicy;
14+
public int MaximumReconnectionRetries;
15+
public SubscriptionCursor Cursor;
1416

1517
public override IEnumerable<IEffectInvocation> OnEntry => new ReceiveMessagesInvocation()
1618
{ Channels = this.Channels, ChannelGroups = this.ChannelGroups, Cursor = this.Cursor }.AsArray();
@@ -56,7 +58,10 @@ public override TransitionResult Transition(IEvent e)
5658
{
5759
Channels = this.Channels,
5860
ChannelGroups = this.ChannelGroups,
59-
Cursor = this.Cursor
61+
AttemptedRetries = 0,
62+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
63+
ReconnectionPolicy = this.ReconnectionPolicy,
64+
Cursor = this.Cursor
6065
},
6166

6267
_ => null

src/Api/PubnubApi/EventEngine/Subscribe/States/UnsubscribedState.cs

+13-5
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,29 @@ namespace PubnubApi.EventEngine.Subscribe.States
77
{
88
internal class UnsubscribedState : Core.State
99
{
10-
public override TransitionResult Transition(Core.IEvent e)
10+
public PNReconnectionPolicy ReconnectionPolicy;
11+
public int MaximumReconnectionRetries;
12+
13+
public override TransitionResult Transition(Core.IEvent e)
1114
{
1215
return e switch
1316
{
1417
Events.SubscriptionChangedEvent subscriptionChanged => new HandshakingState()
1518
{
16-
Channels = subscriptionChanged.Channels, ChannelGroups = subscriptionChanged.ChannelGroups,
17-
},
19+
Channels = subscriptionChanged.Channels,
20+
ChannelGroups = subscriptionChanged.ChannelGroups,
21+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
22+
ReconnectionPolicy = this.ReconnectionPolicy
23+
},
1824

1925
Events.SubscriptionRestoredEvent subscriptionRestored => new States.ReceivingState()
2026
{
2127
Channels = subscriptionRestored.Channels,
2228
ChannelGroups = subscriptionRestored.ChannelGroups,
23-
Cursor = subscriptionRestored.Cursor
24-
},
29+
Cursor = subscriptionRestored.Cursor,
30+
MaximumReconnectionRetries = this.MaximumReconnectionRetries,
31+
ReconnectionPolicy = this.ReconnectionPolicy
32+
},
2533

2634
_ => null
2735
};

0 commit comments

Comments
 (0)