Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SubscribeManager Deserialize and Presence States #188

Open
wants to merge 59 commits into
base: eventengine-poc
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
c84ab1f
wip: deserialization
MikeDobrzan Aug 7, 2023
54f735c
wip: deserialization
MikeDobrzan Aug 7, 2023
4e38861
fix: passed dependencies to emit messages handler
MikeDobrzan Aug 7, 2023
8f913a3
fix duplicate p jsonproperty
budgetpreneur Aug 7, 2023
4fc700a
Merge remote-tracking branch 'origin/eventengine/deserialization-gene…
budgetpreneur Aug 7, 2023
4a5b848
fix for PNMessageResult
budgetpreneur Aug 7, 2023
0acc10e
fix: emitter references
MikeDobrzan Aug 7, 2023
f74b743
wip: error handling
MikeDobrzan Aug 7, 2023
c05b00d
fix for ReceiveSuccessEvent
budgetpreneur Aug 7, 2023
4f94c14
fix: deserialization of messages
MikeDobrzan Aug 7, 2023
045a5f0
Merge branch 'mgr-deserialize' of https://github.com/pubnub/c-sharp i…
MikeDobrzan Aug 7, 2023
1182abb
fix: deserialization
MikeDobrzan Aug 7, 2023
feacb98
fix: usermetadata aka timetoken
MikeDobrzan Aug 7, 2023
b90cb9a
fix: added serializer settings
MikeDobrzan Aug 7, 2023
421224e
unit test fix
budgetpreneur Aug 7, 2023
9b66193
fix: fixed string edge-case
MikeDobrzan Aug 7, 2023
e829c3b
wip: switched user metadata source
MikeDobrzan Aug 8, 2023
cdddbd5
wip: added null check in ReceivingEffectHandler.cs
MikeDobrzan Aug 8, 2023
98e8165
wip: null check.
MikeDobrzan Aug 8, 2023
0ea9033
removed unused files
budgetpreneur Aug 8, 2023
714d676
fix ReceiveRequest for timeout
budgetpreneur Aug 8, 2023
5be1234
reconnect fix from network failure
budgetpreneur Aug 8, 2023
6ce2853
reconnection delay fix for exponential type
mohitpubnub Aug 8, 2023
4e1fe1a
HandshakeFailureEvent to UnsubscribedState
budgetpreneur Aug 8, 2023
76d909f
Merge branch 'mgr-deserialize' of https://github.com/pubnub/c-sharp i…
budgetpreneur Aug 8, 2023
dc5f9ab
fix: background invocations
MikeDobrzan Aug 8, 2023
9eef18f
fix: race condition
MikeDobrzan Aug 8, 2023
db02e27
SubscribeEventEngine delegates for logging
budgetpreneur Aug 8, 2023
812fa3a
fix: core abstraction not broken anymore
MikeDobrzan Aug 9, 2023
2e6e738
fix: delay
MikeDobrzan Aug 9, 2023
2d889f3
fix: fix
MikeDobrzan Aug 9, 2023
f562344
fix: delay
MikeDobrzan Aug 9, 2023
d406f85
fix: delay
MikeDobrzan Aug 9, 2023
4db4290
fix: delay
MikeDobrzan Aug 9, 2023
8066cd7
fix: handshake
MikeDobrzan Aug 9, 2023
3c44ccf
fix: HandshakeEffectHandler.cs composition pattern
MikeDobrzan Aug 9, 2023
dcbc426
fix: ReceivingEffectHandler.cs composition pattern
MikeDobrzan Aug 9, 2023
71767ab
fix: cancel
MikeDobrzan Aug 9, 2023
1069833
`Unsubscribe()` at eventengine
mohitpubnub Aug 9, 2023
b846c14
fix: delay calculation as per doc
mohitpubnub Aug 9, 2023
a225a77
attempt value bound
mohitpubnub Aug 9, 2023
4c7e727
fix for disconnect and reconnect
budgetpreneur Aug 9, 2023
4ecdb57
Reconnect
budgetpreneur Aug 9, 2023
748806a
unused code
budgetpreneur Aug 9, 2023
13ad8b4
multi subscribes. add new channels to existing.
budgetpreneur Aug 9, 2023
35de95e
SubscribeEndpoint and UnsubscribeEndpoint
budgetpreneur Aug 18, 2023
a41c6ee
Added ee param for EE
budgetpreneur Aug 18, 2023
49c7846
Added Name to IEvent and IEffectInvocation
budgetpreneur Aug 21, 2023
1f7d822
default value, typos and minor fixes
budgetpreneur Aug 21, 2023
e859a53
happy path contract tests
budgetpreneur Aug 22, 2023
e011854
subscribe with timetoken and remove status for Receive(reconnect)
budgetpreneur Aug 24, 2023
29e86f7
Unsub state sub retstore event transition
budgetpreneur Aug 24, 2023
f961f96
fixes for sub with tt amid failures
budgetpreneur Aug 24, 2023
602157c
specflow auto generated
budgetpreneur Aug 24, 2023
793d9d4
Presence States WIP
budgetpreneur Aug 29, 2023
dedcee2
removed status check
budgetpreneur Aug 31, 2023
1617f6b
removed emit status from presence
budgetpreneur Aug 31, 2023
0b8c0d1
Presence States
budgetpreneur Sep 1, 2023
e942f2d
Updated Presence States
budgetpreneur Sep 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Api/PubnubApi/Builder/UrlRequestBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Uri IUrlRequestBuilder.BuildMultiChannelSubscribeRequest(string requestMethod, s
requestQueryStringParams.Add("filter-expr", UriUtil.EncodeUriComponent(pubnubConfig[pubnubInstanceId].FilterExpression, currentType, false, false, false));
}

