Skip to content

Commit

Permalink
Error in Identity Deletion Jobs when using Google Cloud Pub/Sub (#838)
Browse files Browse the repository at this point in the history
* fix: catch exceptions thrown during disposal of DefaultGoogleCloudPubSubPersisterConnection

* test: update test

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tnotheis and mergify[bot] authored Sep 3, 2024
1 parent 4f237f3 commit 9c6d8ab
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
using Google.Api.Gax;
using Google.Apis.Auth.OAuth2;
using Google.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;

namespace Backbone.BuildingBlocks.Infrastructure.EventBus.GoogleCloudPubSub;

public class DefaultGoogleCloudPubSubPersisterConnection : IGoogleCloudPubSubPersisterConnection
{
private readonly ILogger<DefaultGoogleCloudPubSubPersisterConnection> _logger;
private bool _disposed;

public DefaultGoogleCloudPubSubPersisterConnection(string projectId, string topicId,
public DefaultGoogleCloudPubSubPersisterConnection(ILogger<DefaultGoogleCloudPubSubPersisterConnection> logger, string projectId, string topicId,
string subscriptionName, string connectionInfo)
{
_logger = logger;
var topicName = TopicName.FromProjectTopic(projectId, topicId);
var gcpCredentials = connectionInfo.IsEmpty() ? GoogleCredential.GetApplicationDefault() : GoogleCredential.FromJson(connectionInfo);

Expand Down Expand Up @@ -39,8 +42,26 @@ public void Dispose()
if (_disposed) return;

_disposed = true;
PublisherClient.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();

SubscriberClient.StopAsync(CancellationToken.None).GetAwaiter().GetResult();
try
{
PublisherClient.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while shutting down the publisher client.");
}

try
{
SubscriberClient.StopAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
if (ex.Message != "Can only stop a started instance.")
throw;

_logger.LogError(ex, "An error occurred while stopping the subscriber client.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public static void AddGoogleCloudPubSub(this IServiceCollection services, Action

services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();

services.AddSingleton<IGoogleCloudPubSubPersisterConnection>(
new DefaultGoogleCloudPubSubPersisterConnection(options.ProjectId, options.TopicName, options.SubscriptionClientName, options.ConnectionInfo));
services.AddSingleton<IGoogleCloudPubSubPersisterConnection>(sp =>
new DefaultGoogleCloudPubSubPersisterConnection(sp.GetRequiredService<ILogger<DefaultGoogleCloudPubSubPersisterConnection>>(), options.ProjectId, options.TopicName,
options.SubscriptionClientName, options.ConnectionInfo));

services.AddSingleton<IEventBus, EventBusGoogleCloudPubSub>(sp =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
using Backbone.Tooling.Extensions;
using Backbone.UnitTestTools.BaseClasses;
using Divergic.Logging.Xunit;
using FakeItEasy;
using FluentAssertions;
using Google.Api.Gax;
using Google.Apis.Auth.OAuth2;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -27,17 +29,6 @@ public GoogleCloudPubSubTests(ITestOutputHelper output)
_factory = new EventBusFactory(output);
}

public override void Dispose()
{
_factory.Dispose();

TestEvent1DomainEventHandler1.Instances.Clear();
TestEvent1DomainEventHandler2.Instances.Clear();
TestEvent2DomainEventHandler.Instances.Clear();

base.Dispose();
}

[Fact(Skip = "No valid emulator for GCP")]
public async Task One_subscriber_for_one_event()
{
Expand Down Expand Up @@ -131,109 +122,120 @@ public async Task The_correct_event_handler_is_called_when_multiple_subscription
TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance();
TestEvent1DomainEventHandler2.ShouldNotHaveAnyTriggeredInstance();
}
}

public class EventBusFactory : IDisposable
{
public record Instance(
AutofacServiceProvider AutofacServiceProviders,
EventBusGoogleCloudPubSub EventBusClient,
DefaultGoogleCloudPubSubPersisterConnection PersisterConnection);
public override void Dispose()
{
_factory.Dispose();

TestEvent1DomainEventHandler1.Instances.Clear();
TestEvent1DomainEventHandler2.Instances.Clear();
TestEvent2DomainEventHandler.Instances.Clear();

public const string PROJECT_ID = "nbp-nmshd-bkb";
public const string TOPIC_NAME = "test-topic";
public const string SUBSCRIPTION_NAME_PREFIX = "subscription1";
base.Dispose();
}

private readonly ICacheLogger<EventBusGoogleCloudPubSub> _logger;
private class EventBusFactory : IDisposable
{
private record Instance(
AutofacServiceProvider AutofacServiceProviders,
EventBusGoogleCloudPubSub EventBusClient,
DefaultGoogleCloudPubSubPersisterConnection PersisterConnection);

public const string CONNECTION_INFO = "";
private const string PROJECT_ID = "nbp-nmshd-bkb";
private const string TOPIC_NAME = "test-topic";
private const string SUBSCRIPTION_NAME_PREFIX = "subscription1";

private readonly List<Instance> _instances = [];
private readonly ICacheLogger<EventBusGoogleCloudPubSub> _logger;

public EventBusFactory(ITestOutputHelper output)
{
_logger = output.BuildLoggerFor<EventBusGoogleCloudPubSub>();
}
public const string CONNECTION_INFO = "";

public EventBusGoogleCloudPubSub CreateEventBus(string subscriptionNamePrefix = SUBSCRIPTION_NAME_PREFIX)
{
var builder = new ContainerBuilder();
builder.RegisterType<TestEvent1DomainEventHandler1>();
builder.RegisterType<TestEvent1DomainEventHandler2>();

var autofacServiceProvider = new AutofacServiceProvider(builder.Build());
var lifeTimeScope = autofacServiceProvider.GetRequiredService<ILifetimeScope>();
var eventBusSubscriptionsManager = new InMemoryEventBusSubscriptionsManager();
var persisterConnection = new DefaultGoogleCloudPubSubPersisterConnection(PROJECT_ID, TOPIC_NAME,
subscriptionNamePrefix, CONNECTION_INFO);
var eventBusClient = new EventBusGoogleCloudPubSub(
persisterConnection,
_logger,
eventBusSubscriptionsManager,
lifeTimeScope,
new HandlerRetryBehavior { NumberOfRetries = 5, MinimumBackoff = 2, MaximumBackoff = 120 });

var instance = new Instance(autofacServiceProvider, eventBusClient, persisterConnection);
_instances.Add(instance);

return eventBusClient;
}
private readonly List<Instance> _instances = [];

public void Dispose()
{
_logger.Dispose();
public EventBusFactory(ITestOutputHelper output)
{
_logger = output.BuildLoggerFor<EventBusGoogleCloudPubSub>();
}

foreach (var instance in _instances)
public EventBusGoogleCloudPubSub CreateEventBus(string subscriptionNamePrefix = SUBSCRIPTION_NAME_PREFIX)
{
instance.AutofacServiceProviders.Dispose();
instance.EventBusClient.Dispose();
instance.PersisterConnection.Dispose();
var builder = new ContainerBuilder();
builder.RegisterType<TestEvent1DomainEventHandler1>();
builder.RegisterType<TestEvent1DomainEventHandler2>();

var autofacServiceProvider = new AutofacServiceProvider(builder.Build());
var lifeTimeScope = autofacServiceProvider.GetRequiredService<ILifetimeScope>();
var eventBusSubscriptionsManager = new InMemoryEventBusSubscriptionsManager();
var persisterConnection = new DefaultGoogleCloudPubSubPersisterConnection(A.Dummy<ILogger<DefaultGoogleCloudPubSubPersisterConnection>>(), PROJECT_ID, TOPIC_NAME,
subscriptionNamePrefix, CONNECTION_INFO);
var eventBusClient = new EventBusGoogleCloudPubSub(
persisterConnection,
_logger,
eventBusSubscriptionsManager,
lifeTimeScope,
new HandlerRetryBehavior { NumberOfRetries = 5, MinimumBackoff = 2, MaximumBackoff = 120 });

var instance = new Instance(autofacServiceProvider, eventBusClient, persisterConnection);
_instances.Add(instance);

return eventBusClient;
}

CleanupTestSubscriptions();
}
public void Dispose()
{
_logger.Dispose();

public void CleanupTestSubscriptions()
{
var gcpCredentials = CONNECTION_INFO.IsEmpty() ? null : GoogleCredential.FromJson(CONNECTION_INFO);
foreach (var instance in _instances)
{
instance.AutofacServiceProviders.Dispose();
instance.EventBusClient.Dispose();
instance.PersisterConnection.Dispose();
}

var subscriberServiceApiClient = new SubscriberServiceApiClientBuilder
{
GoogleCredential = gcpCredentials,
EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription1-TestEvent1")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription1-TestEvent2")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription2-TestEvent1")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription2-TestEvent2")
);
}
CleanupTestSubscriptions();
}

public static void CleanupSubscription(SubscriberServiceApiClient subscriberServiceApiClient, SubscriptionName subscriptionName)
{
try
private void CleanupTestSubscriptions()
{
subscriberServiceApiClient.GetSubscription(subscriptionName);
subscriberServiceApiClient.DeleteSubscription(subscriptionName);
var gcpCredentials = CONNECTION_INFO.IsEmpty() ? null : GoogleCredential.FromJson(CONNECTION_INFO);

var subscriberServiceApiClient = new SubscriberServiceApiClientBuilder
{
GoogleCredential = gcpCredentials,
EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription1-TestEvent1")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription1-TestEvent2")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription2-TestEvent1")
);

CleanupSubscription(
subscriberServiceApiClient,
SubscriptionName.FromProjectSubscription(PROJECT_ID, "subscription2-TestEvent2")
);
}
catch (RpcException ex)

private static void CleanupSubscription(SubscriberServiceApiClient subscriberServiceApiClient, SubscriptionName subscriptionName)
{
if (ex.Status.StatusCode != StatusCode.NotFound) throw;
try
{
subscriberServiceApiClient.GetSubscription(subscriptionName);
subscriberServiceApiClient.DeleteSubscription(subscriptionName);
}
catch (RpcException ex)
{
if (ex.Status.StatusCode != StatusCode.NotFound) throw;
}
}
}
}

0 comments on commit 9c6d8ab

Please sign in to comment.