diff --git a/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs b/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs deleted file mode 100644 index 1be0098..0000000 --- a/src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs +++ /dev/null @@ -1,2 +0,0 @@ -namespace Blumchen.Configuration; -public record DatabaseOptions(string ConnectionString); diff --git a/src/Blumchen.DependencyInjection/Workers/Worker.cs b/src/Blumchen.DependencyInjection/Workers/Worker.cs index 8e147fc..e8fb7d1 100644 --- a/src/Blumchen.DependencyInjection/Workers/Worker.cs +++ b/src/Blumchen.DependencyInjection/Workers/Worker.cs @@ -1,18 +1,18 @@ using System.Collections.Concurrent; using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Npgsql; using Polly; namespace Blumchen.Workers; public abstract class Worker( - DatabaseOptions databaseOptions, + NpgsqlDataSource dataSource, IHandler handler, JsonSerializerContext jsonSerializerContext, IErrorProcessor errorProcessor, @@ -21,9 +21,8 @@ public abstract class Worker( PublicationManagement.PublicationSetupOptions publicationSetupOptions, ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions, Func tableDescriptorBuilder, - ILoggerFactory loggerFactory): BackgroundService where T : class + ILogger logger): BackgroundService where T : class { - private readonly ILogger> _logger = loggerFactory.CreateLogger>(); private string WorkerName { get; } = $"{nameof(Worker)}<{typeof(T).Name}>"; private static readonly ConcurrentDictionary> LoggingActions = new(StringComparer.OrdinalIgnoreCase); private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters) @@ -33,9 +32,9 @@ static Action LoggerAction(LogLevel ll, bool enabled) { (LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters), (LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters), - (_, _) => (_, __, ___) => { } + (_, _) => (_, _, _) => { } }; - LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); + LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -45,7 +44,7 @@ await pipeline.ExecuteAsync(async token => await using var subscription = new Subscription(); await using var cursor = subscription.Subscribe(builder => builder - .ConnectionString(databaseOptions.ConnectionString) + .DataSource(dataSource) .WithTable(tableDescriptorBuilder) .WithErrorProcessor(errorProcessor) .Handles>(handler) @@ -53,13 +52,13 @@ await pipeline.ExecuteAsync(async token => .JsonContext(jsonSerializerContext) .WithPublicationOptions(publicationSetupOptions) .WithReplicationOptions(replicationSlotSetupOptions) - , ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token); - Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName); + , ct: token).GetAsyncEnumerator(token); + Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) - Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); + Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current); }, stoppingToken).ConfigureAwait(false); - Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); + Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); return; } diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 71e49d3..cdddbf4 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -24,8 +24,8 @@ internal sealed class JsonTypeResolver( internal void WhiteList(Type type) { var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); - _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); - _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); + _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type); + _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo); } public (string, JsonTypeInfo) Resolve(Type type) => diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 79894e1..8b59e25 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -1,5 +1,6 @@ using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using static Blumchen.Subscriptions.Management.PublicationManagement; using static Blumchen.Subscriptions.Management.ReplicationSlotManagement; @@ -7,14 +8,14 @@ namespace Blumchen.Subscriptions; internal interface ISubscriptionOptions { - [UsedImplicitly] string ConnectionString { get; } + [UsedImplicitly] NpgsqlDataSource DataSource { get; } IReplicationDataMapper DataMapper { get; } [UsedImplicitly] PublicationSetupOptions PublicationOptions { get; } [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } void Deconstruct( - out string connectionString, + out NpgsqlDataSource dataSource, out PublicationSetupOptions publicationSetupOptions, out ReplicationSlotSetupOptions replicationSlotSetupOptions, out IErrorProcessor errorProcessor, @@ -23,7 +24,7 @@ void Deconstruct( } internal record SubscriptionOptions( - string ConnectionString, + NpgsqlDataSource DataSource, PublicationSetupOptions PublicationOptions, ReplicationSlotSetupOptions ReplicationOptions, IErrorProcessor ErrorProcessor, diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 57dd724..c5e81a2 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -30,22 +30,17 @@ public enum CreateStyle private ISubscriptionOptions? _options; public async IAsyncEnumerable Subscribe( Func builder, - ILoggerFactory? loggerFactory = null, [EnumeratorCancellation] CancellationToken ct = default ) { _options = builder(_builder).Build(); - var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; - var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); - dataSourceBuilder.UseLoggerFactory(loggerFactory); - - var dataSource = dataSourceBuilder.Build(); + var (dataSource, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; + await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); - _connection = new LogicalReplicationConnection(connectionString); + _connection = new LogicalReplicationConnection(dataSource.ConnectionString); await _connection.Open(ct).ConfigureAwait(false); - await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false); diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 9b7e7c5..b7948f6 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -2,13 +2,14 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using Npgsql; using System.Text.Json.Serialization; namespace Blumchen.Subscriptions; public sealed class SubscriptionOptionsBuilder { - private static string? _connectionString; + private static NpgsqlDataSource? _dataSource; private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; private static IReplicationDataMapper? _dataMapper; @@ -22,7 +23,6 @@ public sealed class SubscriptionOptionsBuilder static SubscriptionOptionsBuilder() { - _connectionString = null; _publicationSetupOptions = new(); _replicationSlotSetupOptions = default; _dataMapper = default; @@ -38,9 +38,9 @@ public SubscriptionOptionsBuilder WithTable( } [UsedImplicitly] - public SubscriptionOptionsBuilder ConnectionString(string connectionString) + public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource) { - _connectionString = connectionString; + _dataSource = dataSource; return this; } @@ -91,7 +91,7 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce internal ISubscriptionOptions Build() { _messageTable ??= TableDescriptorBuilder.Build(); - ArgumentNullException.ThrowIfNull(_connectionString); + ArgumentNullException.ThrowIfNull(_dataSource); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); @@ -104,7 +104,7 @@ internal ISubscriptionOptions Build() if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); return new SubscriptionOptions( - _connectionString, + _dataSource, _publicationSetupOptions, _replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), _errorProcessor ?? new ConsoleOutErrorProcessor(), diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index ca08c10..8fd331f 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -2,6 +2,7 @@ using Blumchen.Subscriptions; using Commons; using Microsoft.Extensions.Logging; +using Npgsql; using Subscriber; #pragma warning disable CS8601 // Possible null reference assignment. @@ -20,9 +21,11 @@ try { + var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole())); var cursor = subscription.Subscribe( builder => builder - .ConnectionString(Settings.ConnectionString) + .DataSource(dataSourceBuilder.Build()) .WithTable(options => options .Id("id") .MessageType("message_type") @@ -31,7 +34,7 @@ .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) .Handles(consumer) - .Handles(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + .Handles(consumer), ct:ct ).GetAsyncEnumerator(ct); await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 6f224e4..46e5512 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -1,5 +1,4 @@ using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Workers; @@ -10,6 +9,7 @@ using Polly.Retry; using Polly; using SubscriberWorker; +using Npgsql; #pragma warning disable CS8601 // Possible null reference assignment. @@ -29,12 +29,13 @@ .AddSingleton, Handler>() .AddBlumchen, UserDeletedContract>() .AddSingleton, Handler>() - + .AddTransient(sp => + new NpgsqlDataSourceBuilder(Settings.ConnectionString) + .UseLoggerFactory(sp.GetRequiredService()).Build()) .AddSingleton() .AddSingleton() .AddSingleton() - .AddSingleton(new DatabaseOptions(Settings.ConnectionString)) - .AddResiliencePipeline("default",(pipelineBuilder,context) => + .AddResiliencePipeline("default",(pipelineBuilder,_) => pipelineBuilder .AddRetry(new RetryStrategyOptions { diff --git a/src/SubscriberWorker/SubscriberWorker.cs b/src/SubscriberWorker/SubscriberWorker.cs index d6c4a67..5298c3e 100644 --- a/src/SubscriberWorker/SubscriberWorker.cs +++ b/src/SubscriberWorker/SubscriberWorker.cs @@ -1,23 +1,23 @@ using System.Text.Json.Serialization; -using Blumchen.Configuration; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.Management; using Blumchen.Workers; using Microsoft.Extensions.Logging; +using Npgsql; using Polly.Registry; // ReSharper disable ClassNeverInstantiated.Global namespace SubscriberWorker; public class SubscriberWorker( - DatabaseOptions databaseOptions, + NpgsqlDataSource dataSource, IHandler handler, JsonSerializerContext jsonSerializerContext, ResiliencePipelineProvider pipelineProvider, INamingPolicy namingPolicy, IErrorProcessor errorProcessor, - ILoggerFactory loggerFactory -): Worker(databaseOptions + ILogger logger +): Worker(dataSource , handler , jsonSerializerContext , errorProcessor @@ -26,4 +26,4 @@ ILoggerFactory loggerFactory , new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub") , new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot") , tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults() - , loggerFactory) where T : class; + , logger) where T : class; diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 9bc2e9a..f62c79a 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -81,7 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri var consumer = new TestHandler(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() .WithErrorProcessor(new TestOutErrorProcessor(Output)) - .ConnectionString(connectionString) + .DataSource(new NpgsqlDataSourceBuilder(connectionString).Build()) .JsonContext(info) .NamingPolicy(namingPolicy) .Handles>(consumer) diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 4575b69..71b78c9 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -46,7 +46,7 @@ await MessageAppender.AppendAsync( var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 71e78e4..3a52e23 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -38,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index 6422396..d1029fb 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false)) { Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return;