if (!requestQueryStringParams.ContainsKey("ee") && pubnubConfig.ContainsKey(pubnubInstanceId) && pubnubConfig[pubnubInstanceId].EnableEventEngine)
{
requestQueryStringParams.Add("ee", "1");
}

if (!requestQueryStringParams.ContainsKey("tt"))
{
requestQueryStringParams.Add("tt", timetoken.ToString(CultureInfo.InvariantCulture));
Expand Down
219 changes: 219 additions & 0 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net;
using System.Globalization;
using PubnubApi.EventEngine.Subscribe;
using PubnubApi;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.Events;
using PubnubApi.EventEngine.Subscribe.States;
using PubnubApi.EventEngine.Subscribe.Common;

namespace PubnubApi.EndPoint
{
public class SubscribeEndpoint<T>: ISubscribeOperation<T>
{
private readonly PNConfiguration config;
private readonly IJsonPluggableLibrary jsonLibrary;
private readonly IPubnubUnitTest unit;
private readonly IPubnubLog pubnubLog;
private readonly EndPoint.TelemetryManager pubnubTelemetryMgr;
private readonly EndPoint.TokenManager pubnubTokenMgr;

private List<string> subscribeChannelNames = new List<string>();
private List<string> subscribeChannelGroupNames = new List<string>();
private long subscribeTimetoken = -1;
private bool presenceSubscribeEnabled;
private SubscribeManager2 manager;
private Dictionary<string, object> queryParam;
private Pubnub PubnubInstance;
private SubscribeEventEngine subscribeEventEngine;
private SubscribeEventEngineFactory subscribeEventEngineFactory { get; set; }
private string instanceId { get; set; }
public List<SubscribeCallback> SubscribeListenerList
{
get;
set;
} = new List<SubscribeCallback>();

public SubscribeEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, EndPoint.TelemetryManager telemetryManager, EndPoint.TokenManager tokenManager,SubscribeEventEngineFactory subscribeEventEngineFactory, string instanceId, Pubnub instance)
{
PubnubInstance = instance;
config = pubnubConfig;
jsonLibrary = jsonPluggableLibrary;
unit = pubnubUnit;
pubnubLog = log;
pubnubTelemetryMgr = telemetryManager;
pubnubTokenMgr = tokenManager;
this.subscribeEventEngineFactory = subscribeEventEngineFactory;
this.instanceId = instanceId;
if (unit != null) { unit.EventTypeList = new List<KeyValuePair<string, string>>(); }
}

public ISubscribeOperation<T> Channels(string[] channels)
{
if (channels != null && channels.Length > 0 && !string.IsNullOrEmpty(channels[0]))
{
this.subscribeChannelNames.AddRange(channels);
}
return this;
}

public ISubscribeOperation<T> ChannelGroups(string[] channelGroups)
{
if (channelGroups != null && channelGroups.Length > 0 && !string.IsNullOrEmpty(channelGroups[0]))
{
this.subscribeChannelGroupNames.AddRange(channelGroups);
}
return this;
}

public ISubscribeOperation<T> WithTimetoken(long timetoken)
{
this.subscribeTimetoken = timetoken;
return this;
}

public ISubscribeOperation<T> WithPresence()
{
this.presenceSubscribeEnabled = true;
return this;
}

public ISubscribeOperation<T> QueryParam(Dictionary<string, object> customQueryParam)
{
this.queryParam = customQueryParam;
return this;
}

public void Execute()
{
if (this.subscribeChannelNames == null)
{
this.subscribeChannelNames = new List<string>();
}

if (this.subscribeChannelGroupNames == null)
{
this.subscribeChannelGroupNames = new List<string>();
}

if (this.presenceSubscribeEnabled)
{
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0]))
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0]))
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();

