Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HandshakeStoppedState, ReceiveStoppedState, ReceiveFailedState, ReceivingState and ReceiveReconnectState transitions #173

Merged
merged 6 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace PubnubApi.PubnubEventEngine.Subscribe.Events {
public class SubscriptionChangedEvent : Core.IEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}

public class SubscriptionRestoredEvent : Core.IEvent {
Expand All @@ -24,6 +25,7 @@ public class HandshakeFailureEvent : Core.IEvent {
public class HandshakeReconnectSuccessEvent : HandshakeSuccessEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}

public class HandshakeReconnectFailureEvent : HandshakeFailureEvent {
Expand All @@ -40,11 +42,16 @@ public class HandshakeReconnectGiveUpEvent : Core.IEvent {
}

public class ReceiveSuccessEvent : Core.IEvent {
public List<PNMessageResult<object>> messages;
public SubscriptionCursor cursor;
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public List<PNMessageResult<object>> Messages;
public SubscriptionCursor Cursor;
}

public class ReceiveFailureEvent : Core.IEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
// TODO status or reason?
public PNStatus status;
}
Expand All @@ -59,17 +66,22 @@ public class ReceiveReconnectFailureEvent : ReceiveFailureEvent {
}

public class ReceiveReconnectGiveUpEvent : Core.IEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
// TODO status or reason?
public PNStatus status;
}

public class DisconnectEvent : Core.IEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}

public class ReconnectEvent : Core.IEvent {
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class ReceiveMessagesInvocation : Core.IEffectInvocation
{
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}

internal class CancelReceiveMessagesInvocation : ReceiveMessagesInvocation, Core.IEffectCancelInvocation { }
Expand All @@ -37,7 +38,13 @@ internal class HandshakeReconnectInvocation: Core.IEffectInvocation

internal class CancelHandshakeReconnectInvocation: HandshakeReconnectInvocation, Core.IEffectCancelInvocation { }

internal class ReceiveReconnectInvocation: Core.IEffectInvocation { }
internal class ReceiveReconnectInvocation: Core.IEffectInvocation
{
public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it needs to pass the Cursor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added cursor


internal class CancelReceiveReconnectInvocation: ReceiveReconnectInvocation, Core.IEffectCancelInvocation { }
//internal class CancelReconnectInvocation : ReconnectInvocation, Core.IEffectCancelInvocation { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ internal class HandshakeFailedState : Core.IState {

public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;

public IEnumerable<IEffectInvocation> OnEntry { get; }
public IEnumerable<IEffectInvocation> OnExit { get; }
Expand Down Expand Up @@ -43,12 +44,14 @@ internal class HandshakeFailedState : Core.IState {
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ internal class HandshakeReconnectingState : Core.IState {
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = handshakeReconnectSuccess.Channels,
ChannelGroups = handshakeReconnectSuccess.ChannelGroups
ChannelGroups = handshakeReconnectSuccess.ChannelGroups,
Cursor = handshakeReconnectSuccess.Cursor
},
new[] {
new EmitStatusInvocation() {
Expand All @@ -59,7 +60,8 @@ internal class HandshakeReconnectingState : Core.IState {
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new HandshakeFailedState() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,50 @@ internal class HandshakeStoppedState : Core.IState {
public IEnumerable<IEffectInvocation> OnEntry { get; }
public IEnumerable<IEffectInvocation> OnExit { get; }
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(IEvent e) {
throw new NotImplementedException();
switch (e) {
case Events.SubscriptionChangedEvent subscriptionChanged:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new HandshakingState() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
},
new[] {
new HandshakeInvocation() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
},
}
);
case Events.ReconnectEvent reconnect:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new HandshakingState() {
Channels = reconnect.Channels,
ChannelGroups = reconnect.ChannelGroups,
},
new[] {
new HandshakeInvocation() {
Channels = reconnect.Channels,
ChannelGroups = reconnect.ChannelGroups,
},
}
);
case Events.SubscriptionRestoredEvent subscriptionRestored:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
},
}
);

default: return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,60 @@ internal class ReceiveFailedState : Core.IState {

public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;

public IEnumerable<IEffectInvocation> OnEntry { get; }
public IEnumerable<IEffectInvocation> OnExit { get; }
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(IEvent e) {
throw new NotImplementedException();
switch (e) {
case Events.SubscriptionChangedEvent subscriptionChanged:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
Cursor = subscriptionChanged.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
Cursor = subscriptionChanged.Cursor
},
}
);
case Events.ReconnectEvent reconnect:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = reconnect.Channels,
ChannelGroups = reconnect.ChannelGroups,
Cursor = reconnect.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = reconnect.Channels,
ChannelGroups = reconnect.ChannelGroups,
Cursor = reconnect.Cursor
},
}
);
case Events.SubscriptionRestoredEvent subscriptionRestored:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
}
);

