Skip to content

Commit

Permalink
fix: Defensively handle null source address in MassTransit instrument…
Browse files Browse the repository at this point in the history
…ation. (#2060)

* Defensively handle null source address

* Also handle sourceAddress with no underscores

Added unit tests

* Refactor MassTransitHelpers and tests to extensions project
  • Loading branch information
nr-ahemsath authored Nov 14, 2023
1 parent 70dfcf9 commit 0249582
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 160 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using NewRelic.Agent.Extensions.Providers.Wrapper;


namespace NewRelic.Agent.Extensions.Helpers
{
public class MassTransitQueueData
{
public string QueueName { get; set; } = "Unknown";
public MessageBrokerDestinationType DestinationType { get; set; } = MessageBrokerDestinationType.Queue;
}

public class MassTransitHelpers
{
public static MassTransitQueueData GetQueueData(Uri sourceAddress)
{
var data = new MassTransitQueueData();

if (sourceAddress != null)
{
// rabbitmq://localhost/SomeHostname_MassTransitTest_bus_iyeoyyge44oc7yijbdp5i1opfd?temporary=true
var items = sourceAddress.AbsoluteUri.Split('_');
if (items.Length > 1)
{
var queueData = items[items.Length - 1].Split('?');
data.QueueName = queueData[0];
if (queueData.Length == 2 && queueData[1] == "temporary=true")
{
data.DestinationType = MessageBrokerDestinationType.TempQueue;
}
}
}
return data;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Helpers;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using MethodCall = NewRelic.Agent.Extensions.Providers.Wrapper.MethodCall;

Expand Down Expand Up @@ -38,19 +40,19 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)

var mc = new MethodCall(_consumeMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CreateTransaction(
destinationType: MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress),
destinationType: queueData.DestinationType,
brokerVendorName: MessageBrokerVendorName,
destination: destName);
destination: queueData.QueueName);

transaction.AttachToAsync();
transaction.DetachFromPrimary();

transaction.AcceptDistributedTraceHeaders(context.Headers, GetHeaderValue, TransportType.AMQP);

var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, destName);
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
Expand Down Expand Up @@ -84,12 +86,11 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)

var mc = new MethodCall(_publishMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CurrentTransaction;
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
Expand All @@ -102,15 +103,25 @@ public async Task Send(SendContext context, IPipe<SendContext> next)

var mc = new MethodCall(_sendMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CurrentTransaction;
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
}

public static void InsertDistributedTraceHeaders(SendHeaders headers, ITransaction transaction)
{
var setHeaders = new Action<SendHeaders, string, string>((carrier, key, value) =>
{
carrier.Set(key, value);
});

transaction.InsertDistributedTraceHeaders(headers, setHeaders);
}

}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Helpers;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using MethodCall = NewRelic.Agent.Extensions.Providers.Wrapper.MethodCall;

Expand Down Expand Up @@ -39,19 +41,19 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)

var mc = new MethodCall(_consumeMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CreateTransaction(
destinationType: MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress),
destinationType: queueData.DestinationType,
brokerVendorName: MessageBrokerVendorName,
destination: destName);
destination: queueData.QueueName);

transaction.AttachToAsync();
transaction.DetachFromPrimary();

transaction.AcceptDistributedTraceHeaders(context.Headers, GetHeaderValue, TransportType.AMQP);

var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, destName);
var segment = transaction.StartMessageBrokerSegment(mc, MessageBrokerDestinationType.Queue, MessageBrokerAction.Consume, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
Expand Down Expand Up @@ -85,12 +87,11 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)

var mc = new MethodCall(_publishMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CurrentTransaction;
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
Expand All @@ -103,15 +104,25 @@ public async Task Send(SendContext context, IPipe<SendContext> next)

var mc = new MethodCall(_sendMethod, context, default(string[]), true);

var destName = MassTransitHelpers.GetQueue(context.SourceAddress);
var destType = MassTransitHelpers.GetBrokerDestinationType(context.SourceAddress);
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);

var transaction = _agent.CurrentTransaction;
MassTransitHelpers.InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, destType, MessageBrokerAction.Produce, MessageBrokerVendorName, destName);
InsertDistributedTraceHeaders(context.Headers, transaction);
var segment = transaction.StartMessageBrokerSegment(mc, queueData.DestinationType, MessageBrokerAction.Produce, MessageBrokerVendorName, queueData.QueueName);

await next.Send(context);
segment.End();
}

public static void InsertDistributedTraceHeaders(SendHeaders headers, ITransaction transaction)
{
var setHeaders = new Action<SendHeaders, string, string>((carrier, key, value) =>
{
carrier.Set(key, value);
});

transaction.InsertDistributedTraceHeaders(headers, setHeaders);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using NUnit.Framework;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Agent.Extensions.Helpers;

namespace Agent.Extensions.Tests.Helpers
{
[TestFixture]
public class MassTransitHelperTests
{

[Test]
[TestCase("rabbitmq://localhost/SomeHostname_MassTransitTest_bus_myqueuename?temporary=true", "myqueuename")]
[TestCase("rabbitmq://localhost/SomeHostname_MassTransitTest_bus_myqueuename", "myqueuename")]
[TestCase("rabbitmq://localhost/bogus", "Unknown")]
[TestCase(null, "Unknown")]
public void GetQueueName(Uri uri, string expectedQueueName)
{
// Act
var queueName = MassTransitHelpers.GetQueueData(uri).QueueName;

// Assert
Assert.AreEqual(expectedQueueName, queueName, "Did not get expected queue name");
}

[Test]
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename?temporary=true", MessageBrokerDestinationType.TempQueue)]
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename?temporary=false", MessageBrokerDestinationType.Queue)]
[TestCase("rabbitmq://localhost/NRHXPSQL3_MassTransitTest_bus_myqueuename", MessageBrokerDestinationType.Queue)]
[TestCase(null, MessageBrokerDestinationType.Queue)]
public void GetBrokerDestinationType(Uri uri, MessageBrokerDestinationType expectedDestType)
{
// Act
var destType = MassTransitHelpers.GetQueueData(uri).DestinationType;

// Assert
Assert.AreEqual(expectedDestType, destType, "Did not get expected queue type");
}
}

}
Loading

0 comments on commit 0249582

Please sign in to comment.