Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventengine/handshake handler #181

Merged
merged 31 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6051d31
feat: simplified abstraction
MikeDobrzan Jul 6, 2023
44674ce
feat: handshake effect handler
MikeDobrzan Jul 10, 2023
cc4b87c
fix: initial retry implementation
MikeDobrzan Jul 18, 2023
7f41ebd
handshake reconnect
mohitpubnub Jul 19, 2023
4a08ee4
Merge remote-tracking branch 'origin/reconnect' into eventengine/hand…
MikeDobrzan Jul 19, 2023
6d2001d
fix: handshake handler
MikeDobrzan Jul 19, 2023
2895201
revert *.csproj changes
mohitpubnub Jul 19, 2023
7748a47
added required info in handshake reconnect invocation.
mohitpubnub Jul 20, 2023
4bff839
receive handler (#180)
budgetpreneur Jul 20, 2023
b2fdbe3
fix: background handlers
MikeDobrzan Jul 20, 2023
3cebc79
*reconnection configurations *refactor naming
mohitpubnub Jul 20, 2023
c1a2629
passing configuration for reconnection
mohitpubnub Jul 20, 2023
f8d19ee
fix: transition values from receive reconnect
mohitpubnub Jul 24, 2023
0da6c9a
wip: receive messages
MikeDobrzan Jul 24, 2023
be568f7
Merge remote-tracking branch 'origin/eventengine/handshake-handler' i…
MikeDobrzan Jul 24, 2023
535c093
Revert "Merge remote-tracking branch 'origin/eventengine/handshake-ha…
MikeDobrzan Jul 24, 2023
5aae315
fix
MikeDobrzan Jul 24, 2023
83d3929
Merge branch 'eventengine-poc' into eventengine/handshake-handler
MikeDobrzan Jul 24, 2023
fca3791
ReceivingEffectHandler - ReceivingResponse
budgetpreneur Jul 24, 2023
c36afc1
null check
budgetpreneur Jul 24, 2023
74fea93
SubscribeEventEngine - receiveHandler
budgetpreneur Jul 24, 2023
ff2b976
*csproj file for RReceivingEffectHandler
budgetpreneur Jul 24, 2023
e5367b6
EmitStatus effect handler - take1
mohitpubnub Jul 25, 2023
aa873c4
empty task
mohitpubnub Jul 25, 2023
73abf60
wip emit messages
MikeDobrzan Jul 25, 2023
7ae18a7
Merge remote-tracking branch 'origin/eventengine/handshake-handler' i…
MikeDobrzan Jul 25, 2023
ac49b56
wip emit messages
MikeDobrzan Jul 25, 2023
b7475ac
cleanup and unify convention for *emitters
mohitpubnub Jul 25, 2023
724ef74
missing changes emitmessage
mohitpubnub Jul 25, 2023
63982f3
Added publisher
budgetpreneur Jul 25, 2023
3454008
emitmessages
MikeDobrzan Jul 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class SubscribeOperation2<T>: ISubscribeOperation<T>
private bool presenceSubscribeEnabled;
private SubscribeManager2 manager;
private Dictionary<string, object> queryParam;
private EventEngine pnEventEngine;
private PubnubEventEngine.EventEngine pnEventEngine;
private Pubnub PubnubInstance;
public List<SubscribeCallback> SubscribeListenerList
{
Expand Down Expand Up @@ -104,18 +104,18 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
effectDispatcher.Register(EventType.ReceiveReconnectSuccess, receiveReconnectEffectHandler);
effectDispatcher.Register(EventType.ReceiveReconnectGiveUp, receiveReconnectEffectHandler);

pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
pnEventEngine.PubnubUnitTest = unit;
pnEventEngine.Setup<T>(config);

if (pnEventEngine.PubnubUnitTest != null)
{
pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
}
else
{
pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
}
// pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
// pnEventEngine.PubnubUnitTest = unit;
// pnEventEngine.Setup<T>(config);

// if (pnEventEngine.PubnubUnitTest != null)
// {
// pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
// }
// else
// {
//pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
// }
}

private void ReceivingEffect_ReceiveRequested(object sender, ReceiveRequestEventArgs e)
Expand Down
60 changes: 60 additions & 0 deletions src/Api/PubnubApi/EventEngine/Common/Delay.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System.Threading;
using System.Threading.Tasks;

namespace PubnubApi.EventEngine.Common
{
public class Delay
{
public bool Cancelled { get; private set; } = false;
private readonly TaskCompletionSource<object> taskCompletionSource = new TaskCompletionSource<object>();
private readonly object monitor = new object();
private readonly int milliseconds;

public Delay(int milliseconds)
{
this.milliseconds = milliseconds;
}

public Task Start()
{
#if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12
Task taskAwaiter = Task.Factory.StartNew(AwaiterLoop);
taskAwaiter.Wait();
#else
Thread awaiterThread = new Thread(AwaiterLoop);
awaiterThread.Start();
#endif
return taskCompletionSource.Task; }

public void Cancel()
{
lock (monitor)
{
Cancelled = true;
Monitor.Pulse(monitor);
}
}

private void AwaiterLoop()
{
while(true)
{
lock (monitor)
{
if (Cancelled)
{
taskCompletionSource.SetCanceled();
break;
}
Monitor.Wait(monitor, milliseconds);
if (Cancelled)
{
taskCompletionSource.SetCanceled();
break;
}
taskCompletionSource.SetResult(null);
}
}
}
}
}
11 changes: 8 additions & 3 deletions src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace PubnubApi.PubnubEventEngine.Core {
namespace PubnubApi.EventEngine.Core {
internal class EffectDispatcher {
// assumes 1 instance of handler - capable of managing itself
private readonly Dictionary<System.Type, IEffectHandler> effectInvocationHandlerMap =
Expand All @@ -18,8 +18,13 @@ public async Task Dispatch<T>(T invocation) where T : IEffectInvocation {

if (invocation is IEffectCancelInvocation) {
await effectInvocationHandlerMap[invocation.GetType()].Cancel();
} else {
await ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]).Run(invocation);
} else
{
var handler = ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]);
if (handler.IsBackground(invocation))
handler.Run(invocation).Start();
else
await handler.Run(invocation);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EventEngine/Core/Engine.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Threading.Tasks;
using System.Collections.Generic;

namespace PubnubApi.PubnubEventEngine.Core {
namespace PubnubApi.EventEngine.Core {
internal abstract class Engine {
public EventQueue eventQueue = new EventQueue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Threading.Tasks;
using System.Collections.Generic;

namespace PubnubApi.PubnubEventEngine.Core {
namespace PubnubApi.EventEngine.Core {

/// <summary>
/// Generic effect handler.
Expand All @@ -17,6 +17,7 @@ internal interface IEffectHandler {
/// <typeparam name="T">Associated invocation</typeparam>
internal interface IEffectHandler<in T> : IEffectHandler where T : IEffectInvocation {
Task Run(T invocation);
bool IsBackground(T invocation);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EventEngine/Core/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Linq;
using System.Threading.Tasks;

namespace PubnubApi.PubnubEventEngine.Core
namespace PubnubApi.EventEngine.Core
{
internal class EventQueue
{
Expand Down
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EventEngine/Core/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Threading.Tasks;
using System.Collections.Generic;

namespace PubnubApi.PubnubEventEngine.Core
namespace PubnubApi.EventEngine.Core
{
internal static class Utils
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Newtonsoft.Json;

namespace PubnubApi.EventEngine.Subscribe.Common
{
public class SubscriptionCursor
{
public long? Timetoken { get; set; }
public int? Region { get; set; }
}

public class HandshakeResponse
{
[JsonProperty("t")]
public Timetoken Timetoken { get; set; }

[JsonProperty("m")]
public object[] Messages { get; set; }
}
public class HandshakeError
{
[JsonProperty("status")]
public int Status { get; set; }

[JsonProperty("error")]
public string ErrorMessage { get; set; }
}

public class Timetoken
{
[JsonProperty("t")]
public long Timestamp { get; set; }

[JsonProperty("r")]
public int Region { get; set; }

}

public class ReceivingResponse<T>
{
[JsonProperty("t")]
public Timetoken Timetoken { get; set; }

[JsonProperty("m")]
public Message<T>[] Messages { get; set; }
}

public class Message<T>
{
[JsonProperty ("a")]
public string Shard { get; set;}

[JsonProperty ("b")]
public string SubscriptionMatch { get; set;}

[JsonProperty("c")]
public string Channel { get; set; }

[JsonProperty("d")]
public T Payload { get; set; }

[JsonProperty("e")]
public int MessageType { get; set; }

[JsonProperty("f")]
public string Flags { get; set; }

[JsonProperty("i")]
public string IssuingClientId { get; set; }

[JsonProperty("k")]
public string SubscribeKey { get; set; }

[JsonProperty("o")]
public object OriginatingTimetoken { get; set; }

[JsonProperty("p")]
public object PublishMetadata { get; set; }

[JsonProperty("s")]
public long SequenceNumber { get; set; }

[JsonProperty("p")]
public Timetoken Timetoken { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
namespace PubnubApi.EventEngine.Subscribe.Context
{
public static class ReconnectionDelayUtil
{
public static int CalculateDelay(PNReconnectionPolicy policy, int attempts)
{
Random numGenerator = new Random();
int delayValue = 0;
int backoff = 5;
switch (policy) {
case PNReconnectionPolicy.LINEAR:
delayValue = attempts * backoff + numGenerator.Next(1000);
break;
case PNReconnectionPolicy.EXPONENTIAL:
delayValue = (int)(Math.Pow(2, attempts - 1) * 1000 + numGenerator.Next(1000));
break;
}
return delayValue;

}

public static bool shouldRetry(PNReconnectionPolicy policy, int attempts, int maxAttempts)
{
if (policy == PNReconnectionPolicy.NONE) return false;
return maxAttempts < attempts;
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Newtonsoft.Json;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.Invocations;

namespace PubnubApi.EventEngine.Subscribe.Effects
{
internal class EmitMessagesHandler : IEffectHandler<Invocations.EmitMessagesInvocation>
{
private readonly System.Action<Pubnub, PNMessageResult<object>> messageEmitterFunction;
private readonly Pubnub pubnubInstance;

public EmitMessagesHandler(Pubnub pubnubInstance,
System.Action<Pubnub, PNMessageResult<object>> messageEmitterFunction)
{
this.messageEmitterFunction = messageEmitterFunction;
this.pubnubInstance = pubnubInstance;
}

public async Task Run(EmitMessagesInvocation invocation)
{
var processedMessages = invocation.Messages.Messages.Select(m => new PNMessageResult<object>()
{
Channel = m.Channel,
Message = JsonConvert.DeserializeObject(m.Payload),
Subscription = m.SubscriptionMatch,
Timetoken = m.Timetoken.Timestamp,
UserMetadata = m.PublishMetadata,
Publisher = m.IssuingClientId
});

foreach (var message in processedMessages)
{
messageEmitterFunction(pubnubInstance, message);
}
}

public bool IsBackground(EmitMessagesInvocation invocation) => false;

public Task Cancel()
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Threading.Tasks;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.Invocations;

namespace PubnubApi.EventEngine.Subscribe.Effects
{
public class EmitStatusEffectHandler: Core.IEffectHandler<EmitStatusInvocation>
{
private readonly Action<Pubnub, PNStatus> statusEmitterFunction;
private readonly Pubnub pubnubInstance;

public EmitStatusEffectHandler(Pubnub pn, Action<Pubnub, PNStatus> statusEmitter)
{
this.statusEmitterFunction = statusEmitter;
this.pubnubInstance = pn;
}

public Task Cancel() => Utils.EmptyTask;

bool IEffectHandler<EmitStatusInvocation>.IsBackground(EmitStatusInvocation invocation) => false;

Task IEffectHandler<EmitStatusInvocation>.Run(EmitStatusInvocation invocation)
{
this.statusEmitterFunction(this.pubnubInstance, invocation.Status);
return Utils.EmptyTask;
}
}
}

Loading