Skip to content

Commit

Permalink
Update metrics to remove the unit string and make compliant / superse… (
Browse files Browse the repository at this point in the history
#2173)

* Update metrics to remove the unit string and make compliant / superset of 2.8 metrics
* Only updates legacy counters when connected to iot hub
  • Loading branch information
marcschier authored Feb 2, 2024
1 parent ffe907f commit bbe3d12
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,29 +547,29 @@ private void Drop(IEnumerable<IOpcUaSubscriptionNotification> messages)
private void InitializeMetrics(IMetricsContext metrics)
{
_meter.CreateObservableCounter("iiot_edge_publisher_encoded_notifications",
() => new Measurement<long>(NotificationsProcessedCount, metrics.TagList), "Notifications",
"Number of successfully processed subscription notifications received from OPC client.");
() => new Measurement<long>(NotificationsProcessedCount, metrics.TagList),
description: "Number of successfully processed subscription notifications received from OPC client.");
_meter.CreateObservableCounter("iiot_edge_publisher_dropped_notifications",
() => new Measurement<long>(NotificationsDroppedCount, metrics.TagList), "Notifications",
"Number of incoming subscription notifications that are too big to be processed based " +
() => new Measurement<long>(NotificationsDroppedCount, metrics.TagList),
description: "Number of incoming subscription notifications that are too big to be processed based " +
"on the message size limits or other issues with the notification.");
_meter.CreateObservableCounter("iiot_edge_publisher_processed_messages",
() => new Measurement<long>(MessagesProcessedCount, metrics.TagList), "Messages",
"Number of successfully generated messages that are to be sent using the message sender");
() => new Measurement<long>(MessagesProcessedCount, metrics.TagList),
description: "Number of successfully generated messages that are to be sent using the message sender");
_meter.CreateObservableGauge("iiot_edge_publisher_notifications_per_message_average",
() => new Measurement<double>(AvgNotificationsPerMessage, metrics.TagList), "Notifications/Message",
"Average subscription notifications packed into a message");
() => new Measurement<double>(AvgNotificationsPerMessage, metrics.TagList),
description: "Average subscription notifications packed into a message");
_meter.CreateObservableGauge("iiot_edge_publisher_encoded_message_size_average",
() => new Measurement<double>(AvgMessageSize, metrics.TagList), "Bytes",
"Average size of a message through the lifetime of the encoder.");
() => new Measurement<double>(AvgMessageSize, metrics.TagList),
description: "Average size of a message through the lifetime of the encoder.");
_meter.CreateObservableGauge("iiot_edge_publisher_chunk_size_average",
() => new Measurement<double>(AvgMessageSize / (4 * 1024), metrics.TagList), "4kb Chunks",
"IoT Hub chunk size average");
() => new Measurement<double>(AvgMessageSize / (4 * 1024), metrics.TagList),
description: "IoT Hub chunk size average");
_meter.CreateObservableGauge("iiot_edge_publisher_message_split_ratio_max",
() => new Measurement<double>(MaxMessageSplitRatio, metrics.TagList), "Splits",
"The message split ration specifies into how many messages a subscription notification had to be split. " +
"Less is better for performance. If the number is large user should attempt to limit the number of " +
"notifications in a message using configuration.");
() => new Measurement<double>(MaxMessageSplitRatio, metrics.TagList),
description: "The message split ration specifies into how many messages a subscription notification had " +
"to be split. Less is better for performance. If the number is large user should attempt to limit the " +
"number of notifications in a message using configuration.");
}

private static readonly ConfigurationVersionDataType kEmptyConfiguration = new() { MajorVersion = 1u };
Expand Down
62 changes: 42 additions & 20 deletions src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public NetworkMessageSink(WriterGroupModel writerGroup,
options.Value.DefaultTransport?.ToString(),
StringComparison.OrdinalIgnoreCase))
?? registered[0];

