
Commit a7ff6de

fix: otel logs better DynamicFlushScheduler (#2318)
* Improve dynamic flush scheduler for otel data

  The changes introduce a more flexible, adaptive dynamic flush scheduler to address production issues where the system wasn't flushing data fast enough, causing memory growth and crashes. The problem stemmed from the existing scheduler handling only a single flush at a time, which limited concurrency and couldn't cope with the influx of logs.

  - Added configuration options for minimum and maximum concurrency levels, maximum batch size, and a memory pressure threshold, so flush operations adjust dynamically to workload and pressure.
  - Used `pLimit` to run flush operations concurrently, with adjustments based on batch queue length and memory pressure.
  - Improved metrics reporting to monitor the scheduler's dynamic behavior, helping identify performance issues and tune its operation.

* Implement load shedding for TaskEvent records

  This change introduces load shedding for TaskEvent records, particularly those of kind LOG, when the system sees high volume and cannot flush to the database in a timely manner. The goal is to avoid overwhelming the system and to keep critical events prioritized.

  - Added `loadSheddingThreshold` and `loadSheddingEnabled` configuration options in multiple modules to activate load shedding.
  - Introduced an `isDroppableEvent` function so that specific events can be dropped when load shedding is enabled.
  - Updated metrics to reflect dropped events and load shedding status, providing visibility into system performance under high load.
  - Updated loggers to report load shedding state changes, ensuring timely awareness of load management activity.

* Fix undefined `queuePressure` variable in DynamicFlushScheduler

  The `queuePressure` variable was used without being defined in the DynamicFlushScheduler class, risking runtime errors. This commit adds the missing definition and calculates it from `totalQueuedItems` and `memoryPressureThreshold`.

  - Defined `queuePressure` in the `adjustConcurrency` method to prevent undefined errors.
  - Adjusted the batch size logic to use the newly defined `queuePressure` variable.
  - Addressed code inconsistencies and kept spacing and formatting consistent across the file.

* Refactor concurrency adjustment logic in scheduler

  The concurrency adjustment logic was refactored for clarity and maintainability: the pressure metric calculations were moved out of the conditional blocks so they are always computed before any decision is made.

  - The queue pressure and time-since-last-flush calculations now run independently of the `backOff` condition.
  - This lays the groundwork for more reliable concurrency scaling and better performance monitoring; the overall logic of adjusting concurrency based on system pressure is unchanged.

  This addresses ongoing issues with the scheduler that were not resolved by previous changes.

* Some tweaks
1 parent 0d136a3 commit a7ff6de
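For orientation, here is a minimal usage sketch of the configuration surface this commit adds. It is a hypothetical example, not code from the repo: the `TaskEvent` shape, the `saveEvents` stub, and the import path are illustrative assumptions; only the `DynamicFlushSchedulerConfig` fields and the idea that LOG events are droppable come from the commit itself.

```ts
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server"; // path assumed

// Simplified stand-in for the real TaskEvent record
type TaskEvent = { kind: string; message: string };

// Stand-in for the real bulk insert into the database
async function saveEvents(flushId: string, batch: TaskEvent[]): Promise<void> {
  console.log(`flush ${flushId}: ${batch.length} events`);
}

const scheduler = new DynamicFlushScheduler<TaskEvent>({
  batchSize: 100,
  flushInterval: 1000,
  minConcurrency: 1,
  maxConcurrency: 10,
  maxBatchSize: 500,
  memoryPressureThreshold: 5000,
  loadSheddingThreshold: 100000,
  loadSheddingEnabled: true,
  // Under load shedding, LOG events may be dropped; everything else is kept
  isDroppableEvent: (event) => event.kind === "LOG",
  callback: (flushId, batch) => saveEvents(flushId, batch),
});

scheduler.addToBatch([{ kind: "LOG", message: "hello" }]);
```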

File tree

4 files changed (+481, -20 lines)


apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
```diff
@@ -252,6 +252,12 @@ const EnvironmentSchema = z.object({
   EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
   EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
   EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),
+  EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
+  EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
+  EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
+  EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(5000),
+  EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(100000),
+  EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"),
   SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
   SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
   SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
```
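These variables are consumed where the event repository constructs its flush scheduler; that wiring is in one of the other changed files and is not shown here. As a hedged sketch, the mapping might look like the following, where the `~/env.server` import path follows the webapp's usual convention and treating `"1"` as "enabled" for the string flag is an assumption based on the schema default:

```ts
import { env } from "~/env.server"; // import path assumed

// Possible mapping of the new env vars onto DynamicFlushSchedulerConfig fields
const flushConfig = {
  batchSize: env.EVENTS_BATCH_SIZE,
  flushInterval: env.EVENTS_BATCH_INTERVAL,
  minConcurrency: env.EVENTS_MIN_CONCURRENCY,
  maxConcurrency: env.EVENTS_MAX_CONCURRENCY,
  maxBatchSize: env.EVENTS_MAX_BATCH_SIZE,
  memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD,
  loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD,
  // the schema stores the flag as a string defaulting to "1"
  loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED === "1",
};
```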
Lines changed: 323 additions & 20 deletions

```diff
@@ -1,38 +1,140 @@
+import { Logger } from "@trigger.dev/core/logger";
 import { nanoid } from "nanoid";
+import pLimit from "p-limit";
 
 export type DynamicFlushSchedulerConfig<T> = {
   batchSize: number;
   flushInterval: number;
   callback: (flushId: string, batch: T[]) => Promise<void>;
+  // New configuration options
+  minConcurrency?: number;
+  maxConcurrency?: number;
+  maxBatchSize?: number;
+  memoryPressureThreshold?: number; // Number of items that triggers increased concurrency
+  loadSheddingThreshold?: number; // Number of items that triggers load shedding
+  loadSheddingEnabled?: boolean;
+  isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped
 };
 
 export class DynamicFlushScheduler<T> {
-  private batchQueue: T[][]; // Adjust the type according to your data structure
-  private currentBatch: T[]; // Adjust the type according to your data structure
+  private batchQueue: T[][];
+  private currentBatch: T[];
   private readonly BATCH_SIZE: number;
   private readonly FLUSH_INTERVAL: number;
   private flushTimer: NodeJS.Timeout | null;
   private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
 
+  // New properties for dynamic scaling
+  private readonly minConcurrency: number;
+  private readonly maxConcurrency: number;
+  private readonly maxBatchSize: number;
+  private readonly memoryPressureThreshold: number;
+  private limiter: ReturnType<typeof pLimit>;
+  private currentBatchSize: number;
+  private totalQueuedItems: number = 0;
+  private consecutiveFlushFailures: number = 0;
+  private lastFlushTime: number = Date.now();
+  private metrics = {
+    flushedBatches: 0,
+    failedBatches: 0,
+    totalItemsFlushed: 0,
+    droppedEvents: 0,
+    droppedEventsByKind: new Map<string, number>(),
+  };
+
+  // New properties for load shedding
+  private readonly loadSheddingThreshold: number;
+  private readonly loadSheddingEnabled: boolean;
+  private readonly isDroppableEvent?: (item: T) => boolean;
+  private isLoadShedding: boolean = false;
+
+  private readonly logger: Logger = new Logger("EventRepo.DynamicFlushScheduler", "debug");
+
   constructor(config: DynamicFlushSchedulerConfig<T>) {
     this.batchQueue = [];
     this.currentBatch = [];
     this.BATCH_SIZE = config.batchSize;
+    this.currentBatchSize = config.batchSize;
     this.FLUSH_INTERVAL = config.flushInterval;
     this.callback = config.callback;
     this.flushTimer = null;
+
+    // Initialize dynamic scaling parameters
+    this.minConcurrency = config.minConcurrency ?? 1;
+    this.maxConcurrency = config.maxConcurrency ?? 10;
+    this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5;
+    this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20;
+
+    // Initialize load shedding parameters
+    this.loadSheddingThreshold = config.loadSheddingThreshold ?? config.batchSize * 50;
+    this.loadSheddingEnabled = config.loadSheddingEnabled ?? true;
+    this.isDroppableEvent = config.isDroppableEvent;
+
+    // Start with minimum concurrency
+    this.limiter = pLimit(this.minConcurrency);
+
     this.startFlushTimer();
+    this.startMetricsReporter();
   }
 
   addToBatch(items: T[]): void {
-    this.currentBatch.push(...items);
+    let itemsToAdd = items;
+
+    // Apply load shedding if enabled and we're over the threshold
+    if (this.loadSheddingEnabled && this.totalQueuedItems >= this.loadSheddingThreshold) {
+      const { kept, dropped } = this.applyLoadShedding(items);
+      itemsToAdd = kept;
+
+      if (dropped.length > 0) {
+        this.metrics.droppedEvents += dropped.length;
 
-    if (this.currentBatch.length >= this.BATCH_SIZE) {
-      this.batchQueue.push(this.currentBatch);
-      this.currentBatch = [];
-      this.flushNextBatch();
-      this.resetFlushTimer();
+        // Track dropped events by kind if possible
+        dropped.forEach((item) => {
+          const kind = this.getEventKind(item);
+          if (kind) {
+            const currentCount = this.metrics.droppedEventsByKind.get(kind) || 0;
+            this.metrics.droppedEventsByKind.set(kind, currentCount + 1);
+          }
+        });
+
+        if (!this.isLoadShedding) {
+          this.isLoadShedding = true;
+        }
+
+        this.logger.warn("Load shedding", {
+          totalQueuedItems: this.totalQueuedItems,
+          threshold: this.loadSheddingThreshold,
+          droppedCount: dropped.length,
+        });
+      }
+    } else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) {
+      this.isLoadShedding = false;
+      this.logger.info("Load shedding deactivated", {
+        totalQueuedItems: this.totalQueuedItems,
+        threshold: this.loadSheddingThreshold,
+        totalDropped: this.metrics.droppedEvents,
+      });
     }
+
+    this.currentBatch.push(...itemsToAdd);
+    this.totalQueuedItems += itemsToAdd.length;
+
+    // Check if we need to create a batch
+    if (this.currentBatch.length >= this.currentBatchSize) {
+      this.createBatch();
+    }
+
+    // Adjust concurrency based on queue pressure
+    this.adjustConcurrency();
+  }
+
+  private createBatch(): void {
+    if (this.currentBatch.length === 0) return;
+
+    this.batchQueue.push(this.currentBatch);
+    this.currentBatch = [];
+    this.flushBatches();
+    this.resetFlushTimer();
   }
 
   private startFlushTimer(): void {
@@ -48,23 +150,224 @@ export class DynamicFlushScheduler<T> {
 
   private checkAndFlush(): void {
     if (this.currentBatch.length > 0) {
-      this.batchQueue.push(this.currentBatch);
-      this.currentBatch = [];
+      this.createBatch();
     }
-    this.flushNextBatch();
+    this.flushBatches();
   }
 
-  private async flushNextBatch(): Promise<void> {
-    if (this.batchQueue.length === 0) return;
+  private async flushBatches(): Promise<void> {
+    const batchesToFlush: T[][] = [];
+
+    // Dequeue all available batches up to current concurrency limit
+    while (this.batchQueue.length > 0 && batchesToFlush.length < this.limiter.concurrency) {
+      const batch = this.batchQueue.shift();
+      if (batch) {
+        batchesToFlush.push(batch);
+      }
+    }
+
+    if (batchesToFlush.length === 0) return;
+
+    // Schedule all batches for concurrent processing
+    const flushPromises = batchesToFlush.map((batch) =>
+      this.limiter(async () => {
+        const flushId = nanoid();
+        const itemCount = batch.length;
+
+        try {
+          const startTime = Date.now();
+          await this.callback(flushId, batch);
+
+          const duration = Date.now() - startTime;
+          this.totalQueuedItems -= itemCount;
+          this.consecutiveFlushFailures = 0;
+          this.lastFlushTime = Date.now();
+          this.metrics.flushedBatches++;
+          this.metrics.totalItemsFlushed += itemCount;
 
-    const batchToFlush = this.batchQueue.shift();
-    try {
-      await this.callback(nanoid(), batchToFlush!);
+          this.logger.debug("Batch flushed successfully", {
+            flushId,
+            itemCount,
+            duration,
+            remainingQueueDepth: this.totalQueuedItems,
+            activeConcurrency: this.limiter.activeCount,
+            pendingConcurrency: this.limiter.pendingCount,
+          });
+        } catch (error) {
+          this.consecutiveFlushFailures++;
+          this.metrics.failedBatches++;
+
+          this.logger.error("Error flushing batch", {
+            flushId,
+            itemCount,
+            error,
+            consecutiveFailures: this.consecutiveFlushFailures,
+          });
+
+          // Re-queue the batch at the front if it fails
+          this.batchQueue.unshift(batch);
+          this.totalQueuedItems += itemCount;
+
+          // Back off on failures
+          if (this.consecutiveFlushFailures > 3) {
+            this.adjustConcurrency(true);
+          }
+        }
+      })
+    );
+
+    // Don't await here - let them run concurrently
+    Promise.allSettled(flushPromises).then(() => {
+      // After flush completes, check if we need to flush more
       if (this.batchQueue.length > 0) {
-        this.flushNextBatch();
+        this.flushBatches();
+      }
+    });
+  }
+
+  private lastConcurrencyAdjustment: number = Date.now();
+
+  private adjustConcurrency(backOff: boolean = false): void {
+    const currentConcurrency = this.limiter.concurrency;
+    let newConcurrency = currentConcurrency;
+
+    // Calculate pressure metrics - moved outside the if/else block
+    const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold;
+    const timeSinceLastFlush = Date.now() - this.lastFlushTime;
+    const timeSinceLastAdjustment = Date.now() - this.lastConcurrencyAdjustment;
+
+    // Don't adjust too frequently (except for backoff)
+    if (!backOff && timeSinceLastAdjustment < 1000) {
+      return;
+    }
+
+    if (backOff) {
+      // Reduce concurrency on failures
+      newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75));
+    } else {
+      if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) {
+        // High pressure - increase concurrency
+        newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2);
+      } else if (queuePressure < 0.2 && currentConcurrency > this.minConcurrency) {
+        // Low pressure - decrease concurrency
+        newConcurrency = Math.max(this.minConcurrency, currentConcurrency - 1);
+      }
+    }
+
+    // Adjust batch size based on pressure
+    if (this.totalQueuedItems > this.memoryPressureThreshold) {
+      this.currentBatchSize = Math.min(
+        this.maxBatchSize,
+        Math.floor(this.BATCH_SIZE * (1 + queuePressure))
+      );
+    } else {
+      this.currentBatchSize = this.BATCH_SIZE;
+    }
+
+    // Update concurrency if changed
+    if (newConcurrency !== currentConcurrency) {
+      this.limiter = pLimit(newConcurrency);
+
+      this.logger.info("Adjusted flush concurrency", {
+        previousConcurrency: currentConcurrency,
+        newConcurrency,
+        queuePressure,
+        totalQueuedItems: this.totalQueuedItems,
+        currentBatchSize: this.currentBatchSize,
+        memoryPressureThreshold: this.memoryPressureThreshold,
+      });
+    }
+  }
+
+  private startMetricsReporter(): void {
+    // Report metrics every 30 seconds
+    setInterval(() => {
+      const droppedByKind: Record<string, number> = {};
+      this.metrics.droppedEventsByKind.forEach((count, kind) => {
+        droppedByKind[kind] = count;
+      });
+
+      this.logger.info("DynamicFlushScheduler metrics", {
+        totalQueuedItems: this.totalQueuedItems,
+        batchQueueLength: this.batchQueue.length,
+        currentBatchLength: this.currentBatch.length,
+        currentConcurrency: this.limiter.concurrency,
+        activeConcurrent: this.limiter.activeCount,
+        pendingConcurrent: this.limiter.pendingCount,
+        currentBatchSize: this.currentBatchSize,
+        isLoadShedding: this.isLoadShedding,
+        metrics: {
+          ...this.metrics,
+          droppedByKind,
+        },
+      });
+    }, 30000);
+  }
+
+  private applyLoadShedding(items: T[]): { kept: T[]; dropped: T[] } {
+    if (!this.isDroppableEvent) {
+      // If no function provided to determine droppable events, keep all
+      return { kept: items, dropped: [] };
+    }
+
+    const kept: T[] = [];
+    const dropped: T[] = [];
+
+    for (const item of items) {
+      if (this.isDroppableEvent(item)) {
+        dropped.push(item);
+      } else {
+        kept.push(item);
       }
-    } catch (error) {
-      console.error("Error inserting batch:", error);
+    }
+
+    return { kept, dropped };
+  }
+
+  private getEventKind(item: T): string | undefined {
+    // Try to extract the kind from the event if it has one
+    if (item && typeof item === "object" && "kind" in item) {
+      return String(item.kind);
+    }
+    return undefined;
+  }
+
+  // Method to get current status
+  getStatus() {
+    const droppedByKind: Record<string, number> = {};
+    this.metrics.droppedEventsByKind.forEach((count, kind) => {
+      droppedByKind[kind] = count;
+    });
+
+    return {
+      queuedItems: this.totalQueuedItems,
+      batchQueueLength: this.batchQueue.length,
+      currentBatchSize: this.currentBatch.length,
+      concurrency: this.limiter.concurrency,
+      activeFlushes: this.limiter.activeCount,
+      pendingFlushes: this.limiter.pendingCount,
+      isLoadShedding: this.isLoadShedding,
+      metrics: {
+        ...this.metrics,
+        droppedEventsByKind: droppedByKind,
+      },
+    };
+  }
+
+  // Graceful shutdown
+  async shutdown(): Promise<void> {
+    if (this.flushTimer) {
+      clearInterval(this.flushTimer);
+    }
+
+    // Flush any remaining items
+    if (this.currentBatch.length > 0) {
+      this.createBatch();
+    }
+
+    // Wait for all pending flushes to complete
+    while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) {
+      await new Promise((resolve) => setTimeout(resolve, 100));
     }
   }
-}
+}
```
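To make the repaired `queuePressure` logic easy to check by hand, here is the decision rule from `adjustConcurrency` restated as a standalone pure function with a worked example. This is an illustrative sketch, not code from the commit; it mirrors the thresholds in the diff above (scale up above 0.8 pressure or after two missed flush intervals, scale down below 0.2, cut by 25% on back-off).

```ts
// Pure restatement of the concurrency decision rule from adjustConcurrency
function nextConcurrency(opts: {
  current: number;
  min: number;
  max: number;
  totalQueuedItems: number;
  memoryPressureThreshold: number;
  msSinceLastFlush: number;
  flushInterval: number;
  backOff: boolean;
}): number {
  // The bug fix: queuePressure is now explicitly defined from queue depth
  const queuePressure = opts.totalQueuedItems / opts.memoryPressureThreshold;

  if (opts.backOff) {
    // Repeated flush failures: reduce concurrency by 25%
    return Math.max(opts.min, Math.floor(opts.current * 0.75));
  }
  if (queuePressure > 0.8 || opts.msSinceLastFlush > opts.flushInterval * 2) {
    // High pressure or stalled flushing: add two slots, capped at max
    return Math.min(opts.max, opts.current + 2);
  }
  if (queuePressure < 0.2 && opts.current > opts.min) {
    // Low pressure: shed one slot, floored at min
    return Math.max(opts.min, opts.current - 1);
  }
  return opts.current;
}

// 4,500 queued items against a threshold of 5,000 gives pressure 0.9,
// so a current concurrency of 4 steps up to 6 (capped at max 10).
console.log(
  nextConcurrency({
    current: 4,
    min: 1,
    max: 10,
    totalQueuedItems: 4500,
    memoryPressureThreshold: 5000,
    msSinceLastFlush: 500,
    flushInterval: 1000,
    backOff: false,
  })
); // -> 6
```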
