Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate BatchExporterProcessor to async #5838

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTeleme
static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action<OpenTelemetry.Logs.LoggerProviderBuilder!>! configure) -> Microsoft.Extensions.Logging.ILoggingBuilder!
static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.UseOpenTelemetry(this Microsoft.Extensions.Logging.ILoggingBuilder! builder, System.Action<OpenTelemetry.Logs.LoggerProviderBuilder!>? configureBuilder, System.Action<OpenTelemetry.Logs.OpenTelemetryLoggerOptions!>? configureOptions) -> Microsoft.Extensions.Logging.ILoggingBuilder!
static OpenTelemetry.Sdk.CreateLoggerProviderBuilder() -> OpenTelemetry.Logs.LoggerProviderBuilder!
virtual OpenTelemetry.BaseExporter<T>.ExportAsync(OpenTelemetry.Batch<T!> batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<OpenTelemetry.ExportResult>!
virtual OpenTelemetry.Metrics.FixedSizeExemplarReservoir.OnCollected() -> void
11 changes: 11 additions & 0 deletions src/OpenTelemetry/BaseExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ public abstract class BaseExporter<T> : IDisposable
/// <returns>Result of the export operation.</returns>
public abstract ExportResult Export(in Batch<T> batch);

/// <summary>
/// Exports a batch of telemetry objects.
/// </summary>
/// <param name="batch">Batch of telemetry objects to export.</param>
/// <param name="cancellationToken">The cancellation token to cancel the export.</param>
/// <returns>Result of the export operation.</returns>
public virtual Task<ExportResult> ExportAsync(Batch<T> batch, CancellationToken cancellationToken = default)
{
return Task.FromResult(this.Export(batch));
}

/// <summary>
/// Flushes the exporter, blocks the current thread until flush
/// completed, shutdown signaled or timed out.
Expand Down
213 changes: 111 additions & 102 deletions src/OpenTelemetry/BatchExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
internal readonly int ExporterTimeoutMilliseconds;

private readonly CircularBuffer<T> circularBuffer;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new(false);
private readonly ManualResetEvent dataExportedNotification = new(false);
private readonly ManualResetEvent shutdownTrigger = new(false);
private readonly CancellationTokenSource exporterTaskCancellation;
private readonly Task exporterTask;
private Task exportTask;
private long shutdownDrainTarget = long.MaxValue;
private long droppedCount;
private bool disposed;
Expand Down Expand Up @@ -57,12 +56,9 @@ protected BatchExportProcessor(
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
this.MaxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(this.ExporterProc)
{
IsBackground = true,
Name = $"OpenTelemetry-{nameof(BatchExportProcessor<T>)}-{exporter.GetType().Name}",
};
this.exporterThread.Start();
this.exportTask = Task.CompletedTask;
this.exporterTaskCancellation = new CancellationTokenSource();
this.exporterTask = Task.Run(this.ExporterProc);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't change the default behavior here. If there are scenarios where a thread pool must be leveraged, then it can be a optional, opt-in behavior - either a modification for the BatchExportProcessor (UseDedicatedThread=false) or a new BatchExportProcessorThreadPool.

Having said that, I'll not have bandwidth to do a detailed review at the moment, but I believe the first step is to get some guidance from the maintainers about which direction should be taken:

  1. Provide a opt-in feature flag to BatchExportProcessor so it can do dedicated thread or threadpool.
  2. Make a new BatchExportProcessor to do the thread pool one - either in this repo OR in the contrib repo.
  3. Something else.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. I'll do some experimentations.

}

/// <summary>
Expand All @@ -87,13 +83,7 @@ internal bool TryExport(T data)
{
if (this.circularBuffer.Count >= this.MaxExportBatchSize)
{
try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
}
_ = this.ExportAsync();
}

return true; // enqueue succeeded
Expand All @@ -113,6 +103,34 @@ protected override void OnExport(T data)

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
return this.FlushAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult();
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.ShutdownAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds)).GetAwaiter().GetResult();
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.exporterTaskCancellation.Cancel();
this.exporterTaskCancellation.Dispose();
}

