Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): implement Event listener #688

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 20 additions & 47 deletions net/rs/client-gen/src/events_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) struct EventsGenerator<'a> {
type_generator: TypeDeclGenerator<'a>,
enum_tokens: Tokens,
class_tokens: Tokens,
listener_tokens: Tokens,
event_routes_tokens: Tokens,
}

impl<'a> EventsGenerator<'a> {
Expand All @@ -19,23 +19,25 @@ impl<'a> EventsGenerator<'a> {
type_generator,
enum_tokens: Tokens::new(),
class_tokens: Tokens::new(),
listener_tokens: Tokens::new(),
event_routes_tokens: Tokens::new(),
}
}

pub(crate) fn finalize(self) -> Tokens {
let name = &self.service_name.to_case(Case::Pascal);
let name = self.service_name;
let enum_name = &format!("{}Events", name);
let class_name = &format!("Enum{}Events", name);
let listener_name = &format!("{}Listener", name);

let system_buffer = &csharp::import("global::System", "Buffer");
let core_listener = &csharp::import(
"global::Sails.Remoting.Abstractions.Core",
"IRemotingListener",
);
let service_listener =
&csharp::import("global::Sails.Remoting.Abstractions", "IRemotingListener");
let task = &csharp::import("global::System.Threading.Tasks", "Task");
let cancellation_token = &csharp::import("global::System.Threading", "CancellationToken");
let async_enumerable =
&csharp::import("global::System.Collections.Generic", "IAsyncEnumerable");
let actor_id_type = primitive_type_to_dotnet(PrimitiveType::ActorId);

quote! {
public enum $enum_name
Expand All @@ -51,46 +53,20 @@ impl<'a> EventsGenerator<'a> {
}
}
$['\n']
public sealed partial class $listener_name : $service_listener<$class_name>
public static class $listener_name
{
private static readonly byte[][] EventRoutes =
[
$(self.listener_tokens)
];
$['\n']
private readonly $core_listener remoting;
private const string ROUTE = $(quoted(name));
$['\n']
public $listener_name($core_listener remoting)
{
this.remoting = remoting;
}
private static readonly string[] EventRoutes =
[
$(self.event_routes_tokens)
];
$['\n']
public async global::System.Collections.Generic.IAsyncEnumerable<$class_name> ListenAsync([global::System.Runtime.CompilerServices.EnumeratorCancellation] global::System.Threading.CancellationToken cancellationToken = default)
{
await foreach (var bytes in this.remoting.ListenAsync(cancellationToken))
{
byte idx = 0;
foreach (var route in EventRoutes)
{
if (route.Length > bytes.Length)
{
continue;
}
if (route.AsSpan().SequenceEqual(bytes.AsSpan()[..route.Length]))
{
var bytesLength = bytes.Length - route.Length + 1;
var data = new byte[bytesLength];
data[0] = idx;
$system_buffer.BlockCopy(bytes, route.Length, data, 1, bytes.Length - route.Length);

var p = 0;
$class_name ev = new();
ev.Decode(bytes, ref p);
yield return ev;
}
idx++;
}
}
public static async $task<$async_enumerable<($actor_id_type, $class_name)>> ListenAsync($core_listener remoting, $cancellation_token cancellationToken = default)
{$['\r']
var eventStream = await remoting.ListenAsync(cancellationToken);$['\r']
return eventStream.SelectEvent<$class_name>(ROUTE, EventRoutes);$['\r']
}
}
$['\n']
Expand All @@ -105,12 +81,9 @@ impl<'a> Visitor<'a> for EventsGenerator<'a> {

fn visit_service_event(&mut self, event: &'a ServiceEvent) {
let name = &self.service_name.to_case(Case::Pascal);
let service_route_bytes = path_bytes(self.service_name).0;
let event_route_bytes = path_bytes(event.name()).0;
let route_bytes = [service_route_bytes, event_route_bytes].join(", ");

quote_in! { self.listener_tokens =>
[$(&route_bytes)],
quote_in! { self.event_routes_tokens =>
$(quoted(event.name())),
};

quote_in! { self.enum_tokens =>
Expand Down
13 changes: 9 additions & 4 deletions net/rs/client-gen/tests/snapshots/generator__events_works.snap
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ using global::Sails.Remoting.Abstractions;
using global::Sails.Remoting.Abstractions.Core;
using global::System;
using global::System.Collections.Generic;
using global::System.Threading;
using global::System.Threading.Tasks;

#nullable enable

Expand Down Expand Up @@ -39,13 +41,16 @@ this.AddTypeDecoder<MyParam>(ServiceWithEventsEvents.Three);
this.AddTypeDecoder<global::Substrate.NetApi.Model.Types.Base.BaseVoid>(ServiceWithEventsEvents.Reset);
} }

public sealed partial class ServiceWithEventsListener : IRemotingListener<EnumServiceWithEventsEvents> { private static readonly byte[][] EventRoutes = [ [68, 83, 101, 114, 118, 105, 99, 101, 87, 105, 116, 104, 69, 118, 101, 110, 116, 115, 12, 79, 110, 101],[68, 83, 101, 114, 118, 105, 99, 101, 87, 105, 116, 104, 69, 118, 101, 110, 116, 115, 12, 84, 119, 111],[68, 83, 101, 114, 118, 105, 99, 101, 87, 105, 116, 104, 69, 118, 101, 110, 116, 115, 20, 84, 104, 114, 101, 101],[68, 83, 101, 114, 118, 105, 99, 101, 87, 105, 116, 104, 69, 118, 101, 110, 116, 115, 20, 82, 101, 115, 101, 116], ];
public static class ServiceWithEventsListener {

private readonly global::Sails.Remoting.Abstractions.Core.IRemotingListener remoting;
private const string ROUTE = "ServiceWithEvents";

public ServiceWithEventsListener(global::Sails.Remoting.Abstractions.Core.IRemotingListener remoting) { this.remoting = remoting; }
private static readonly string[] EventRoutes = [ "One","Two","Three","Reset", ];

public async global::System.Collections.Generic.IAsyncEnumerable<EnumServiceWithEventsEvents> ListenAsync([global::System.Runtime.CompilerServices.EnumeratorCancellation] global::System.Threading.CancellationToken cancellationToken = default) { await foreach (var bytes in this.remoting.ListenAsync(cancellationToken)) { byte idx = 0; foreach (var route in EventRoutes) { if (route.Length > bytes.Length) { continue; } if (route.AsSpan().SequenceEqual(bytes.AsSpan()[..route.Length])) { var bytesLength = bytes.Length - route.Length + 1; var data = new byte[bytesLength]; data[0] = idx; Buffer.BlockCopy(bytes, route.Length, data, 1, bytes.Length - route.Length); var p = 0; EnumServiceWithEventsEvents ev = new(); ev.Decode(bytes, ref p); yield return ev; } idx++; } } } }
public static async Task<IAsyncEnumerable<(global::Substrate.Gear.Api.Generated.Model.gprimitives.ActorId, EnumServiceWithEventsEvents)>> ListenAsync(IRemotingListener remoting, CancellationToken cancellationToken = default) {
var eventStream = await remoting.ListenAsync(cancellationToken);
return eventStream.SelectEvent<EnumServiceWithEventsEvents>(ROUTE, EventRoutes);
} }

public sealed partial class MyParam : global::Substrate.NetApi.Model.Types.Base.BaseType {
public global::Substrate.Gear.Client.NetApi.Model.Types.Primitive.NonZeroU256 F1 { get; init; } = new();
Expand Down
13 changes: 9 additions & 4 deletions net/rs/client-gen/tests/snapshots/generator__full.snap
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ using global::Sails.Remoting.Abstractions;
using global::Sails.Remoting.Abstractions.Core;
using global::System;
using global::System.Collections.Generic;
using global::System.Threading;
using global::System.Threading.Tasks;

#nullable enable

Expand Down Expand Up @@ -72,13 +74,16 @@ public enum ServiceEvents {
this.AddTypeDecoder<global::Substrate.NetApi.Model.Types.Primitive.Str>(ServiceEvents.ThatDone);
} }

public sealed partial class ServiceListener : IRemotingListener<EnumServiceEvents> { private static readonly byte[][] EventRoutes = [ [28, 83, 101, 114, 118, 105, 99, 101, 32, 84, 104, 105, 115, 68, 111, 110, 101],[28, 83, 101, 114, 118, 105, 99, 101, 32, 84, 104, 97, 116, 68, 111, 110, 101], ];
public static class ServiceListener {

private readonly global::Sails.Remoting.Abstractions.Core.IRemotingListener remoting;
private const string ROUTE = "Service";

public ServiceListener(global::Sails.Remoting.Abstractions.Core.IRemotingListener remoting) { this.remoting = remoting; }
private static readonly string[] EventRoutes = [ "ThisDone","ThatDone", ];

public async global::System.Collections.Generic.IAsyncEnumerable<EnumServiceEvents> ListenAsync([global::System.Runtime.CompilerServices.EnumeratorCancellation] global::System.Threading.CancellationToken cancellationToken = default) { await foreach (var bytes in this.remoting.ListenAsync(cancellationToken)) { byte idx = 0; foreach (var route in EventRoutes) { if (route.Length > bytes.Length) { continue; } if (route.AsSpan().SequenceEqual(bytes.AsSpan()[..route.Length])) { var bytesLength = bytes.Length - route.Length + 1; var data = new byte[bytesLength]; data[0] = idx; Buffer.BlockCopy(bytes, route.Length, data, 1, bytes.Length - route.Length); var p = 0; EnumServiceEvents ev = new(); ev.Decode(bytes, ref p); yield return ev; } idx++; } } } }
public static async Task<IAsyncEnumerable<(global::Substrate.Gear.Api.Generated.Model.gprimitives.ActorId, EnumServiceEvents)>> ListenAsync(IRemotingListener remoting, CancellationToken cancellationToken = default) {
var eventStream = await remoting.ListenAsync(cancellationToken);
return eventStream.SelectEvent<EnumServiceEvents>(ROUTE, EventRoutes);
} }

/// <summary>
/// ThisThatSvcAppTupleStruct docs
Expand Down
11 changes: 7 additions & 4 deletions net/src/Sails.Remoting.Abstractions/Core/IRemotingListener.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Substrate.Gear.Api.Generated.Model.gprimitives;

namespace Sails.Remoting.Abstractions.Core;

public interface IRemotingListener
public interface IRemotingListener : IAsyncDisposable
{
/// <summary>
/// Listen to Gear events
/// Asynchronously subscribe to Gear events
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
IAsyncEnumerable<byte[]> ListenAsync(CancellationToken cancellationToken);
Task<IAsyncEnumerable<(ActorId Source, byte[] Payload)>> ListenAsync(CancellationToken cancellationToken);
}
6 changes: 6 additions & 0 deletions net/src/Sails.Remoting.Abstractions/Core/IRemotingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@ public interface IRemotingProvider
/// <param name="signingAccount"></param>
/// <returns></returns>
IRemoting CreateRemoting(Account signingAccount);

/// <summary>
/// Creates an instance implementing the <see cref="IRemotingListener"/> interface
/// </summary>
/// <returns></returns>
IRemotingListener CreateRemotingListener();
}
15 changes: 0 additions & 15 deletions net/src/Sails.Remoting.Abstractions/IRemotingListener.cs

This file was deleted.

63 changes: 63 additions & 0 deletions net/src/Sails.Remoting/Core/RemotingListenerViaNodeClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Sails.Remoting.Abstractions.Core;
using StreamJsonRpc;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.Gear.Api.Generated.Model.vara_runtime;
using Substrate.Gear.Client;
using Substrate.Gear.Client.GearApi.Model.gprimitives;
using Substrate.Gear.Client.NetApi.Model.Rpc;
using Substrate.Gear.Client.NetApi.Model.Types.Base;

namespace Sails.Remoting.Core;

internal sealed class RemotingListenerViaNodeClient : IRemotingListener
{
public RemotingListenerViaNodeClient(INodeClientProvider nodeClientProvider)
{
EnsureArg.IsNotNull(nodeClientProvider, nameof(nodeClientProvider));

this.nodeClientProvider = nodeClientProvider;
}

private readonly INodeClientProvider nodeClientProvider;
private Substrate.Gear.Api.Generated.SubstrateClientExt? nodeClient;
private BlocksStream? blocksStream;

public async Task<IAsyncEnumerable<(ActorId Source, byte[] Payload)>> ListenAsync(CancellationToken cancellationToken)
{
this.nodeClient ??= await this.nodeClientProvider.GetNodeClientAsync(cancellationToken).ConfigureAwait(false);
this.blocksStream ??= await this.nodeClient.GetNewBlocksStreamAsync(cancellationToken).ConfigureAwait(false);

return this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectAwait(
async blockHeader =>
await this.nodeClient.ListBlockEventsAsync(blockHeader.GetBlockHash(), cancellationToken)
.ConfigureAwait(false))
.SelectMany(eventRecords => eventRecords.AsAsyncEnumerable())
.Select(eventRecord => eventRecord.Event.ToBaseEnumRust())
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent.ToBaseEnumRust())
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.Where(userMessage => userMessage.Destination.IsEqualTo(ActorIdExtensions.Zero))
.Select(userMessage => (userMessage.Source, userMessage.Payload.Value.Value.Select(@byte => @byte.Value).ToArray()));
}

public async ValueTask DisposeAsync()
{
var bs = Interlocked.Exchange(ref this.blocksStream, null);
if (bs is not null)
{
await bs.DisposeAsync().ConfigureAwait(false);
}
var nc = Interlocked.Exchange(ref this.nodeClient, null);
nc?.Dispose();
}
}
7 changes: 6 additions & 1 deletion net/src/Sails.Remoting/Core/RemotingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ namespace Sails.Remoting.Core;

internal sealed class RemotingProvider : IRemotingProvider
{
public RemotingProvider(Func<Account, IRemoting> remotingFactory)
public RemotingProvider(Func<Account, IRemoting> remotingFactory, Func<IRemotingListener> remotingListenerFactory)
{
EnsureArg.IsNotNull(remotingFactory, nameof(remotingFactory));
EnsureArg.IsNotNull(remotingListenerFactory, nameof(remotingListenerFactory));

this.remotingFactory = remotingFactory;
this.remotingListenerFactory = remotingListenerFactory;
}

private readonly Func<Account, IRemoting> remotingFactory;
private readonly Func<IRemotingListener> remotingListenerFactory;

/// <inheritdoc/>
public IRemoting CreateRemoting(Account signingAccount)
Expand All @@ -23,4 +26,6 @@ public IRemoting CreateRemoting(Account signingAccount)

return this.remotingFactory(signingAccount);
}

public IRemotingListener CreateRemotingListener() => this.remotingListenerFactory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ public static IServiceCollection AddRemotingViaNodeClient(
serviceProvicer => new RemotingProvider(
signingAccount => new RemotingViaNodeClient(
serviceProvicer.GetRequiredService<INodeClientProvider>(),
signingAccount)));
signingAccount),
() => new RemotingListenerViaNodeClient(serviceProvicer.GetRequiredService<INodeClientProvider>())
)
);