if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0)
{
this.subscribeChannelNames.AddRange(presenceChannelNames);
}

if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0)
{
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames);
}
}

string[] channelNames = this.subscribeChannelNames != null ? this.subscribeChannelNames.ToArray() : null;
string[] channelGroupNames = this.subscribeChannelGroupNames != null ? this.subscribeChannelGroupNames.ToArray() : null;
SubscriptionCursor cursor = null;
if (subscribeTimetoken >= 1)
{
cursor = new SubscriptionCursor { Timetoken = subscribeTimetoken, Region = 0 };
}
Subscribe(channelNames, channelGroupNames, cursor, this.queryParam);
}

private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary<string, object> externalQueryParam)
{
if ((channels == null || channels.Length == 0) && (channelGroups == null || channelGroups.Length == 0))
{
throw new ArgumentException("Either Channel Or Channel Group or Both should be provided.");
}

if (this.subscribeEventEngineFactory.HasEventEngine(instanceId))
{
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
}
else
{
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, StatusEmitter, MessageEmitter);
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition;
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued;
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch;
}
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor);
}

private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("invocation", obj?.Name));
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Invocation = {obj.GetType().Name}", config.LogVerbosity);
}
catch (Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnEventQueued(IEvent @event)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("event", @event?.Name));
int attempts = 0;
if (subscribeEventEngine.CurrentState is HandshakeReconnectingState handshakeReconnectingState)
{
attempts = handshakeReconnectingState.AttemptedRetries;
}
else if (subscribeEventEngine.CurrentState is ReceiveReconnectingState receiveReconnectingState)
{
attempts = receiveReconnectingState.AttemptedRetries;
}
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState: {subscribeEventEngine.CurrentState.GetType().Name}; Event = {@event.GetType().Name}; Attempt = {attempts} of {config.ConnectionMaxRetries}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj)
{
try
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Transition State = {obj?.State.GetType().Name}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void MessageEmitter<T>(Pubnub pubnubInstance, PNMessageResult<T> messageResult)
{
foreach (var listener in SubscribeListenerList)
{
listener?.Message(pubnubInstance, messageResult);
}
}

private void StatusEmitter(Pubnub pubnubInstance, PNStatus status)
{
foreach (var listener in SubscribeListenerList)
{
listener?.Status(pubnubInstance, status);
}
}

}
}
Loading