Skip to content

Commit 22c38bd

Browse files
committed
line log generator
1 parent 2f252cf commit 22c38bd

File tree

7 files changed

+478
-0
lines changed

7 files changed

+478
-0
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lading/src/generator/file_gen/traditional.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use tokio::{
3030
fs,
3131
io::{AsyncWriteExt, BufWriter},
3232
task::{JoinError, JoinSet},
33+
time::{Duration, Instant},
3334
};
3435
use tracing::{error, info};
3536

@@ -120,6 +121,13 @@ pub struct Config {
120121
rotate: bool,
121122
/// The load throttle configuration
122123
pub throttle: Option<BytesThrottleConfig>,
124+
/// Optional fixed interval between blocks. When set, the generator waits
125+
/// this duration before emitting the next block, regardless of byte size.
126+
pub block_interval_millis: Option<u64>,
127+
/// Flush after each block. Useful when block intervals are large and the
128+
/// buffered writer would otherwise delay writes to disk.
129+
#[serde(default)]
130+
pub flush_each_block: bool,
123131
}
124132

125133
#[derive(Debug)]
@@ -195,6 +203,10 @@ impl Server {
195203
file_index: Arc::clone(&file_index),
196204
rotate: config.rotate,
197205
shutdown: shutdown.clone(),
206+
block_interval: config
207+
.block_interval_millis
208+
.map(Duration::from_millis),
209+
flush_each_block: config.flush_each_block,
198210
};
199211

200212
handles.spawn(child.spin());
@@ -269,6 +281,8 @@ struct Child {
269281
rotate: bool,
270282
file_index: Arc<AtomicU32>,
271283
shutdown: lading_signal::Watcher,
284+
block_interval: Option<Duration>,
285+
flush_each_block: bool,
272286
}
273287