this.disposed = true;
}

base.Dispose(disposing);
}

private async Task<bool> FlushAsync(TimeSpan timeout)
{
var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;
Expand All @@ -122,82 +140,61 @@ protected override bool OnForceFlush(int timeoutMilliseconds)
return true; // nothing to flush
}

try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
_ = this.ExportAsync();

if (timeout == TimeSpan.Zero)
{
return false;
}

if (timeoutMilliseconds == 0)
CancellationTokenSource timeoutCancellation;
try
{
timeoutCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.exporterTaskCancellation.Token);
}
catch (ObjectDisposedException)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMilliseconds = 1000;
var timeoutTask = Task.Delay(timeout, timeoutCancellation.Token);

while (true)
{
if (sw == null)
Task completedTask;
try
{
try
{
WaitHandle.WaitAny(triggers, pollingMilliseconds);
}
catch (ObjectDisposedException)
{
return false;
}
completedTask = await Task.WhenAny(timeoutTask, this.ExportAsync()).ConfigureAwait(false);
}
else
catch (ObjectDisposedException)
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

try
{
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds));
}
catch (ObjectDisposedException)
{
return false;
}
return false;
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (completedTask == timeoutTask)
{
return false;
}

if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue)
{
return false;
}
}
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
private async Task<bool> ShutdownAsync(TimeSpan timeout)
{
Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount);

try
{
this.shutdownTrigger.Set();
this.exporterTaskCancellation.Cancel();
}
catch (ObjectDisposedException)
{
Expand All @@ -206,53 +203,37 @@ protected override bool OnShutdown(int timeoutMilliseconds)

OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount);

if (timeoutMilliseconds == Timeout.Infinite)
if (timeout == Timeout.InfiniteTimeSpan)
{
this.exporterThread.Join();
await this.exporterTask.ConfigureAwait(false);
return this.exporter.Shutdown();
}

if (timeoutMilliseconds == 0)
if (timeout == TimeSpan.Zero)
{
return this.exporter.Shutdown(0);
}

var sw = Stopwatch.StartNew();
this.exporterThread.Join(timeoutMilliseconds);
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(timeout, 0));
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.exportTrigger.Dispose();
this.dataExportedNotification.Dispose();
this.shutdownTrigger.Dispose();
}

this.disposed = true;
}

base.Dispose(disposing);
await Task.WhenAny(this.exporterTask, Task.Delay(timeout)).ConfigureAwait(false);
var remainingTimeout = timeout.TotalMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(remainingTimeout, 0));
}

private void ExporterProc()
private async Task ExporterProc()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.MaxExportBatchSize)
{
try
{
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
await Task.Delay(this.ScheduledDelayMilliseconds, this.exporterTaskCancellation.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// The delay was canceled for the exporter to shut down.
}
catch (ObjectDisposedException)
{
Expand All @@ -262,27 +243,55 @@ private void ExporterProc()
}

if (this.circularBuffer.Count > 0)
{
await this.ExportAsync().ConfigureAwait(false);
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
{
return;
}
}
}

private Task ExportAsync()
{
var optimisticExportTask = this.exportTask;
if (!optimisticExportTask.IsCompleted)
{
// An export is currently being processed.
return optimisticExportTask;
}

TaskCompletionSource<object?> newCurrentExportTaskCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously);
var localExportTask = Interlocked.CompareExchange(
ref this.exportTask,
newCurrentExportTaskCompletion.Task,
optimisticExportTask);
if (!localExportTask.IsCompleted)
{
// An export is currently being processed.
return localExportTask;
}

// Use Task.Run to yield the execution as soon as possible.
return Task.Run(CoreAsync);

async Task CoreAsync()
{
try
{
using (var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize))
{
this.exporter.Export(batch);
await this.exporter.ExportAsync(batch).ConfigureAwait(false);
}

try
{
this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
newCurrentExportTaskCompletion.SetResult(null);
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
catch (Exception e)
{
return;
newCurrentExportTaskCompletion.SetException(e);
throw;
}
}
}
Expand Down