Skip to content

Commit

Permalink
test: remove eventgrid from self-contained eventhubs integration tests (
Browse files Browse the repository at this point in the history
#451)

* test: remove eventgrid from self-contained eventhubs integration tests

* pr-fix: remove using references

* pr-fix: remove unnecessary message filter

* pr-fix: add additional message body filter
  • Loading branch information
stijnmoreels authored Sep 10, 2024
1 parent 9196498 commit cd6d613
Show file tree
Hide file tree
Showing 16 changed files with 693 additions and 601 deletions.
11 changes: 11 additions & 0 deletions src/Arcus.Messaging.Tests.Core/Events/v1/SensorReadEventData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Arcus.Messaging.Abstractions;

namespace Arcus.Messaging.Tests.Core.Events.v1
{
public class SensorReadEventData
{
public string SensorId { get; set; }
public double SensorValue { get; set; }
public MessageCorrelationInfo CorrelationInfo { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Arcus.Messaging.Tests.Core.Messages.v1
{
public class SensorReadingBatch
{
public SensorReading[] Readings { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.ServiceBus;
using Azure;
using Azure.Messaging.EventGrid;
using GuardNet;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
Expand All @@ -15,28 +12,7 @@ namespace Arcus.Messaging.Tests.Integration.Fixture
/// </summary>
public static class WorkerOptionsExtensions
{
/// <summary>
/// Adds an <see cref="EventGridPublisherClient"/> instance to the <paramref name="options"/>.
/// </summary>
/// <param name="options">The options to add the publisher to.</param>
/// <param name="config">The test configuration which will be used to retrieve the Azure Event Grid authentication information.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="options"/> or the <paramref name="config"/> is <c>null</c>.</exception>
public static WorkerOptions AddEventGridPublisher(this WorkerOptions options, TestConfig config)
{
Guard.NotNull(options, nameof(options), "Requires a set of worker options to add the Azure Event Grid publisher to");
Guard.NotNull(config, nameof(config), "Requires a test configuration instance to retrieve the Azure Event Grid authentication inforation");

options.Services.AddAzureClients(clients =>
{
string topicEndpoint = config.GetTestInfraEventGridTopicUri();
string authenticationKey = config.GetTestInfraEventGridAuthKey();
clients.AddEventGridPublisherClient(new Uri(topicEndpoint), new AzureKeyCredential(authenticationKey));
});

return options;
}

/// <summary>
/// <summary>
/// Adds a message pump to consume messages from Azure Service Bus Topic.
/// </summary>
/// <remarks>
Expand Down
22 changes: 0 additions & 22 deletions src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,6 @@ public static TestConfig Create()
return new TestConfig(config);
}

/// <summary>
/// Gets the EventGrid topic URI for the test infrastructure.
/// </summary>
public string GetTestInfraEventGridTopicUri()
{
var value = _config.GetValue<string>("Arcus:Infra:EventGrid:TopicUri");
Guard.NotNullOrWhitespace(value, "No non-blank EventGrid topic URI was found for the test infrastructure in the application configuration");

return value;
}

/// <summary>
/// Gets the EventGrid authentication key for the test infrastructure.
/// </summary>
public string GetTestInfraEventGridAuthKey()
{
var value = _config.GetValue<string>("Arcus:Infra:EventGrid:AuthKey");
Guard.NotNullOrWhitespace(value, "No non-blank EventGrid authentication key was found for the test infrastructure in the application configuration");

return value;
}

public string GetServiceBusTopicConnectionString()
{
return GetServiceBusConnectionString(ServiceBusEntityType.Topic);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.Abstractions;
using Arcus.Messaging.Pumps.EventHubs;
using Arcus.Messaging.Tests.Core.Correlation;
using Arcus.Messaging.Tests.Core.Events.v1;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Integration.Fixture;
using Arcus.Messaging.Tests.Integration.MessagePump.EventHubs;
using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus;
using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageHandlers;
using Azure.Messaging.EventHubs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;

namespace Arcus.Messaging.Tests.Integration.MessagePump
{
public partial class EventHubsMessagePumpTests
{
[Fact]
public async Task EventHubsMessagePumpUsingManagedIdentity_PublishesMessage_MessageSuccessfullyProcessed()
{
using var auth = TemporaryManagedIdentityConnection.Create(_config, _logger);
await TestEventHubsMessageHandlingAsync(options =>
{
options.AddEventHubsMessagePumpUsingManagedIdentity(
eventHubsName: EventHubsName,
fullyQualifiedNamespace: FullyQualifiedEventHubsNamespace,
blobContainerUri: _blobStorageContainer.ContainerUri,
clientId: auth.ClientId)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();
});
}

[Fact]
public async Task RestartedEventHubsMessagePump_PublishMessage_MessageSuccessfullyProcessed()
{
// Arrange
var options = new WorkerOptions();
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();

EventData expected = CreateSensorEventDataForW3C();
TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer();

await using var worker = await Worker.StartNewAsync(options);

IEnumerable<AzureEventHubsMessagePump> messagePumps =
worker.Services.GetServices<IHostedService>()
.OfType<AzureEventHubsMessagePump>();

AzureEventHubsMessagePump messagePump = Assert.Single(messagePumps);
Assert.NotNull(messagePump);

await messagePump.RestartAsync(CancellationToken.None);

// Act
await producer.ProduceAsync(expected);

// Assert
SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(expected.MessageId);
AssertReceivedSensorEventDataForW3C(expected, actual);
}

[Fact]
public async Task EventHubsMessagePump_PausesViaLifetime_RestartsAgain()
{
// Arrange
string jobId = Guid.NewGuid().ToString();
var options = new WorkerOptions();
AddEventHubsMessagePump(options, opt => opt.JobId = jobId)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();

EventData expected = CreateSensorEventDataForW3C();
TestEventHubsMessageProducer producer = CreateEventHubsMessageProducer();

await using var worker = await Worker.StartNewAsync(options);

var lifetime = worker.Services.GetRequiredService<IMessagePumpLifetime>();
await lifetime.PauseProcessingMessagesAsync(jobId, TimeSpan.FromSeconds(5), CancellationToken.None);

// Act
await producer.ProduceAsync(expected);

// Assert
SensorReadEventData actual = await DiskMessageEventConsumer.ConsumeSensorReadAsync(expected.MessageId);
AssertReceivedSensorEventDataForW3C(expected, actual);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using System;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Abstractions.EventHubs;
using Arcus.Messaging.Abstractions.EventHubs.MessageHandling;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageBodySerializers;
using Arcus.Messaging.Tests.Workers.EventHubs.Core.MessageHandlers;
using Arcus.Messaging.Tests.Workers.MessageBodyHandlers;
using Arcus.Messaging.Tests.Workers.MessageHandlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;

namespace Arcus.Messaging.Tests.Integration.MessagePump
{
public partial class EventHubsMessagePumpTests
{
[Fact]
public async Task EventHubsMessagePumpWithMessageContextFilter_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<SensorReading>, SensorReading>(messageContextFilter: _ => false)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();
});
}

[Fact]
public async Task EventHubsMessagePumpWithMessageFilter_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<SensorReading>, SensorReading>(messageBodyFilter: _ => false)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();
});
}

[Fact]
public async Task EventHubsMessagePumpWithDifferentMessageType_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<Shipment>, Shipment>()
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();
});
}

[Fact]
public async Task EventHubsMessagePumpWithMessageBodySerializer_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<SensorReadingBatchEventHubsMessageHandler, SensorReadingBatch>(
messageBodySerializerImplementationFactory: provider =>
{
var logger = provider.GetRequiredService<ILogger<SensorReadingBatchBodySerializer>>();
return new SensorReadingBatchBodySerializer(logger);
});
});
}

[Fact]
public async Task EventHubsMessagePumpWithFallback_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<Shipment>, Shipment>()
.WithFallbackMessageHandler<SabotageEventHubsFallbackMessageHandler, AzureEventHubsMessageContext>()
.WithFallbackMessageHandler<WriteSensorToDiskEventHubsMessageHandler>();
});
}

