Skip to content

Commit

Permalink
Update resource reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
marcschier committed Feb 25, 2025
1 parent eacf6d6 commit 922146e
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,5 +481,26 @@ public record class WriterGroupDiagnosticModel
[DataMember(Name = "MonitoredOpcNodesCount", Order = 76,
EmitDefaultValue = true)]
public int MonitoredOpcNodesCount { get; set; }

/// <summary>
/// Container Cpu limit utilization
/// </summary>
[DataMember(Name = "CpuLimitUtilization", Order = 77,
EmitDefaultValue = true)]
public double CpuLimitUtilization { get; set; }

/// <summary>
/// Container Cpu request utilization
/// </summary>
[DataMember(Name = "CpuRequestUtilization", Order = 78,
EmitDefaultValue = true)]
public double CpuRequestUtilization { get; set; }

/// <summary>
/// Container memory limit utilization
/// </summary>
[DataMember(Name = "MemoryLimitUtilization", Order = 79,
EmitDefaultValue = true)]
public double MemoryLimitUtilization { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Services
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
using System.Diagnostics;

/// <summary>
/// Collects metrics from the writer groups inside the publisher using the .net Meter listener
Expand All @@ -29,12 +30,10 @@ public sealed class PublisherDiagnosticCollector : IDiagnosticCollector,
/// Create collector
/// </summary>
/// <param name="logger"></param>
/// <param name="resources"></param>
/// <param name="timeProvider"></param>
public PublisherDiagnosticCollector(ILogger<PublisherDiagnosticCollector> logger,
IResourceMonitor? resources = null, TimeProvider? timeProvider = null)
TimeProvider? timeProvider = null)
{
_resources = resources;
_logger = logger;
_timeProvider = timeProvider ?? TimeProvider.System;
_meterListener = new MeterListener
Expand Down Expand Up @@ -76,39 +75,17 @@ public bool TryGetDiagnosticsForWriterGroup(string writerGroupId,
_meterListener.RecordObservableInstruments();
var duration = _timeProvider.GetUtcNow() - value.IngestionStart;

if (_resources != null)
diagnostic = value with
{
var resources = _resources.GetUtilization(TimeSpan.FromSeconds(5));

diagnostic = value with
{
IngestionDuration = duration,
OpcEndpointConnected = value.NumberOfConnectedEndpoints != 0,

MemoryUsedPercentage =
resources.MemoryUsedPercentage,
MemoryUsedInBytes =
resources.MemoryUsedInBytes,
CpuUsedPercentage =
resources.CpuUsedPercentage,
GuaranteedCpuUnits =
resources.SystemResources.GuaranteedCpuUnits,
MaximumCpuUnits =
resources.SystemResources.MaximumCpuUnits,
GuaranteedMemoryInBytes =
resources.SystemResources.GuaranteedMemoryInBytes,
MaximumMemoryInBytes =
resources.SystemResources.MaximumMemoryInBytes
};
}
else
{
diagnostic = value with
{
IngestionDuration = duration,
OpcEndpointConnected = value.NumberOfConnectedEndpoints != 0,
};
}
IngestionDuration = duration,
OpcEndpointConnected = value.NumberOfConnectedEndpoints != 0,
MemoryLimitUtilization = _process.MemoryLimitUtilization,
CpuLimitUtilization = _process.CpuLimitUtilization,
CpuRequestUtilization = _process.CpuRequestUtilization,
CpuUsedPercentage = _process.CpuUsedPercentage,
MemoryUsedPercentage = _process.MemoryUsedPercentage,
MemoryUsedInBytes = _process.MemoryUsedInBytes
};
return true;
}
diagnostic = default;
Expand All @@ -124,41 +101,19 @@ public bool TryGetDiagnosticsForWriterGroup(string writerGroupId,
foreach (var (writerGroupId, info) in _diagnostics)
{
var duration = now - info.IngestionStart;

if (_resources == null)
{
yield return (writerGroupId, info with
{
Timestamp = now,
IngestionDuration = duration,
OpcEndpointConnected = info.NumberOfConnectedEndpoints != 0,
});
}
else
yield return (writerGroupId, info with
{
var resources = _resources.GetUtilization(TimeSpan.FromSeconds(5));
yield return (writerGroupId, info with
{
Timestamp = now,
IngestionDuration = duration,
OpcEndpointConnected = info.NumberOfConnectedEndpoints != 0,
Timestamp = now,
IngestionDuration = duration,
OpcEndpointConnected = info.NumberOfConnectedEndpoints != 0,

MemoryUsedPercentage =
resources.MemoryUsedPercentage,
MemoryUsedInBytes =
resources.MemoryUsedInBytes,
CpuUsedPercentage =
resources.CpuUsedPercentage,
GuaranteedCpuUnits =
resources.SystemResources.GuaranteedCpuUnits,
MaximumCpuUnits =
resources.SystemResources.MaximumCpuUnits,
GuaranteedMemoryInBytes =
resources.SystemResources.GuaranteedMemoryInBytes,
MaximumMemoryInBytes =
resources.SystemResources.MaximumMemoryInBytes
});
}
MemoryLimitUtilization = _process.MemoryLimitUtilization,
CpuLimitUtilization = _process.CpuLimitUtilization,
CpuRequestUtilization = _process.CpuRequestUtilization,
CpuUsedPercentage = _process.CpuUsedPercentage,
MemoryUsedPercentage = _process.MemoryUsedPercentage,
MemoryUsedInBytes = _process.MemoryUsedInBytes
});
}
}