_isIoTEdge = _eventClient.Name.Equals(nameof(WriterGroupTransport.IoTHub),
StringComparison.OrdinalIgnoreCase);
_messageEncoder = encoder;
_logger = logger;
_diagnostics = diagnostics;
Expand Down Expand Up @@ -249,7 +252,7 @@ private async Task SendAsync((IEvent Event, Action Complete) message)
catch (OperationCanceledException) { }
catch (Exception e) when (e is not ObjectDisposedException)
{
kMessagesErrors.Add(1, _metrics.TagList);
_errorCount++;

// Fail fast for authentication exceptions
var aux = e as AuthenticationException;
Expand Down Expand Up @@ -447,33 +450,50 @@ static string Stringify(IList<MonitoredItemNotificationModel> notifications)
/// </summary>
private void InitializeMetrics()
{
_meter.CreateObservableCounter("iiot_edge_publisher_iothub_queue_dropped_count",
() => new Measurement<long>(_sinkBlockInputDroppedCount, _metrics.TagList), "Messages",
"Telemetry messages dropped due to overflow.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_iothub_queue_size",
() => new Measurement<long>(_sinkBlock.InputCount, _metrics.TagList), "Messages",
"Telemetry messages queued for sending upstream.");
_meter.CreateObservableCounter("iiot_edge_publisher_send_queue_dropped_count",
() => new Measurement<long>(_sinkBlockInputDroppedCount, _metrics.TagList),
description: "Telemetry messages dropped due to overflow.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_send_queue_size",
() => new Measurement<long>(_sinkBlock.InputCount, _metrics.TagList),
description: "Telemetry messages queued for sending upstream.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_batch_input_queue_size",
() => new Measurement<long>(_notificationBufferInputCount, _metrics.TagList), "Notifications",
"Telemetry messages queued for sending upstream.");
() => new Measurement<long>(_notificationBufferInputCount, _metrics.TagList),
description: "Telemetry messages queued for sending upstream.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_encoding_input_queue_size",
() => new Measurement<long>(_encodingBlock.InputCount, _metrics.TagList), "Notifications",
"Telemetry messages queued for sending upstream.");
() => new Measurement<long>(_encodingBlock.InputCount, _metrics.TagList),
description: "Telemetry messages queued for sending upstream.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_encoding_output_queue_size",
() => new Measurement<long>(_encodingBlock.OutputCount, _metrics.TagList), "Messages",
"Telemetry messages queued for sending upstream.");
() => new Measurement<long>(_encodingBlock.OutputCount, _metrics.TagList),
description: "Telemetry messages queued for sending upstream.");
_meter.CreateObservableCounter("iiot_edge_publisher_messages",
() => new Measurement<long>(_messagesSentCount, _metrics.TagList), "Messages",
"Number of IoT messages successfully sent via transport.");
() => new Measurement<long>(_messagesSentCount, _metrics.TagList),
description: "Number of IoT messages successfully sent via transport.");
_meter.CreateObservableGauge("iiot_edge_publisher_messages_per_second",
() => new Measurement<double>(_messagesSentCount / UpTime, _metrics.TagList), "Messages/second",
"Messages/second sent via transport.");
() => new Measurement<double>(_messagesSentCount / UpTime, _metrics.TagList),
description: "Messages/second sent via transport.");
_meter.CreateObservableCounter("iiot_edge_publisher_message_send_failures",
() => new Measurement<long>(_errorCount, _metrics.TagList),
description: "Number of failures sending a network message.");

_meter.CreateObservableCounter("iiot_edge_publisher_sent_iot_messages",
() => new Measurement<long>(_isIoTEdge ? _messagesSentCount : 0, _metrics.TagList),
description: "Number of IoT messages successfully sent via transport.");
_meter.CreateObservableGauge("iiot_edge_publisher_sent_iot_messages_per_second",
() => new Measurement<double>(_isIoTEdge ? _messagesSentCount / UpTime : 0d, _metrics.TagList),
description: "Messages/second sent via transport.");
_meter.CreateObservableCounter("iiot_edge_publisher_iothub_queue_dropped_count",
() => new Measurement<long>(_isIoTEdge ? _sinkBlockInputDroppedCount : 0, _metrics.TagList),
description: "Telemetry messages dropped due to overflow.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_iothub_queue_size",
() => new Measurement<long>(_isIoTEdge ? _sinkBlock.InputCount : 0, _metrics.TagList),
description: "Telemetry messages queued for sending upstream.");
_meter.CreateObservableCounter("iiot_edge_publisher_failed_iot_messages",
() => new Measurement<long>(_isIoTEdge ? _errorCount : 0, _metrics.TagList),
description: "Number of failures sending a network message.");
}

