Skip to content

Commit

Permalink
feat: outbox strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ronimizy committed Nov 30, 2024
1 parent 882eefe commit 8ff0bfc
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 71 deletions.
2 changes: 1 addition & 1 deletion scripts/changes-scripts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function find_changed_projects {
echo "$project" | find_project_references_by_short_name | short_name_to_dir_path | while IFS= read -r dependency
do
>&2 echo "\t"'checking changes for project dependency = '"$dependency"
changed=$(echo "$changes" | grep -Ec "$dependency")
changed=$(echo "$changes" | grep -Ec "$dependency"$)

if [[ ! "$changed" -eq 0 ]]
then
Expand Down
4 changes: 2 additions & 2 deletions src/Itmo.Dev.Platform.Kafka/Itmo.Dev.Platform.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

<PropertyGroup>
<PackageReleaseNotes>
Added ssl/sasl configuration
Added outbox strategy
</PackageReleaseNotes>
</PropertyGroup>

<PropertyGroup Label="PlatformVersion">
<MajorVersion>2</MajorVersion>
<MinorVersion>2</MinorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>234</PatchVersion>
<PackageVersion>$(MajorVersion).$(MinorVersion).$(PatchVersion)</PackageVersion>
</PropertyGroup>
Expand Down
26 changes: 18 additions & 8 deletions src/Itmo.Dev.Platform.Kafka/Producer/Builders/ProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,34 @@ public IOutboxProducerBuilder SerializeValueByDefault()

public IProducerBuilder WithOutbox()
{
if (_configuration.GetSection("Outbox").Exists() is false)
IConfigurationSection outboxSection = _configuration.GetSection("Outbox");

if (outboxSection.Exists() is false)
{
string message = $"Outbox for topic {_topicName} is configured, but Outbox sub-section is not specified";
throw new InvalidOperationException(message);
}

var messageName = $"_platform_kafka_outbox_{_topicName}";
var outboxStrategy = outboxSection.GetValue("Strategy", OutboxStrategy.Always);

_collection.AddScoped<IKafkaMessageProducer<TKey, TValue>>(
p => ActivatorUtilities.CreateInstance<OutboxMessageProducer<TKey, TValue>>(p, messageName));
if (outboxStrategy is OutboxStrategy.Fallback)
{
_collection.AddScoped<IKafkaMessageProducer<TKey, TValue>>(provider => ActivatorUtilities
.CreateInstance<FallbackOutboxMessageProducer<TKey, TValue>>(provider, _topicName));
}
else
{
_collection.AddScoped<IKafkaMessageProducer<TKey, TValue>>(provider => ActivatorUtilities
.CreateInstance<AlwaysOutboxMessageProducer<TKey, TValue>>(provider, _topicName));
}

_collection.AddPlatformMessagePersistenceHandler(builder => builder
.Called(messageName)
.WithConfiguration(_configuration.GetSection("Outbox"))
.Called(KafkaOutboxMessageName.ForTopic(_topicName))
.WithConfiguration(outboxSection)
.WithKey<TKey>()
.WithValue<TValue>()
.HandleBy<OutboxMessagePersistenceHandler<TKey, TValue>>(
(p, _) => new OutboxMessagePersistenceHandler<TKey, TValue>(_topicName, p)));
(provider, _) => new OutboxMessagePersistenceHandler<TKey, TValue>(_topicName, provider)));

return this;
}
Expand All @@ -147,7 +157,7 @@ public void Build()
_collection.AddKeyedScoped<IKafkaMessageProducer<TKey, TValue>>(
_topicName,
(p, _) => ActivatorUtilities.CreateInstance<KafkaMessageProducer<TKey, TValue>>(p, _topicName));

_collection.TryAddScoped<IKafkaMessageProducer<TKey, TValue>>(
p => ActivatorUtilities.CreateInstance<KafkaMessageProducer<TKey, TValue>>(p, _topicName));
}
Expand Down
11 changes: 11 additions & 0 deletions src/Itmo.Dev.Platform.Kafka/Producer/KafkaOutboxMessageName.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Collections.Concurrent;

namespace Itmo.Dev.Platform.Kafka.Producer;

