diff --git a/src/config.ts b/src/config.ts index 9834d0a4..3fe691f9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -117,6 +117,9 @@ type StorageConfigType = { tracingMode?: string tracingTimeMinDuration: number tracingReturnServerTimings: boolean + tracingFeatures?: { + upload: boolean + } } function getOptionalConfigFromEnv(key: string, fallback?: string): string | undefined { @@ -326,6 +329,9 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { ), tracingReturnServerTimings: getOptionalConfigFromEnv('TRACING_RETURN_SERVER_TIMINGS') === 'true', + tracingFeatures: { + upload: getOptionalConfigFromEnv('TRACING_FEATURE_UPLOAD') === 'true', + }, // Queue pgQueueEnable: getOptionalConfigFromEnv('PG_QUEUE_ENABLE', 'ENABLE_QUEUE_EVENTS') === 'true', diff --git a/src/http/plugins/tracing.ts b/src/http/plugins/tracing.ts index 4dc7d72d..a3859c73 100644 --- a/src/http/plugins/tracing.ts +++ b/src/http/plugins/tracing.ts @@ -24,6 +24,8 @@ const { tracingTimeMinDuration, } = getConfig() +const enableLogTraces = ['debug', 'logs'].includes(defaultTracingMode || '') + export const tracing = fastifyPlugin( async function tracingMode(fastify) { if (!tracingEnabled) { @@ -40,15 +42,15 @@ export const tracing = fastifyPlugin( request.tracingMode = defaultTracingMode } + if (!enableLogTraces) { + return + } + const span = trace.getSpan(context.active()) if (span) { // We collect logs only in full,logs,debug mode - if ( - tracingEnabled && - request.tracingMode && - !['logs', 'debug'].includes(request.tracingMode) - ) { + if (request.tracingMode && !['debug'].includes(request.tracingMode)) { traceCollector.clearTrace(span.spanContext().traceId) } } @@ -62,7 +64,7 @@ export const tracing = fastifyPlugin( export const traceServerTime = fastifyPlugin( async function traceServerTime(fastify) { - if (!tracingEnabled) { + if (!tracingEnabled || !enableLogTraces) { return } fastify.addHook('onRequest', async (req, res) => { @@ -94,9 +96,14 @@ export const traceServerTime = fastifyPlugin( }) fastify.addHook('onResponse', async (request, reply) => { - try { - const traceId = trace.getSpan(context.active())?.spanContext().traceId + const traceId = trace.getSpan(context.active())?.spanContext().traceId + if (request.tracingMode !== 'debug') { + if (traceId) traceCollector.clearTrace(traceId) + return + } + + try { if (traceId) { const spans = traceCollector.getSpansForTrace(traceId) if (spans) { @@ -116,19 +123,27 @@ export const traceServerTime = fastifyPlugin( .join(',') reply.header('Server-Timing', httpServerTimes) } - traceCollector.clearTrace(traceId) } } } catch (e) { logSchema.error(request.log, 'failed tracing on response', { error: e, type: 'tracing' }) + } finally { + if (traceId) { + traceCollector.clearTrace(traceId) + } } }) fastify.addHook('onRequestAbort', async (req) => { - try { - const span = trace.getSpan(context.active()) - const traceId = span?.spanContext().traceId + const span = trace.getSpan(context.active()) + const traceId = span?.spanContext().traceId + + if (req.tracingMode !== 'debug') { + if (traceId) traceCollector.clearTrace(traceId) + return + } + try { span?.setAttribute('req_aborted', true) if (traceId) { @@ -136,10 +151,13 @@ export const traceServerTime = fastifyPlugin( if (spans) { req.serverTimings = spansToServerTimings(spans, true) } - traceCollector.clearTrace(traceId) } } catch (e) { logSchema.error(logger, 'failed parsing server times on abort', { error: e, type: 'otel' }) + } finally { + if (traceId) { + traceCollector.clearTrace(traceId) + } } }) }, diff --git a/src/internal/monitoring/otel.ts b/src/internal/monitoring/otel.ts index fc3bb4d1..5741c737 100644 --- a/src/internal/monitoring/otel.ts +++ b/src/internal/monitoring/otel.ts @@ -16,7 +16,7 @@ import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc' import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node' import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base' -import { SpanExporter, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base' +import { SpanExporter, BatchSpanProcessor, SpanProcessor } from '@opentelemetry/sdk-trace-base' import * as grpc from '@grpc/grpc-js' import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' import { IncomingMessage } from 'node:http' @@ -36,6 +36,7 @@ import { StreamSplitter } from '@tus/server' const tracingEnabled = process.env.TRACING_ENABLED === 'true' const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || '' +const enableLogTraces = ['debug', 'logs'].includes(process.env.TRACING_MODE || '') const exporterHeaders = headersEnv .split(',') @@ -67,13 +68,23 @@ if (tracingEnabled && endpoint) { // Create a BatchSpanProcessor using the trace exporter const batchProcessor = traceExporter ? new BatchSpanProcessor(traceExporter) : undefined +const spanProcessors: SpanProcessor[] = [] + +if (batchProcessor) { + spanProcessors.push(batchProcessor) +} + +if (enableLogTraces) { + spanProcessors.push(traceCollector) +} + // Configure the OpenTelemetry Node SDK const sdk = new NodeSDK({ resource: new Resource({ [ATTR_SERVICE_NAME]: 'storage', [ATTR_SERVICE_VERSION]: version, }), - spanProcessors: batchProcessor ? [traceCollector, batchProcessor] : [traceCollector], + spanProcessors: spanProcessors, traceExporter, instrumentations: [ new HttpInstrumentation({ @@ -266,10 +277,8 @@ const sdk = new NodeSDK({ enabled: true, methodsToInstrument: [ 'done', - '__doConcurrentUpload', '__uploadUsingPut', '__createMultipartUpload', - '__notifyProgress', 'markUploadAsAborted', ], }), @@ -304,7 +313,7 @@ const sdk = new NodeSDK({ ], }) -if (tracingEnabled) { +if (tracingEnabled && spanProcessors.length > 0) { // Initialize the OpenTelemetry Node SDK sdk.start() diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts index f6383092..681048f7 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3.ts @@ -28,12 +28,11 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { ERRORS, StorageBackendError } from '@internal/errors' import { getConfig } from '../../config' import { addAbortSignal, PassThrough, Readable } from 'node:stream' -import stream from 'stream/promises' import { trace } from '@opentelemetry/api' import { createByteCounterStream } from '@internal/concurrency' import { AgentStats, createAgent, gatherHttpAgentStats, InstrumentedAgent } from '@internal/http' -const { storageS3MaxSockets, tracingEnabled } = getConfig() +const { tracingFeatures, storageS3MaxSockets, tracingEnabled } = getConfig() interface StreamStatus { time: Date @@ -155,93 +154,32 @@ export class S3Backend implements StorageBackendAdapter { throw ERRORS.Aborted('Upload was aborted') } - const passThrough = new PassThrough() - - if (signal) { - addAbortSignal(signal, passThrough) - } - - passThrough.on('error', () => { - body.unpipe(passThrough) - }) - - body.on('error', (err) => { - if (!passThrough.closed) { - passThrough.destroy(err) - } - }) - - const byteReader = createByteCounterStream() - const bodyStream = body.pipe(passThrough) + const streamWatcher = tracingFeatures?.upload ? this.watchUploadStream(body, signal) : undefined + const uploadStream = streamWatcher ? streamWatcher.dataStream : body - let upload: Upload | undefined = undefined - - // Upload stats - const uploadProgress: Progress[] = [] - const getStreamStatus = (): StreamStatus => ({ - time: new Date(), - bytesUploaded: uploadProgress[uploadProgress.length - 1]?.loaded || 0, - dataStream: { - closed: bodyStream.closed, - paused: bodyStream.isPaused(), - errored: Boolean(bodyStream.errored), - writable: bodyStream.writable, - byteRead: byteReader.bytes, + const upload = new Upload({ + client: this.client, + params: { + Bucket: bucketName, + Key: withOptionalVersion(key, version), + Body: uploadStream, + ContentType: contentType, + CacheControl: cacheControl, }, - httpAgentStats: gatherHttpAgentStats(this.agent.httpsAgent.getCurrentStatus()), - progress: uploadProgress, }) - let streamStatus = getStreamStatus() + streamWatcher?.watchUpload(upload) - const streamWatcher = setInterval(() => { - streamStatus = getStreamStatus() - }, 1000) + signal?.addEventListener( + 'abort', + () => { + upload.abort() + }, + { once: true } + ) try { - const data = await stream.pipeline( - bodyStream, - byteReader.transformStream, - async (bodyStream) => { - if (signal?.aborted) { - throw ERRORS.Aborted('Upload was aborted') - } - - upload = new Upload({ - client: this.client, - params: { - Bucket: bucketName, - Key: withOptionalVersion(key, version), - Body: bodyStream as Readable, - ContentType: contentType, - CacheControl: cacheControl, - }, - }) - - upload.on('httpUploadProgress', (progress) => { - uploadProgress.push({ - total: progress.total, - part: progress.part, - loaded: progress.loaded, - }) - if (uploadProgress.length > 100) { - uploadProgress.shift() - } - }) - - signal?.addEventListener( - 'abort', - () => { - upload?.abort() - }, - { once: true } - ) - - return await upload.done() - }, - { signal } - ) - + const data = await upload.done() const metadata = await this.headObject(bucketName, key, version) return { @@ -254,24 +192,28 @@ export class S3Backend implements StorageBackendAdapter { size: metadata.size, contentRange: metadata.contentRange, } - } catch (err: any) { + } catch (err) { if (err instanceof Error && err.name === 'AbortError') { const span = trace.getActiveSpan() if (span) { // Print how far we got uploading the file - const lastStreamStatus = getStreamStatus() - const { progress, ...lastSeenStatus } = streamStatus - span.setAttributes({ - lastStreamStatus: JSON.stringify(lastStreamStatus), - lastSeenStatus: JSON.stringify(lastSeenStatus), - }) + const lastSeenStatus = streamWatcher?.lastSeenStreamStatus + const lastStreamStatus = streamWatcher?.getStreamStatus() + + if (lastSeenStatus && lastStreamStatus) { + const { progress, ...lastSeenStream } = lastSeenStatus + span.setAttributes({ + lastStreamStatus: JSON.stringify(lastStreamStatus), + lastSeenStatus: JSON.stringify(lastSeenStream), + }) + } } throw ERRORS.AbortedTerminate('Upload was aborted', err) } throw StorageBackendError.fromError(err) } finally { - clearInterval(streamWatcher) + streamWatcher?.stop() } } @@ -551,6 +493,92 @@ export class S3Backend implements StorageBackendAdapter { this.agent.close() } + protected watchUploadStream(body: Readable, signal?: AbortSignal) { + const passThrough = new PassThrough() + + if (signal) { + addAbortSignal(signal, passThrough) + } + + passThrough.on('error', () => { + body.unpipe(passThrough) + }) + + body.on('error', (err) => { + if (!passThrough.closed) { + passThrough.destroy(err) + } + }) + + const byteReader = createByteCounterStream() + const bodyStream = body.pipe(passThrough) + + // Upload stats + const uploadProgress: Progress[] = [] + const getStreamStatus = (): StreamStatus => ({ + time: new Date(), + bytesUploaded: uploadProgress[uploadProgress.length - 1]?.loaded || 0, + dataStream: { + closed: bodyStream.closed, + paused: bodyStream.isPaused(), + errored: Boolean(bodyStream.errored), + writable: bodyStream.writable, + byteRead: byteReader.bytes, + }, + httpAgentStats: gatherHttpAgentStats(this.agent.httpsAgent.getCurrentStatus()), + progress: uploadProgress, + }) + + let streamStatus = getStreamStatus() + + const streamWatcher = setInterval(() => { + streamStatus = getStreamStatus() + }, 1000) + + const dataStream = passThrough.pipe(byteReader.transformStream) + + body.on('error', (err) => { + passThrough.destroy(err) + }) + + passThrough.on('error', (err) => { + body.destroy(err) + }) + + passThrough.on('close', () => { + body.unpipe(passThrough) + }) + + function watchUpload(upload: Upload) { + upload.on('httpUploadProgress', (progress) => { + uploadProgress.push({ + total: progress.total, + part: progress.part, + loaded: progress.loaded, + }) + if (uploadProgress.length > 100) { + uploadProgress.shift() + } + }) + } + + return { + dataStream, + byteReader, + get uploadProgress() { + return uploadProgress + }, + get lastSeenStreamStatus() { + return streamStatus + }, + getStreamStatus, + stop() { + clearInterval(streamWatcher) + }, + watchUpload, + } + } + protected createS3Client(options: S3ClientOptions & { name: string }) { const params: S3ClientConfig = { region: options.region, diff --git a/src/storage/renderer/info.ts b/src/storage/renderer/info.ts index 5f8833a4..2a069ab3 100644 --- a/src/storage/renderer/info.ts +++ b/src/storage/renderer/info.ts @@ -45,7 +45,7 @@ export class InfoRenderer extends HeadRenderer { .header('ETag', data.metadata.eTag) .header('Content-Length', data.metadata.contentLength) .header('Last-Modified', data.metadata.lastModified?.toUTCString()) - .header('CacheControl', data.metadata.cacheControl) + .header('Cache-Control', data.metadata.cacheControl) if (data.transformations && data.transformations.length > 0) { response.header('X-Transformations', data.transformations.join(','))