default: return null;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,104 @@ internal class ReceiveReconnectingState : Core.IState {

public IEnumerable<string> Channels;
public IEnumerable<string> ChannelGroups;
public SubscriptionCursor Cursor;

public IEnumerable<IEffectInvocation> OnEntry { get; }
public IEnumerable<IEffectInvocation> OnExit { get; }
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(IEvent e) {
throw new NotImplementedException();
switch (e) {
case Events.SubscriptionChangedEvent subscriptionChanged:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
Cursor = subscriptionChanged.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionChanged.Channels,
ChannelGroups = subscriptionChanged.ChannelGroups,
Cursor = subscriptionChanged.Cursor
},
}
);
case Events.DisconnectEvent disconnect:
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
new ReceiveStoppedState() {
Channels = disconnect.Channels,
ChannelGroups = disconnect.ChannelGroups,
Cursor = disconnect.Cursor
},
new[] {
new EmitStatusInvocation() {
Channels = disconnect.Channels,
ChannelGroups = disconnect.ChannelGroups,
},
}
);
case Events.SubscriptionRestoredEvent subscriptionRestored:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
new[] {
new ReceiveMessagesInvocation() {
Channels = subscriptionRestored.Channels,
ChannelGroups = subscriptionRestored.ChannelGroups,
Cursor = subscriptionRestored.Cursor
},
}
);
case Events.ReceiveReconnectSuccessEvent receiveReconnectSuccess:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceivingState() {
Channels = receiveReconnectSuccess.Channels,
ChannelGroups = receiveReconnectSuccess.ChannelGroups,
Cursor = receiveReconnectSuccess.Cursor
},
new IEffectInvocation[] {
new EmitStatusInvocation() {
Channels = receiveReconnectSuccess.Channels,
ChannelGroups = receiveReconnectSuccess.ChannelGroups,
},
new ReceiveMessagesInvocation() {
Channels = receiveReconnectSuccess.Channels,
ChannelGroups = receiveReconnectSuccess.ChannelGroups,
Cursor = receiveReconnectSuccess.Cursor
}
}
);
case Events.ReceiveReconnectFailureEvent receiveReconnectFailure:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceiveReconnectingState() {
Channels = receiveReconnectFailure.Channels,
ChannelGroups = receiveReconnectFailure.ChannelGroups,
Cursor = receiveReconnectFailure.Cursor
},
new[]
{
new ReceiveReconnectInvocation
{
Channels = receiveReconnectFailure.Channels,
ChannelGroups = receiveReconnectFailure.ChannelGroups,
Cursor = receiveReconnectFailure.Cursor
}
}
);
case Events.ReceiveReconnectGiveUpEvent receiveReconnectGiveUp:
return new Tuple<IState, IEnumerable<IEffectInvocation>>(
new ReceiveFailedState() {
Channels = receiveReconnectGiveUp.Channels,
ChannelGroups = receiveReconnectGiveUp.ChannelGroups,
Cursor = receiveReconnectGiveUp.Cursor
},
null
);

default: return null;
}
}
}
}
Expand Down
Loading