internal static class KafkaOutboxMessageName
{
private static readonly ConcurrentDictionary<string, string> Values = [];

public static string ForTopic(string topicName)
=> Values.GetOrAdd(topicName, static topicName => $"_platform_kafka_outbox_{topicName}");
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace Itmo.Dev.Platform.Kafka.Producer.Outbox;

internal class OutboxMessageProducer<TKey, TValue> : IKafkaMessageProducer<TKey, TValue>
internal class AlwaysOutboxMessageProducer<TKey, TValue> : IKafkaMessageProducer<TKey, TValue>
{
private readonly string _messageName;
private readonly string _topicName;
private readonly IMessagePersistenceConsumer _consumer;

public OutboxMessageProducer(string messageName, IMessagePersistenceConsumer consumer)
public AlwaysOutboxMessageProducer(string topicName, IMessagePersistenceConsumer consumer)
{
_messageName = messageName;
_topicName = topicName;
_consumer = consumer;
}

Expand All @@ -21,6 +21,6 @@ public async Task ProduceAsync(
.Select(x => new PersistedMessage<TKey, TValue>(x.Key, x.Value))
.ToArrayAsync(cancellationToken);

await _consumer.ConsumeAsync(_messageName, persistedMessages, cancellationToken);
await _consumer.ConsumeAsync(KafkaOutboxMessageName.ForTopic(_topicName), persistedMessages, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Itmo.Dev.Platform.MessagePersistence;
using Microsoft.Extensions.DependencyInjection;

namespace Itmo.Dev.Platform.Kafka.Producer.Outbox;

internal class FallbackOutboxMessageProducer<TKey, TValue> : IKafkaMessageProducer<TKey, TValue>
{
private readonly string _topicName;
private readonly IMessagePersistenceConsumer _consumer;
private readonly IKafkaMessageProducer<TKey, TValue> _producer;

public FallbackOutboxMessageProducer(string topicName, IServiceProvider serviceProvider)
{
_topicName = topicName;
_consumer = serviceProvider.GetRequiredService<IMessagePersistenceConsumer>();
_producer = serviceProvider.GetRequiredKeyedService<IKafkaMessageProducer<TKey, TValue>>(topicName);
}

public async Task ProduceAsync(
IAsyncEnumerable<KafkaProducerMessage<TKey, TValue>> messages,
CancellationToken cancellationToken)
{
var messagesArray = await messages.ToArrayAsync(cancellationToken);

try
{
await _producer.ProduceAsync(messagesArray.ToAsyncEnumerable(), cancellationToken);
}
catch
{
var persistedMessages = messagesArray
.Select(x => new PersistedMessage<TKey, TValue>(x.Key, x.Value))
.ToArray();

await _consumer.ConsumeAsync(
KafkaOutboxMessageName.ForTopic(_topicName),
persistedMessages,
cancellationToken);
}
}
}
7 changes: 7 additions & 0 deletions src/Itmo.Dev.Platform.Kafka/Producer/OutboxStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Itmo.Dev.Platform.Kafka.Producer;

public enum OutboxStrategy
{
Always,
Fallback,
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
using FluentAssertions;
using Itmo.Dev.Platform.Kafka.Extensions;
using Itmo.Dev.Platform.Kafka.Producer;
using Itmo.Dev.Platform.Kafka.Producer.Outbox;
using Itmo.Dev.Platform.Kafka.Tests.Extensions;
using Itmo.Dev.Platform.Kafka.Tests.Fixtures;
using Itmo.Dev.Platform.Kafka.Tests.Outbox.Models;
using Itmo.Dev.Platform.Kafka.Tools;
using Itmo.Dev.Platform.MessagePersistence;
using Itmo.Dev.Platform.MessagePersistence.Configuration;
Expand All @@ -14,13 +16,12 @@
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Serilog;
using System.Data;
using Xunit;
using Xunit.Abstractions;

namespace Itmo.Dev.Platform.Kafka.Tests;
namespace Itmo.Dev.Platform.Kafka.Tests.Outbox;

#pragma warning disable CA1506

Expand All @@ -44,21 +45,23 @@ public KafkaOutboxTests(KafkaFixture kafkaFixture, ITestOutputHelper output, Kaf

[Theory]
[MemberData(nameof(GetMessages))]
public async Task ProduceAsync_ShouldWriteMessage(int bufferSize, KafkaProducerMessage<int, string>[] messages)
public async Task ProduceAsync_ShouldWriteMessage(KafkaOutboxTestData testData, KafkaOutboxConfigData configData)
{
// Arrange
await using var fixtureScope = _databaseFixture.Scope;

void ConfigureAppConfiguration(IConfigurationBuilder configuration)
{
configuration.AddInMemoryCollection(
new Dictionary<string, string?>
{
["MessagePersistence:SchemaName"] = "message_persistence",
[$"Producer:{nameof(KafkaProducerOptions.Topic)}"] = TopicName,
[$"Producer:Outbox:{nameof(MessagePersistenceHandlerOptions.BatchSize)}"] = bufferSize.ToString(),
[$"Producer:Outbox:{nameof(MessagePersistenceHandlerOptions.PollingDelay)}"] = "00:00:00.500",
});
var dictionary = new Dictionary<string, string?>
{
["MessagePersistence:SchemaName"] = "message_persistence",
[$"Producer:{nameof(KafkaProducerOptions.Topic)}"] = TopicName,
[$"Producer:Outbox:{nameof(MessagePersistenceHandlerOptions.BatchSize)}"] = testData.BufferSizeString,
[$"Producer:Outbox:{nameof(MessagePersistenceHandlerOptions.PollingDelay)}"] = "00:00:00.500",
};

configData.ApplyConfig(dictionary);
configuration.AddInMemoryCollection(dictionary);
}

void ConfigureServices(IServiceCollection collection, IConfiguration configuration)
Expand Down Expand Up @@ -113,15 +116,17 @@ void ConfigureServices(IServiceCollection collection, IConfiguration configurati

consumer.Subscribe(TopicName);

await producer.ProduceAsync(messages.ToAsyncEnumerable(), default);
await producer.ProduceAsync(testData.Messages.ToAsyncEnumerable(), default);

// Act

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(5 * messages.Length));
cts.CancelAfter(TimeSpan.FromSeconds(5 * testData.Messages.Length));

// Assert
var consumedMessages = messages
producer.Should().BeOfType(configData.ExpectedProducerType);

var consumedMessages = testData.Messages
.Select(
_ =>
{
Expand All @@ -134,7 +139,7 @@ void ConfigureServices(IServiceCollection collection, IConfiguration configurati
.Select(x => x.Message)
.ToArray();

consumedMessages.Zip(messages)
consumedMessages.Zip(testData.Messages)
.Should()
.AllSatisfy(
tuple => tuple.First
Expand All @@ -149,66 +154,46 @@ void ConfigureServices(IServiceCollection collection, IConfiguration configurati

var query = SerializedMessageQuery.Build(builder => builder
.WithPageSize(int.MaxValue)
.WithName($"_platform_kafka_outbox_{TopicName}")
.WithName(KafkaOutboxMessageName.ForTopic(TopicName))
.WithCursor(DateTimeOffset.MinValue)
.WithState(MessageState.Completed));

var outboxMessages = await outboxRepository
.QueryAsync(query, default)
.ToArrayAsync(default);

outboxMessages.Should().HaveCount(messages.Length);
if (configData.ShouldWriteOutboxMessages)
{
outboxMessages.Should().HaveCount(testData.Messages.Length);
}
else
{
outboxMessages.Should().BeEmpty();
}

// Dispose
consumer.Close();
}

public static IEnumerable<object[]> GetMessages()
{
yield return
KafkaOutboxConfigData[] configs =
[
1,
new[]
{
new KafkaProducerMessage<int, string>(1, "aboba"),
},
new(null, typeof(AlwaysOutboxMessageProducer<int, string>), true),
new(OutboxStrategy.Always, typeof(AlwaysOutboxMessageProducer<int, string>), true),
new(OutboxStrategy.Fallback, typeof(FallbackOutboxMessageProducer<int, string>), false),
];

yield return
KafkaOutboxTestData[] data =
[
10,
new[]
{
new KafkaProducerMessage<int, string>(1, "aboba"),
},
KafkaOutboxTestData.Single(1),
KafkaOutboxTestData.Single(10),
KafkaOutboxTestData.Many(1, 10),
KafkaOutboxTestData.Many(5, 10),
KafkaOutboxTestData.Many(10, 10),
];

yield return
[
1,
Enumerable
.Range(0, 10)
.Select(i => new KafkaProducerMessage<int, string>(i, i.ToString()))
.ToArray(),
];

yield return
[
5,
Enumerable
.Range(0, 10)
.Select(i => new KafkaProducerMessage<int, string>(i, i.ToString()))
.ToArray(),
];

yield return
[
5,
Enumerable
.Range(0, 10)
.Select(i => new KafkaProducerMessage<int, string>(i, i.ToString()))
.ToArray(),
];
return data.SelectMany(_ => configs, static (data, config) => new object[] { data, config });
}

public async Task InitializeAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Itmo.Dev.Platform.Kafka.Producer;

namespace Itmo.Dev.Platform.Kafka.Tests.Outbox.Models;

public record KafkaOutboxConfigData(
OutboxStrategy? OutboxStrategy,
Type ExpectedProducerType,
bool ShouldWriteOutboxMessages)
{
public void ApplyConfig(Dictionary<string, string?> dictionary)
{
if (OutboxStrategy is null)
return;

dictionary["Producer:Outbox:Strategy"] = OutboxStrategy.ToString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Itmo.Dev.Platform.Kafka.Producer;

namespace Itmo.Dev.Platform.Kafka.Tests.Outbox.Models;

public record KafkaOutboxTestData(int BufferSize, KafkaProducerMessage<int, string>[] Messages)
{
public string BufferSizeString => BufferSize.ToString();

public static KafkaOutboxTestData Single(int bufferSize)
=> new(bufferSize, [new KafkaProducerMessage<int, string>(1, "aboba")]);

public static KafkaOutboxTestData Many(int bufferSize, int messageCount)
{
var messages = Enumerable
.Range(0, messageCount)
.Select(i => new KafkaProducerMessage<int, string>(i, i.ToString()))
.ToArray();

return new KafkaOutboxTestData(bufferSize, messages);
}
}

0 comments on commit 8ff0bfc

Please sign in to comment.