274288
impl Child {
@@ -303,13 +317,33 @@ impl Child {
303317

304318
let shutdown_wait = self.shutdown.recv();
305319
tokio::pin!(shutdown_wait);
320+
let mut next_tick = self
321+
.block_interval
322+
.as_ref()
323+
.map(|dur| Instant::now() + *dur);
306324
loop {
307325
let total_bytes = self.block_cache.peek_next_size(&handle);
308326

309327
tokio::select! {
310328
result = self.throttle.wait_for(total_bytes) => {
311329
match result {
312330
Ok(()) => {
331+
if let Some(dur) = self.block_interval {
332+
if let Some(deadline) = next_tick {
333+
tokio::select! {
334+
_ = tokio::time::sleep_until(deadline) => {},
335+
() = &mut shutdown_wait => {
336+
fp.flush().await?;
337+
info!("shutdown signal received");
338+
return Ok(());
339+
},
340+
}
341+
next_tick = Some(deadline + dur);
342+
} else {
343+
next_tick = Some(Instant::now() + dur);
344+
}
345+
}
346+
313347
let block = self.block_cache.advance(&mut handle);
314348
let total_bytes = u64::from(total_bytes.get());
315349

@@ -318,6 +352,9 @@ impl Child {
318352
counter!("bytes_written").increment(total_bytes);
319353
total_bytes_written += total_bytes;
320354
}
355+
if self.flush_each_block {
356+
fp.flush().await?;
357+
}
321358

322359
if total_bytes_written > maximum_bytes_per_file {
323360
fp.flush().await?;

lading_payload/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rand = { workspace = true, features = [
2828
"std",
2929
"std_rng",
3030
] }
31+
chrono = { version = "0.4", default-features = true, features = ["std"] }
3132
rmp-serde = { version = "1.1", default-features = false }
3233
serde = { workspace = true }
3334
serde_json = { workspace = true }

lading_payload/src/block.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ pub enum SpinError {
2323
/// Static payload creation error
2424
#[error(transparent)]
2525
Static(#[from] crate::statik::Error),
26+
/// Static line-rate payload creation error
27+
#[error(transparent)]
28+
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
29+
/// Static second-grouped payload creation error
30+
#[error(transparent)]
31+
StaticSecond(#[from] crate::statik_second::Error),
2632
/// rng slice is Empty
2733
#[error("RNG slice is empty")]
2834
EmptyRng,
@@ -55,6 +61,12 @@ pub enum Error {
5561
/// Static payload creation error
5662
#[error(transparent)]
5763
Static(#[from] crate::statik::Error),
64+
/// Static line-rate payload creation error
65+
#[error(transparent)]
66+
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
67+
/// Static second-grouped payload creation error
68+
#[error(transparent)]
69+
StaticSecond(#[from] crate::statik_second::Error),
5870
/// Error for crate deserialization
5971
#[error("Deserialization error: {0}")]
6072
Deserialize(#[from] crate::Error),
@@ -337,6 +349,37 @@ impl Cache {
337349
total_bytes.get(),
338350
)?
339351
}
352+
crate::Config::StaticLinesPerSecond {
353+
static_path,
354+
lines_per_second,
355+
} => {
356+
let span = span!(Level::INFO, "fixed", payload = "static-lines-per-second");
357+
let _guard = span.enter();
358+
let mut serializer =
359+
crate::StaticLinesPerSecond::new(static_path, *lines_per_second)?;
360+
construct_block_cache_inner(
361+
&mut rng,
362+
&mut serializer,
363+
maximum_block_bytes,
364+
total_bytes.get(),
365+
)?
366+
}
367+
crate::Config::StaticSecond {
368+
static_path,
369+
timestamp_format,
370+
emit_placeholder,
371+
} => {
372+
let span = span!(Level::INFO, "fixed", payload = "static-second");
373+
let _guard = span.enter();
374+
let mut serializer =
375+
crate::StaticSecond::new(static_path, &timestamp_format, *emit_placeholder)?;
376+
construct_block_cache_inner(
377+
&mut rng,
378+
&mut serializer,
379+
maximum_block_bytes,
380+
total_bytes.get(),
381+
)?
382+
}
340383
crate::Config::OpentelemetryTraces => {
341384
let mut pyld = crate::OpentelemetryTraces::new(&mut rng);
342385
let span = span!(Level::INFO, "fixed", payload = "otel-traces");

lading_payload/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub use opentelemetry::metric::OpentelemetryMetrics;
2727
pub use opentelemetry::trace::OpentelemetryTraces;
2828
pub use splunk_hec::SplunkHec;
2929
pub use statik::Static;
30+
pub use statik_second::StaticSecond;
31+
pub use statik_line_rate::StaticLinesPerSecond;
3032
pub use syslog::Syslog5424;
3133

3234
pub mod apache_common;
@@ -40,6 +42,8 @@ pub mod opentelemetry;
4042
pub mod procfs;
4143
pub mod splunk_hec;
4244
pub mod statik;
45+
pub mod statik_second;
46+
pub mod statik_line_rate;
4347
pub mod syslog;
4448
pub mod trace_agent;
4549

@@ -129,6 +133,28 @@ pub enum Config {
129133
/// assumed to be line-oriented but no other claim is made on the file.
130134
static_path: PathBuf,
131135
},
136+
/// Generates static data but limits the number of lines emitted per block
137+
StaticLinesPerSecond {
138+
/// Defines the file path to read static variant data from. Content is
139+
/// assumed to be line-oriented but no other claim is made on the file.
140+
static_path: PathBuf,
141+
/// Number of lines to emit in each generated block
142+
lines_per_second: u32,
143+
},
144+
/// Generates static data grouped by second; each block contains one
145+
/// second's worth of logs as determined by a parsed timestamp prefix.
146+
StaticSecond {
147+
/// Defines the file path to read static variant data from. Content is
148+
/// assumed to be line-oriented.
149+
static_path: PathBuf,
150+
/// Chrono-compatible timestamp format string used to parse the leading
151+
/// timestamp in each line.
152+
timestamp_format: String,
153+
/// Emit a minimal placeholder block (single newline) for seconds with
154+
/// no lines. When false, empty seconds are skipped.
155+
#[serde(default)]
156+
emit_placeholder: bool,
157+
},
132158
/// Generates a line of printable ascii characters
133159
Ascii,
134160
/// Generates a json encoded line
@@ -167,6 +193,10 @@ pub enum Payload {
167193
SplunkHec(splunk_hec::SplunkHec),
168194
/// Static file content
169195
Static(Static),
196+
/// Static file content with a fixed number of lines emitted per block
197+
StaticLinesPerSecond(StaticLinesPerSecond),
198+
/// Static file content grouped into one-second blocks based on timestamps
199+
StaticSecond(StaticSecond),
170200
/// Syslog RFC 5424 format
171201
Syslog(Syslog5424),
172202
/// OpenTelemetry traces
@@ -195,6 +225,8 @@ impl Serialize for Payload {
195225
Payload::Json(ser) => ser.to_bytes(rng, max_bytes, writer),
196226
Payload::SplunkHec(ser) => ser.to_bytes(rng, max_bytes, writer),
197227
Payload::Static(ser) => ser.to_bytes(rng, max_bytes, writer),
228+
Payload::StaticLinesPerSecond(ser) => ser.to_bytes(rng, max_bytes, writer),
229+
Payload::StaticSecond(ser) => ser.to_bytes(rng, max_bytes, writer),
198230
Payload::Syslog(ser) => ser.to_bytes(rng, max_bytes, writer),
199231
Payload::OtelTraces(ser) => ser.to_bytes(rng, max_bytes, writer),
200232
Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer),
@@ -207,6 +239,8 @@ impl Serialize for Payload {
207239
fn data_points_generated(&self) -> Option<u64> {
208240
match self {
209241
Payload::OtelMetrics(ser) => ser.data_points_generated(),
242+
Payload::StaticLinesPerSecond(ser) => ser.data_points_generated(),
243+
Payload::StaticSecond(ser) => ser.data_points_generated(),
210244
// Other implementations use the default None
211245
_ => None,
212246
}

0 commit comments

Comments
 (0)