Expand Down Expand Up @@ -210,16 +165,23 @@ private void OnInstrumentPublished(Instrument instrument, MeterListener listener
private void OnMeasurementRecorded<T>(Instrument instrument, T measurement,
ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state)
{
if (_bindings.TryGetValue(instrument.Name, out var binding) &&
TryGetIds(tags, out var writerGroupId, out var writerGroupName) &&
_diagnostics.TryGetValue(writerGroupId, out var diag))
if (_bindings.TryGetValue(instrument.Name, out var binding))
{
if (writerGroupName != null)
if (TryGetIds(tags, out var writerGroupId, out var writerGroupName) &&
_diagnostics.TryGetValue(writerGroupId, out var diag))
{
diag.WriterGroupName = writerGroupName;
if (writerGroupName != null)
{
diag.WriterGroupName = writerGroupName;
}
binding(diag, measurement!);
}
else
{
binding(_process, measurement!);
}
binding(diag, measurement!);
}

static bool TryGetIds(ReadOnlySpan<KeyValuePair<string, object?>> tags,
[NotNullWhen(true)] out string? writerGroupId, out string? writerGroupName)
{
Expand Down Expand Up @@ -251,9 +213,9 @@ static bool TryGetIds(ReadOnlySpan<KeyValuePair<string, object?>> tags,
}

private readonly MeterListener _meterListener;
private readonly IResourceMonitor? _resources;
private readonly ILogger _logger;
private readonly TimeProvider _timeProvider;
private readonly WriterGroupDiagnosticModel _process = new();
private readonly ConcurrentDictionary<string, WriterGroupDiagnosticModel> _diagnostics = new();

// TODO: Split this per measurement type to avoid boxing
Expand Down Expand Up @@ -372,9 +334,22 @@ static bool TryGetIds(ReadOnlySpan<KeyValuePair<string, object?>> tags,
["iiot_edge_publisher_messages"] =
(d, i) => d.OutgressIoTMessageCount = (long)i,
["iiot_edge_publisher_message_send_failures"] =
(d, i) => d.OutgressIoTMessageFailedCount = (long)i
(d, i) => d.OutgressIoTMessageFailedCount = (long)i,

["container.cpu.limit.utilization"] =
(d, i) => d.CpuLimitUtilization = (double)i,
["container.cpu.request.utilization"] =
(d, i) => d.CpuRequestUtilization = (double)i,
["process.cpu.utilization"] =
(d, i) => d.CpuUsedPercentage = (double)i,
["container.memory.limit.utilization"] =
(d, i) => d.MemoryLimitUtilization = (double)i,
["dotnet.process.memory.virtual.utilization"] =
(d, i) => d.MemoryUsedPercentage = (double)i,
["dotnet.process.memory.working_set"] =
(d, i) => d.MemoryUsedInBytes = (ulong)(long)i,

// ... Add here more items if needed
// ... Add here more items if needed
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -592,23 +592,20 @@ static string Format(long changes, long lastMinute, double s)
if (includeResourceInfo)
{
sb = sb
.Append(" # Cpu/Memory max : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:n2}", info.MaximumCpuUnits)
.Append(" # Cpu (%limit/%req/%used) : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:p2}", info.CpuLimitUtilization)
.Append(" | ")
.AppendFormat(CultureInfo.CurrentCulture, "{0:n0}", info.MaximumMemoryInBytes / 1000d)
.AppendLine(" KB")
.Append(" # Cpu/Memory available : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:n2}", info.GuaranteedCpuUnits)
.Append(" | ")
.AppendFormat(CultureInfo.CurrentCulture, "{0:n0}", info.GuaranteedMemoryInBytes / 1000d)
.AppendLine(" KB")
.Append(" # Cpu/Memory % used (window/total) : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:p2}", info.CpuUsedPercentage)
.AppendFormat(CultureInfo.CurrentCulture, "{0:p2}", info.CpuRequestUtilization)
.Append(" (")
.AppendFormat(CultureInfo.CurrentCulture, "{0:p2}", info.CpuUsedPercentage)
.AppendLine(")")
.Append(" # Memory (%limit/%used/total used) : ")
.AppendFormat(CultureInfo.CurrentCulture, "{0,14:p2}", info.MemoryLimitUtilization)
.Append(" | ")
.AppendFormat(CultureInfo.CurrentCulture, "{0:p2}", info.MemoryUsedPercentage)
.Append(" (")
.AppendFormat(CultureInfo.CurrentCulture, "{0:n0}", info.MemoryUsedInBytes / 1000d)
.AppendLine(" kb)")
.AppendLine(" KB)")
;
}
return sb
Expand Down
64 changes: 41 additions & 23 deletions src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -969,22 +969,25 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
{
case SessionState.Connected: // only valid when connected.
Debug.Assert(_reconnectHandler.State == SessionReconnectHandler.ReconnectState.Ready);
_logger.LogInformation("{Client}: Reconnecting session {Session} due to {Reason}...",
this, _sessionName, (context is ServiceResult sr) ? "error " + sr : "RESET");

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.

// Ensure no more access to the session through reader locks
Debug.Assert(_disconnectLock == null);
_disconnectLock = await _lock.WriterLockAsync(ct);

_logger.LogInformation("{Client}: Reconnecting session {Session} due to {Reason}...",
this, _sessionName, (context is ServiceResult sr) ? "error " + sr : "RESET");
_logger.LogInformation("{Client}: Begin reconnecting session {Session}...",
this, _sessionName);

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
Debug.Assert(_session != null);
var state = _reconnectHandler.BeginReconnect(_session,
_reverseConnectManager, GetMinReconnectPeriod(), (sender, evt) =>
{
if (ReferenceEquals(sender, _reconnectHandler))
if (!ReferenceEquals(sender, _reconnectHandler))
{
TriggerConnectionEvent(ConnectionEvent.ReconnectComplete,
_reconnectHandler.Session);
_logger.LogError("{Client}: Reconnect handler mismatch.", this);
return;
}
TriggerConnectionEvent(ConnectionEvent.ReconnectComplete,
_reconnectHandler.Session);
});

// Save session while reconnecting.
Expand All @@ -1010,6 +1013,8 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
switch (currentSessionState)
{
case SessionState.Reconnecting:
_logger.LogInformation("{Client}: Completed reconnecting session {Session}...",
this, _sessionName);

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
//
// Behavior of the reconnect handler is as follows:
// 1) newSession == null
Expand Down Expand Up @@ -1047,12 +1052,11 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
Debug.Assert(_disconnectLock != null);
_disconnectLock.Dispose();
_disconnectLock = null;

await SyncAsync(ct).ConfigureAwait(false);

_reconnectRequired = 0;
reconnectPeriod = GetMinReconnectPeriod();
currentSessionState = SessionState.Connected;

await SyncAsync(ct).ConfigureAwait(false);
NotifySubscriptions(_session, false);
break;

Expand Down Expand Up @@ -1395,23 +1399,31 @@ private async ValueTask<bool> TryConnectAsync(CancellationToken ct)
/// <param name="e"></param>
internal void Session_HandlePublishError(ISession session, PublishErrorEventArgs e)
{
if (_disconnectLock == null && session == _session)
if (!ReferenceEquals(session, _session))
{
switch (e.Status.Code)
if (_session != null)
{
case StatusCodes.BadSessionIdInvalid:
case StatusCodes.BadSecureChannelClosed:
case StatusCodes.BadSessionClosed:
case StatusCodes.BadConnectionClosed:
case StatusCodes.BadNotConnected:
case StatusCodes.BadNoCommunication:
TriggerReconnect(e.Status, "Publish");
return;
default:
_logger.LogInformation("{Client}: Publish error: {Error}...",
this, e.Status);
break;
_logger.LogError(
"{Client}: Received publish error for different session {Session}!",
this, session);
}
return;
}
switch (e.Status.Code)
{
case StatusCodes.BadSessionIdInvalid:
case StatusCodes.BadSecureChannelClosed:
case StatusCodes.BadSessionClosed:
case StatusCodes.BadConnectionClosed:
case StatusCodes.BadServerHalted:
case StatusCodes.BadNotConnected:
case StatusCodes.BadNoCommunication:
TriggerReconnect(e.Status, "Publish");
return;
default:
_logger.LogInformation("{Client}: Publish error: {Error}...",
this, e.Status);
break;
}
}

Expand Down Expand Up @@ -1490,6 +1502,12 @@ internal void Session_KeepAlive(ISession session, KeepAliveEventArgs e)
// check for events from discarded sessions.
if (!ReferenceEquals(session, _session))
{
if (_session != null)
{
_logger.LogError(
"{Client}: Received keep alive for different session {Session}!",
this, session);
}
return;
}
// start reconnect sequence on communication error.
Expand Down
Loading

0 comments on commit 922146e

Please sign in to comment.