static readonly Counter<long> kMessagesErrors = Diagnostics.Meter.CreateCounter<long>(
"iiot_edge_publisher_failed_iot_messages", "messages", "Number of failures sending a network message.");
static readonly Histogram<double> kSendingDuration = Diagnostics.Meter.CreateHistogram<double>(
"iiot_edge_publisher_messages_duration", "milliseconds", "Histogram of message sending durations.");
"iiot_edge_publisher_messages_duration", description: "Histogram of message sending durations.");

/// <summary>
/// With 256k limit this is 1 GB.
Expand All @@ -483,6 +503,7 @@ private void InitializeMetrics()

private double UpTime => (DateTime.UtcNow - _startTime).TotalSeconds;
private long _messagesSentCount;
private long _errorCount;
private long _sinkBlockInputDroppedCount;
private long _notificationBufferInputCount;
private DateTime _dataFlowStartTime = DateTime.MinValue;
Expand All @@ -504,6 +525,7 @@ private void InitializeMetrics()
private readonly CancellationTokenSource _cts;
private readonly IMetricsContext _metrics;
private readonly IEventClient _eventClient;
private readonly bool _isIoTEdge;
private readonly Meter _meter = Diagnostics.NewMeter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ internal WriterGroupDiagnosticModel AggregateModel

["iiot_edge_publisher_batch_input_queue_size"] =
(d, i) => d.IngressBatchBlockBufferSize = (long)i,
["iiot_edge_publisher_iothub_queue_size"] =
["iiot_edge_publisher_send_queue_size"] =
(d, i) => d.OutgressInputBufferCount = (long)i,
["iiot_edge_publisher_iothub_queue_dropped_count"] =
["iiot_edge_publisher_send_queue_dropped_count"] =
(d, i) => d.OutgressInputBufferDropped = (long)i,

["iiot_edge_publisher_encoding_input_queue_size"] =
Expand Down Expand Up @@ -322,7 +322,7 @@ internal WriterGroupDiagnosticModel AggregateModel
(d, i) => d.SentMessagesPerSec = (double)i,
["iiot_edge_publisher_messages"] =
(d, i) => d.OutgressIoTMessageCount = (long)i,
["iiot_edge_publisher_failed_iot_messages"] =
["iiot_edge_publisher_message_send_failures"] =
(d, i) => d.OutgressIoTMessageFailedCount = (long)i

// ... Add here more items if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,13 +626,13 @@ private void InitializeMetrics()
{
_meter.CreateObservableGauge("iiot_edge_publisher_module_start",
() => new Measurement<int>(_runtimeState == RuntimeStateEventType.RestartAnnouncement ? 0 : 1,
_metrics.TagList), "Count", "Publisher module started.");
_metrics.TagList), description: "Publisher module started.");
_meter.CreateObservableGauge("iiot_edge_publisher_module_state",
() => new Measurement<int>((int)_runtimeState,
_metrics.TagList), "State", "Publisher module runtime state.");
_metrics.TagList), description: "Publisher module runtime state.");
_meter.CreateObservableCounter("iiot_edge_publisher_certificate_renewal_count",
() => new Measurement<int>(_certificateRenewals,
_metrics.TagList), "Count", "Publisher certificate renewals.");
_metrics.TagList), description: "Publisher certificate renewals.");
}

private const int kCertificateLifetimeDays = 30;
Expand Down
Loading

0 comments on commit bbe3d12

Please sign in to comment.