[Fact]
public async Task EventHubsMessagePumpWithAllFiltersAndOptions_PublishesMessage_MessageSuccessfullyProcessed()
{
await TestEventHubsMessageHandlingAsync(options =>
{
AddEventHubsMessagePump(options)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<Shipment>, Shipment>()
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<SensorReading>, SensorReading>(messageBodyFilter: _ => false)
.WithEventHubsMessageHandler<TestEventHubsMessageHandler<SensorReading>, SensorReading>(messageContextFilter: _ => false)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>(
messageContextFilter: context => context.ConsumerGroup == "$Default"
&& context.EventHubsName == EventHubsName
&& context.EventHubsNamespace == FullyQualifiedEventHubsNamespace,
messageBodySerializerImplementationFactory: provider =>
{
var logger = provider.GetService<ILogger<OrderBatchMessageBodySerializer>>();
return new OrderBatchMessageBodySerializer(logger);
},
messageBodyFilter: order => Guid.TryParse(order.SensorId, out Guid _),
messageHandlerImplementationFactory: provider =>
{
return new WriteSensorToDiskEventHubsMessageHandler(
provider.GetRequiredService<IMessageCorrelationInfoAccessor>(),
provider.GetRequiredService<ILogger<WriteSensorToDiskEventHubsMessageHandler>>());
});
});
}

[Fact]
public async Task EventHubsMessagePumpWithoutSameJobId_PublishesMessage_MessageFailsToBeProcessed()
{
await Assert.ThrowsAsync<TimeoutException>(() =>
{
return TestEventHubsMessageHandlingAsync(options =>
{
EventHubsMessageHandlerCollection collection = AddEventHubsMessagePump(options);
Assert.False(string.IsNullOrWhiteSpace(collection.JobId));

var otherCollection = new EventHubsMessageHandlerCollection(new ServiceCollection())
{
JobId = Guid.NewGuid().ToString()
};

otherCollection.WithEventHubsMessageHandler<TestEventHubsMessageHandler<SensorReading>, SensorReading>(messageContextFilter: _ => false)
.WithEventHubsMessageHandler<WriteSensorToDiskEventHubsMessageHandler, SensorReading>();
});
});
}
}
}
Loading

0 comments on commit cd6d613

Please sign in to comment.