Skip to content

Commit 84af23d

Browse files
authored
Eventengine poc refactor (#170)
* WIP EE refactor - core classes * WIP - reorganization * WIP - eventQueue * WIP state abstract class to interface
1 parent e1d65ca commit 84af23d

File tree

12 files changed

+377
-0
lines changed

12 files changed

+377
-0
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
5+
namespace PubnubApi.PubnubEventEngine.Core {
6+
internal class EffectDispatcher {
7+
// assumes 1 instance of handler - capable of managing itself
8+
private readonly Dictionary<System.Type, IEffectHandler> effectInvocationHandlerMap =
9+
new Dictionary<System.Type, IEffectHandler>();
10+
11+
/// <summary>
12+
/// Dispatch an invocation i.e. call a registered effect handler.
13+
/// </summary>
14+
public async Task Dispatch<T>(T invocation) where T : IEffectInvocation {
15+
if (!effectInvocationHandlerMap.ContainsKey(invocation.GetType())) {
16+
throw new ArgumentException($"No handler for {invocation.GetType().Name} found.");
17+
}
18+
19+
if (invocation is IEffectCancelInvocation) {
20+
await effectInvocationHandlerMap[invocation.GetType()].Cancel();
21+
} else {
22+
await ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]).Run(invocation);
23+
}
24+
}
25+
26+
/// <summary>
27+
/// Assign a handler implementation to an invocation.
28+
/// </summary>
29+
public EffectDispatcher Register<TEffectInvocation, TEffectHandler>(TEffectHandler handler)
30+
where TEffectInvocation : IEffectInvocation
31+
where TEffectHandler : IEffectHandler<TEffectInvocation> {
32+
// TODO log
33+
// if (effectInvocationHandlerMap.ContainsKey(typeof(TEffectInvocation)))
34+
35+
effectInvocationHandlerMap[typeof(TEffectInvocation)] = handler;
36+
return this;
37+
}
38+
}
39+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System.Threading.Tasks;
2+
using System.Collections.Generic;
3+
4+
namespace PubnubApi.PubnubEventEngine.Core {
5+
internal abstract class Engine {
6+
public EventQueue eventQueue = new EventQueue();
7+
8+
protected EffectDispatcher dispatcher = new EffectDispatcher();
9+
protected IState currentState;
10+
11+
private Task<IState> currentTransition;
12+
13+
public Engine() {
14+
eventQueue.onEventQueued += OnEvent;
15+
}
16+
17+
~Engine() {
18+
eventQueue.onEventQueued -= OnEvent;
19+
}
20+
21+
private async void OnEvent(EventQueue q) {
22+
if (!(currentTransition is null)) {
23+
await currentTransition;
24+
}
25+
currentTransition = Transition(q.Dequeue()).ContinueWith((res) => currentState = res.Result);
26+
}
27+
28+
private async Task<IState> Transition(IEvent e) {
29+
var stateInvocationPair = currentState.Transition(e);
30+
31+
if (stateInvocationPair is null) {
32+
return currentState;
33+
}
34+
35+
await ExecuteStateChange(currentState, stateInvocationPair.Item1, stateInvocationPair.Item2);
36+
37+
return stateInvocationPair.Item1;
38+
}
39+
40+
/// <summary>
41+
/// Launch the invocations associated with transitioning between states
42+
/// </summary>
43+
private async Task ExecuteStateChange(IState sourceState, IState targetState, IEnumerable<IEffectInvocation> invocations) {
44+
foreach (var effectInvocation in sourceState.onExit) {
45+
await dispatcher.Dispatch(effectInvocation);
46+
}
47+
foreach (var effectInvocation in invocations) {
48+
await dispatcher.Dispatch(effectInvocation);
49+
}
50+
foreach (var effectInvocation in targetState.onEntry) {
51+
await dispatcher.Dispatch(effectInvocation);
52+
}
53+
}
54+
}
55+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Threading.Tasks;
2+
using System.Collections.Generic;
3+
4+
namespace PubnubApi.PubnubEventEngine.Core {
5+
internal interface IEffectHandler {
6+
Task Cancel();
7+
}
8+
9+
internal interface IEffectHandler<in T> : IEffectHandler where T : IEffectInvocation {
10+
Task Run(T invocation);
11+
}
12+
13+
internal interface IEffectInvocation { }
14+
15+
internal interface IEffectCancelInvocation : IEffectInvocation { }
16+
17+
internal interface IEvent { };
18+
19+
internal interface IState {
20+
public abstract IEnumerable<IEffectInvocation> onEntry { get; }
21+
public abstract IEnumerable<IEffectInvocation> onExit { get; }
22+
23+
/// <summary>
24+
/// The EE transition pure function.
25+
/// </summary>
26+
/// <param name="e">Input event</param>
27+
/// <returns>Target state and invocation list, or null for no-transition</returns>
28+
public abstract System.Tuple<IState, IEnumerable<IEffectInvocation>> Transition(IEvent e);
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
4+
namespace PubnubApi.PubnubEventEngine.Core {
5+
internal class EventQueue {
6+
private volatile Queue<IEvent> eventQueue = new Queue<IEvent>();
7+
private object lockObj = new object();
8+
9+
public event System.Action<EventQueue> onEventQueued;
10+
11+
public void Enqueue(IEvent e) {
12+
lock (lockObj) {
13+
// TODO de-dupe? Throttle?
14+
eventQueue.Enqueue(e);
15+
onEventQueued?.Invoke(this);
16+
}
17+
}
18+
19+
public IEvent Dequeue() {
20+
lock (lockObj) {
21+
return eventQueue.Any() ? eventQueue.Dequeue() : null;
22+
}
23+
}
24+
}
25+
}

src/Api/PubnubApi/EventEngine/EventEngine.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ public State NextState()
512512

513513
public void Setup<T>(PNConfiguration config)
514514
{
515+
516+
515517
CreateState(StateType.Unsubscribed)
516518
.On(EventType.SubscriptionChanged, StateType.Handshaking)
517519
.On(EventType.SubscriptionRestored, StateType.Receiving);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Newtonsoft.Json;
5+
using PubnubApi.EndPoint;
6+
using PubnubApi.PubnubEventEngine.Core;
7+
using PubnubApi.PubnubEventEngine.Subscribe.Invocations;
8+
9+
namespace PubnubApi.PubnubEventEngine.Subscribe.Effects {
10+
internal class HandshakeEffectHandler : Core.IEffectHandler<HandshakeInvocation> {
11+
private SubscribeManager2 manager;
12+
private EventQueue eventQueue;
13+
14+
public HandshakeEffectHandler(SubscribeManager2 manager, EventQueue eventQueue) {
15+
this.manager = manager;
16+
this.eventQueue = eventQueue;
17+
}
18+
19+
public async Task Run(HandshakeInvocation invocation) {
20+
// TODO fix this, probably wrong :)
21+
var resp = await manager.HandshakeRequest<string>(
22+
PNOperationType.PNSubscribeOperation,
23+
invocation.channels.ToArray(),
24+
invocation.channelGroups.ToArray(),
25+
null,
26+
null,
27+
invocation.initialSubscribeQueryParams,
28+
invocation.externalQueryParams
29+
);
30+
31+
if (!resp.Item2.Error) {
32+
// TODO move deserialization outside
33+
// TODO does this need more error checking?
34+
var handshakeResponse = JsonConvert.DeserializeObject<HandshakeResponse>(resp.Item1);
35+
var c = new SubscriptionCursor() {
36+
Region = handshakeResponse.Timetoken.Region,
37+
Timetoken = handshakeResponse.Timetoken.Timestamp
38+
};
39+
40+
eventQueue.Enqueue(new Events.HandshakeSuccessEvent() {cursor = c});
41+
}
42+
}
43+
44+
public async Task Cancel() {
45+
manager.HandshakeRequestCancellation();
46+
}
47+
}
48+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
using System.Collections.Generic;
2+
3+
namespace PubnubApi.PubnubEventEngine.Subscribe.Events {
4+
public class SubscriptionChangedEvent : Core.IEvent {
5+
public IEnumerable<string> channels;
6+
public IEnumerable<string> channelGroups;
7+
}
8+
9+
public class SubscriptionRestoredEvent : Core.IEvent {
10+
public IEnumerable<string> channels;
11+
public IEnumerable<string> channelGroups;
12+
public SubscriptionCursor cursor;
13+
}
14+
15+
public class HandshakeSuccessEvent : Core.IEvent {
16+
public SubscriptionCursor cursor;
17+
}
18+
19+
public class HandshakeFailureEvent : Core.IEvent {
20+
// TODO status or reason?
21+
public PNStatus status;
22+
}
23+
24+
public class HandshakeReconnectSuccessEvent : HandshakeSuccessEvent {
25+
}
26+
27+
public class HandshakeReconnectFailureEvent : HandshakeFailureEvent {
28+
}
29+
30+
public class HandshakeReconnectRetryEvent : Core.IEvent {
31+
}
32+
33+
public class HandshakeReconnectGiveUpEvent : Core.IEvent {
34+
// TODO status or reason?
35+
public PNStatus status;
36+
}
37+
38+
public class ReceiveSuccessEvent : Core.IEvent {
39+
public List<PNMessageResult<object>> messages;
40+
public SubscriptionCursor cursor;
41+
}
42+
43+
public class ReceiveFailureEvent : Core.IEvent {
44+
// TODO status or reason?
45+
public PNStatus status;
46+
}
47+
48+
public class ReceiveReconnectRetry : Core.IEvent {
49+
}
50+
51+
public class ReceiveReconnectSuccessEvent : ReceiveSuccessEvent {
52+
}
53+
54+
public class ReceiveReconnectFailureEvent : ReceiveFailureEvent {
55+
}
56+
57+
public class ReceiveReconnectGiveUpEvent : Core.IEvent {
58+
// TODO status or reason?
59+
public PNStatus status;
60+
}
61+
62+
public class DisconnectEvent : Core.IEvent {
63+
}
64+
65+
public class ReconnectEvent : Core.IEvent {
66+
}
67+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System.Collections.Generic;
2+
using PubnubApi.PubnubEventEngine.Core;
3+
4+
namespace PubnubApi.PubnubEventEngine.Subscribe.Invocations {
5+
internal class EmitMessagesInvocation : Core.IEffectInvocation {
6+
public List<object> messages;
7+
}
8+
9+
internal class EmitStatusInvocation : Core.IEffectInvocation {
10+
11+
}
12+
13+
internal class HandshakeInvocation : Core.IEffectInvocation {
14+
public IEnumerable<string> channels;
15+
public IEnumerable<string> channelGroups;
16+
public Dictionary<string, string> initialSubscribeQueryParams = new Dictionary<string, string>();
17+
public Dictionary<string, object> externalQueryParams = new Dictionary<string, object>();
18+
}
19+
20+
internal class ReceiveMessagesInvocation : Core.IEffectInvocation { }
21+
22+
internal class CancelReceiveMessagesInvocation : ReceiveMessagesInvocation, Core.IEffectCancelInvocation { }
23+
24+
internal class HandshakeCancelInvocation : HandshakeInvocation, Core.IEffectCancelInvocation { }
25+
26+
internal class ReconnectInvocation : Core.IEffectInvocation { }
27+
28+
internal class CancelReconnectInvocation : ReconnectInvocation, Core.IEffectCancelInvocation { }
29+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using PubnubApi.PubnubEventEngine.Core;
4+
5+
namespace PubnubApi.PubnubEventEngine.Subscribe.States {
6+
internal class HandshakingState : Core.IState {
7+
8+
public IEnumerable<string> channels;
9+
public IEnumerable<string> channelGroups;
10+
11+
public IEnumerable<IEffectInvocation> onEntry { get; }
12+
public IEnumerable<IEffectInvocation> onExit { get; }
13+
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(IEvent e) {
14+
throw new NotImplementedException();
15+
}
16+
}
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using PubnubApi.PubnubEventEngine.Core;
4+
5+
namespace PubnubApi.PubnubEventEngine.Subscribe.States {
6+
internal class SubscribedState : Core.IState {
7+
public IEnumerable<IEffectInvocation> onEntry { get; }
8+
public IEnumerable<IEffectInvocation> onExit { get; }
9+
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(IEvent e) {
10+
throw new NotImplementedException();
11+
}
12+
}
13+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using PubnubApi.PubnubEventEngine.Core;
4+
using PubnubApi.PubnubEventEngine.Subscribe.Invocations;
5+
6+
namespace PubnubApi.PubnubEventEngine.Subscribe.States {
7+
internal class UnsubscribedState : Core.IState {
8+
public IEnumerable<IEffectInvocation> onEntry { get; }
9+
public IEnumerable<IEffectInvocation> onExit { get; }
10+
11+
public Tuple<Core.IState, IEnumerable<IEffectInvocation>> Transition(Core.IEvent e) {
12+
switch (e) {
13+
case Events.SubscriptionChangedEvent subscriptionChanged:
14+
return new Tuple<Core.IState, IEnumerable<IEffectInvocation>>(
15+
new HandshakingState() {
16+
channels = subscriptionChanged.channels,
17+
channelGroups = subscriptionChanged.channelGroups,
18+
},
19+
new[] {
20+
new HandshakeInvocation() {
21+
channels = subscriptionChanged.channels,
22+
channelGroups = subscriptionChanged.channelGroups,
23+
},
24+
}
25+
);
26+
27+
default: return null;
28+
}
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)