Skip to content

Commit

Permalink
fix cancellationToken
Browse files Browse the repository at this point in the history
  • Loading branch information
vobradovich committed Nov 28, 2024
1 parent c2fbae2 commit f07a47e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
3 changes: 2 additions & 1 deletion net/src/Sails.Remoting/Core/RemotingReplyViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public override async Task<T> ReadAsync(CancellationToken cancellationToken)
{
Ensure.Any.IsNotNull(this.blocksStream, nameof(this.blocksStream));

this.replyMessage = await this.blocksStream.ReadAllEventsAsync(cancellationToken)
this.replyMessage = await this.blocksStream
.ReadAllEventsAsync(cancellationToken)
.SelectGearEvents()
.SelectIfMatches(
GearEvent.UserMessageSent,
Expand Down
8 changes: 5 additions & 3 deletions net/src/Substrate.Gear.Client/BlocksStreamBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EnsureThat;
Expand All @@ -17,8 +18,9 @@ internal static BlocksStreamBuilder FromNode(SubstrateClientExt nodeClient)
}

internal async Task<BlocksStream> CreateAsync(
Func<SubstrateClientExt, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClientExt, string, Task> unsubscribe)
Func<SubstrateClientExt, Action<string, Header>, CancellationToken, Task<string>> subscribe,
Func<SubstrateClientExt, string, Task> unsubscribe,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<Header>(
new UnboundedChannelOptions
Expand All @@ -27,7 +29,7 @@ internal async Task<BlocksStream> CreateAsync(
});

void Callback(string _, Header blockHeader) => channel.Writer.TryWrite(blockHeader);
var subscriptionId = await subscribe(nodeClient, Callback).ConfigureAwait(false);
var subscriptionId = await subscribe(nodeClient, Callback, cancellationToken).ConfigureAwait(false);

return new BlocksStream(nodeClient, subscriptionId, channel, unsubscribe);
}
Expand Down
21 changes: 12 additions & 9 deletions net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,11 @@ public static Task<BlocksStream> GetAllBlocksStreamAsync(
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));

return BlocksStreamBuilder.FromNode(nodeClient).CreateAsync(
(nodeClient, callback) =>
static (nodeClient, callback, cancellationToken) =>
nodeClient.Chain.SubscribeAllHeadsAsync(callback, cancellationToken),
(nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeAllHeadsAsync(subscriptionId, CancellationToken.None));
static (nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeAllHeadsAsync(subscriptionId, CancellationToken.None),
cancellationToken);
}

/// <summary>
Expand All @@ -274,10 +275,11 @@ public static Task<BlocksStream> GetNewBlocksStreamAsync(
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));

return BlocksStreamBuilder.FromNode(nodeClient).CreateAsync(
(nodeClient, callback) =>
static (nodeClient, callback, cancellationToken) =>
nodeClient.Chain.SubscribeNewHeadsAsync(callback, cancellationToken),
(nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeNewHeadsAsync(subscriptionId, CancellationToken.None));
static (nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeNewHeadsAsync(subscriptionId, CancellationToken.None),
cancellationToken);
}

/// <summary>
Expand All @@ -296,10 +298,11 @@ public static Task<BlocksStream> GetFinalizedBlocksStreamAsync(
// notification, i.e., if you observe block X and then X + 2, it means that block X + 1 was finalized too.
// Probably it should be accounted here and missed blocks should be fetched from the chain.
return BlocksStreamBuilder.FromNode(nodeClient).CreateAsync(
(nodeClient, callback) =>
static (nodeClient, callback, cancellationToken) =>
nodeClient.Chain.SubscribeFinalizedHeadsAsync(callback, cancellationToken),
(nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeFinalizedHeadsAsync(subscriptionId, CancellationToken.None));
static (nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeFinalizedHeadsAsync(subscriptionId, CancellationToken.None),
cancellationToken);
}

/// <summary>
Expand Down

0 comments on commit f07a47e

Please sign in to comment.