return services;
}
Expand Down
69 changes: 69 additions & 0 deletions net/src/Sails.Remoting/EventAsyncIterator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.NetApi.Model.Types;
using Substrate.NetApi.Model.Types.Primitive;

namespace Sails.Remoting;

internal class EventAsyncIterator<T> : IAsyncEnumerable<(ActorId Source, T Event)>
where T : IType, new()
{

private readonly IAsyncEnumerable<(ActorId Source, byte[] Payload)> source;
private readonly byte[] serviceRoute;
private readonly byte[][] eventRoutes;

internal EventAsyncIterator(
IAsyncEnumerable<(ActorId Source, byte[] Payload)> source,
string serviceRoute,
string[] eventRoutes)
{
this.source = source;
this.serviceRoute = new Str(serviceRoute).Encode();
this.eventRoutes = eventRoutes.Select(r => new Str(r).Encode()).ToArray();
}

public IAsyncEnumerator<(ActorId Source, T Event)> GetAsyncEnumerator(CancellationToken cancellationToken = default)
=> this.source
.Select(this.Map)
.Where(x => x != null)
.Select(x => x!.Value)
.GetAsyncEnumerator(cancellationToken);

private (ActorId Source, T Event)? Map((ActorId, byte[]) tuple)
{
var (source, bytes) = tuple;
var serviceLength = this.serviceRoute.Length;
if (bytes.Length < serviceLength || !this.serviceRoute.AsSpan().SequenceEqual(bytes.AsSpan(0, serviceLength)))
{
return null;
}
var offset = serviceLength;
byte idx = 0;
foreach (var route in this.eventRoutes)
{
if (bytes.Length < route.Length + offset)
{
continue;
}
if (route.AsSpan().SequenceEqual(bytes.AsSpan(offset, route.Length)))
{
offset += route.Length;
var bytesLength = bytes.Length - offset + 1;
var data = new byte[bytesLength];
data[0] = idx;
Buffer.BlockCopy(bytes, offset, data, 1, bytesLength - 1);

var p = 0;
T ev = new();
ev.Decode(data, ref p);
return (source, ev);
}
idx++;
}
return null;
}
}
Loading