Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging improvements. #45

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions caching/Squidex.Caching.Tests/ReplicatedCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ namespace Squidex.Caching;
public class ReplicatedCacheTests
{
private readonly IMessageBus pubSub = A.Fake<IMessageBus>();
private readonly ReplicatedCacheOptions options = new ReplicatedCacheOptions { Enable = true };
private readonly ReplicatedCache sut;

public ReplicatedCacheTests()
{
sut = new ReplicatedCache(CreateMemoryCache(), pubSub, Options.Create(options));
sut = new ReplicatedCache(CreateMemoryCache(), pubSub);
}

[Fact]
Expand Down Expand Up @@ -51,22 +50,19 @@ await sut.AddAsync(
AssertCache(sut, "Key2", 1, true);

await sut.RemoveAsync(
new[]
{
[
"Key1",
"Key2"
});
]);

AssertCache(sut, "Key1", null, false);
AssertCache(sut, "Key2", null, false);
}

[Fact]
public async Task Should_not_serve_from_cache_when_disabled()
public async Task Should_not_serve_from_cache_when_expiration_is_not_set()
{
options.Enable = false;

await sut.AddAsync("Key", 1, TimeSpan.FromMilliseconds(100));
await sut.AddAsync("Key", 1, TimeSpan.Zero);

AssertCache(sut, "Key", null, false);
}
Expand All @@ -93,25 +89,12 @@ public async Task Should_not_invalidate_other_instances_when_added()
[Fact]
public async Task Should_send_invalidation_message_when_removed()
{
options.Enable = true;

await sut.RemoveAsync("Key");

A.CallTo(() => pubSub.PublishAsync(A<CacheInvalidateMessage>.That.Matches(x => x.Keys.Contains("Key")), null, A<CancellationToken>._))
.MustHaveHappened();
}

[Fact]
public async Task Should_not_send_invalidation_message_when_not_enabled()
{
options.Enable = false;

await sut.RemoveAsync("Key");

A.CallTo(() => pubSub.PublishAsync(A<object>._, null, A<CancellationToken>._))
.MustNotHaveHappened();
}

[Fact]
public async Task Should_invalidate_keys_when_message_received()
{
Expand Down
13 changes: 4 additions & 9 deletions caching/Squidex.Caching/CachingServiceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,11 @@ public static IServiceCollection AddAsyncLocalCache(this IServiceCollection serv
return services;
}

public static IServiceCollection AddReplicatedCache(this IServiceCollection services, Action<ReplicatedCacheOptions>? configureOptions = null)
public static IServiceCollection AddReplicatedCache(this IServiceCollection services)
{
services.ConfigureOptional(configureOptions!);
services.AddSingleton<ReplicatedCache>();

services.TryAddSingleton<IReplicatedCache>(
c => c.GetRequiredService<ReplicatedCache>());

services.TryAddSingleton<IMessageHandler<CacheInvalidateMessage>>(
c => c.GetRequiredService<ReplicatedCache>());
services.AddMemoryCache();
services.AddSingletonAs<ReplicatedCache>()
.As<IReplicatedCache>().As<IMessageHandler<CacheInvalidateMessage>>();

return services;
}
Expand Down
22 changes: 4 additions & 18 deletions caching/Squidex.Caching/ReplicatedCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// ==========================================================================

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using Squidex.Caching;
using Squidex.Messaging;

namespace Squidex.Caching;
Expand All @@ -15,15 +15,13 @@ public sealed class ReplicatedCache : IReplicatedCache, IMessageHandler<CacheInv
{
private readonly IMemoryCache memoryCache;
private readonly IMessageBus messageBus;
private readonly ReplicatedCacheOptions options;

public Guid InstanceId { get; } = Guid.NewGuid();

public ReplicatedCache(IMemoryCache memoryCache, IMessageBus messageBus, IOptions<ReplicatedCacheOptions> options)
public ReplicatedCache(IMemoryCache memoryCache, IMessageBus messageBus)
{
this.memoryCache = memoryCache;
this.messageBus = messageBus;
this.options = options.Value;
}

public Task HandleAsync(CacheInvalidateMessage message,
Expand All @@ -46,7 +44,7 @@ public Task HandleAsync(CacheInvalidateMessage message,
public Task AddAsync(string key, object? value, TimeSpan expiration,
CancellationToken ct = default)
{
if (!options.Enable)
if (expiration <= TimeSpan.Zero)
{
return Task.CompletedTask;
}
Expand All @@ -59,7 +57,7 @@ public Task AddAsync(string key, object? value, TimeSpan expiration,
public Task AddAsync(IEnumerable<KeyValuePair<string, object?>> items, TimeSpan expiration,
CancellationToken ct = default)
{
if (!options.Enable)
if (expiration <= TimeSpan.Zero)
{
return Task.CompletedTask;
}
Expand Down Expand Up @@ -87,11 +85,6 @@ public Task RemoveAsync(string key1, string key2,
public async Task RemoveAsync(string[] keys,
CancellationToken ct = default)
{
if (!options.Enable)
{
return;
}

foreach (var key in keys)
{
if (key != null)
Expand All @@ -105,13 +98,6 @@ public async Task RemoveAsync(string[] keys,

public bool TryGetValue(string key, out object? value)
{
if (!options.Enable)
{
value = null;

return false;
}

return memoryCache.TryGetValue(key, out value);
}

Expand Down
13 changes: 0 additions & 13 deletions caching/Squidex.Caching/ReplicatedCacheOptions.cs

This file was deleted.

1 change: 1 addition & 0 deletions caching/Squidex.Caching/Squidex.Caching.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Microsoft.Extensions.Configuration;
using Squidex.Messaging;
using Squidex.Messaging.Mongo;
using Squidex.Messaging.Subscriptions;

namespace Microsoft.Extensions.DependencyInjection;

Expand All @@ -24,13 +25,13 @@ public static IServiceCollection AddMongoTransport(this IServiceCollection servi
return services;
}

public static IServiceCollection AddMongoSubscriptions(this IServiceCollection services, IConfiguration config, Action<MongoSubscriptionStoreOptions>? configure = null,
public static IServiceCollection AddMongoMessagingData(this IServiceCollection services, IConfiguration config, Action<MongoMessagingDataOptions>? configure = null,
string configPath = "messaging:mongoDb:subscriptions")
{
services.ConfigureAndValidate(config, configPath, configure);

services.AddSingletonAs<MongoSubscriptionStore>()
.As<IMessagingSubscriptionStore>();
services.AddSingletonAs<MongoMessagingDataStore>()
.As<IMessagingDataStore>();

return services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Squidex.Messaging.Mongo;

public sealed class MongoSubscriptionStoreOptions : IValidatableOptions
public sealed class MongoMessagingDataOptions : IValidatableOptions
{
public string CollectionName { get; set; } = "Subscriptions";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using Squidex.Hosting;
using Squidex.Messaging.Subscriptions;

namespace Squidex.Messaging.Mongo;

public sealed class MongoSubscriptionStore : IMessagingSubscriptionStore, IInitializable
public sealed class MongoMessagingDataStore : IMessagingDataStore, IInitializable
{
private readonly IMongoCollection<Entity> collection;

Expand All @@ -32,7 +33,7 @@ private sealed class Entity
public DateTime Expiration { get; set; }
}

public MongoSubscriptionStore(IMongoDatabase database, IOptions<MongoSubscriptionStoreOptions> options)
public MongoMessagingDataStore(IMongoDatabase database, IOptions<MongoMessagingDataOptions> options)
{
collection = database.GetCollection<Entity>(options.Value.CollectionName);
}
Expand All @@ -57,10 +58,10 @@ public Task InitializeAsync(
}, ct);
}

public async Task<IReadOnlyList<(string Key, SerializedObject Value, DateTime Expiration)>> GetSubscriptionsAsync(string group,
public async Task<IReadOnlyList<Entry>> GetEntriesAsync(string group,
CancellationToken ct)
{
var result = new List<(string Key, SerializedObject Value, DateTime Expiration)>();
var result = new List<Entry>();

var cursor = await collection.Find(x => x.Group == group).ToCursorAsync(ct);

Expand All @@ -70,14 +71,14 @@ public Task InitializeAsync(
{
var value = new SerializedObject(item.ValueData, item.ValueType, item.ValueFormat);

result.Add((item.Key, value, item.Expiration));
result.Add(new Entry(group, item.Key, value, item.Expiration));
}
}

return result;
}

public async Task SubscribeManyAsync(SubscribeRequest[] requests,
public async Task StoreManyAsync(Entry[] requests,
CancellationToken ct)
{
List<WriteModel<Entity>>? updates = null;
Expand Down Expand Up @@ -105,20 +106,14 @@ public async Task SubscribeManyAsync(SubscribeRequest[] requests,
}
}

public Task UnsubscribeAsync(string group, string key,
public Task DeleteAsync(string group, string key,
CancellationToken ct)
{
string id = GetId(group, key);

return collection.DeleteOneAsync(x => x.Id == id, ct);
}

public Task CleanupAsync(DateTime now,
CancellationToken ct)
{
return Task.CompletedTask;
}

private static string GetId(string group, string key)
{
return $"{group}/{key}";
Expand Down
18 changes: 11 additions & 7 deletions messaging/Squidex.Messaging.Mongo/MongoSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ internal sealed class MongoSubscription : IAsyncDisposable, IMessageAck
private readonly string? queueFilter;
private readonly IMongoCollection<MongoMessage> collection;
private readonly MongoTransportOptions options;
private readonly IClock clock;
private readonly ILogger log;
private readonly TimeProvider timeProvider;
private readonly SimpleTimer timer;
private readonly ILogger log;

public MongoSubscription(MessageTransportCallback callback, IMongoCollection<MongoMessage> collection,
public MongoSubscription(
MessageTransportCallback callback,
IMongoCollection<MongoMessage> collection,
string? collectionName,
string? queueFilter,
MongoTransportOptions options, IClock clock, ILogger log)
MongoTransportOptions options,
TimeProvider timeProvider,
ILogger log)
{
this.queueFilter = queueFilter;
this.collectionName = collectionName;
this.collection = collection;
this.options = options;
this.clock = clock;
this.timeProvider = timeProvider;
this.log = log;

timer = new SimpleTimer(async ct =>
Expand Down Expand Up @@ -60,7 +64,7 @@ private Task<bool> PollMessageAsync(MessageTransportCallback callback,
private async Task<bool> PollNormalAsync(MessageTransportCallback callback,
CancellationToken ct)
{
var now = clock.UtcNow;
var now = timeProvider.GetUtcNow().UtcDateTime;

// We can fetch an document in one go with this operation.
var mongoMessage =
Expand All @@ -81,7 +85,7 @@ await collection.FindOneAndUpdateAsync(CreateFilter(now),
private async Task<bool> PollPrefetchAsync(MessageTransportCallback callback,
CancellationToken ct)
{
var now = clock.UtcNow;
var now = timeProvider.GetUtcNow().UtcDateTime;

// There is no way to limit the updates, therefore we have to query candidates first.
var candidates =
Expand Down
Loading
Loading