Skip to content

Commit f81ed2e

Browse files
committed
line log generator
1 parent 2f252cf commit f81ed2e

File tree

3 files changed

+213
-0
lines changed

3 files changed

+213
-0
lines changed

lading_payload/src/block.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub enum SpinError {
2323
/// Static payload creation error
2424
#[error(transparent)]
2525
Static(#[from] crate::statik::Error),
26+
/// Static line-rate payload creation error
27+
#[error(transparent)]
28+
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
2629
/// rng slice is Empty
2730
#[error("RNG slice is empty")]
2831
EmptyRng,
@@ -55,6 +58,9 @@ pub enum Error {
5558
/// Static payload creation error
5659
#[error(transparent)]
5760
Static(#[from] crate::statik::Error),
61+
/// Static line-rate payload creation error
62+
#[error(transparent)]
63+
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
5864
/// Error for crate deserialization
5965
#[error("Deserialization error: {0}")]
6066
Deserialize(#[from] crate::Error),
@@ -337,6 +343,21 @@ impl Cache {
337343
total_bytes.get(),
338344
)?
339345
}
346+
crate::Config::StaticLinesPerSecond {
347+
static_path,
348+
lines_per_second,
349+
} => {
350+
let span = span!(Level::INFO, "fixed", payload = "static-lines-per-second");
351+
let _guard = span.enter();
352+
let mut serializer =
353+
crate::StaticLinesPerSecond::new(static_path, *lines_per_second)?;
354+
construct_block_cache_inner(
355+
&mut rng,
356+
&mut serializer,
357+
maximum_block_bytes,
358+
total_bytes.get(),
359+
)?
360+
}
340361
crate::Config::OpentelemetryTraces => {
341362
let mut pyld = crate::OpentelemetryTraces::new(&mut rng);
342363
let span = span!(Level::INFO, "fixed", payload = "otel-traces");

lading_payload/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub use opentelemetry::metric::OpentelemetryMetrics;
2727
pub use opentelemetry::trace::OpentelemetryTraces;
2828
pub use splunk_hec::SplunkHec;
2929
pub use statik::Static;
30+
pub use statik_line_rate::StaticLinesPerSecond;
3031
pub use syslog::Syslog5424;
3132

3233
pub mod apache_common;
@@ -40,6 +41,7 @@ pub mod opentelemetry;
4041
pub mod procfs;
4142
pub mod splunk_hec;
4243
pub mod statik;
44+
pub mod statik_line_rate;
4345
pub mod syslog;
4446
pub mod trace_agent;
4547

@@ -129,6 +131,14 @@ pub enum Config {
129131
/// assumed to be line-oriented but no other claim is made on the file.
130132
static_path: PathBuf,
131133
},
134+
/// Generates static data but limits the number of lines emitted per block
135+
StaticLinesPerSecond {
136+
/// Defines the file path to read static variant data from. Content is
137+
/// assumed to be line-oriented but no other claim is made on the file.
138+
static_path: PathBuf,
139+
/// Number of lines to emit in each generated block
140+
lines_per_second: u32,
141+
},
132142
/// Generates a line of printable ascii characters
133143
Ascii,
134144
/// Generates a json encoded line
@@ -167,6 +177,8 @@ pub enum Payload {
167177
SplunkHec(splunk_hec::SplunkHec),
168178
/// Static file content
169179
Static(Static),
180+
/// Static file content with a fixed number of lines emitted per block
181+
StaticLinesPerSecond(StaticLinesPerSecond),
170182
/// Syslog RFC 5424 format
171183
Syslog(Syslog5424),
172184
/// OpenTelemetry traces
@@ -195,6 +207,7 @@ impl Serialize for Payload {
195207
Payload::Json(ser) => ser.to_bytes(rng, max_bytes, writer),
196208
Payload::SplunkHec(ser) => ser.to_bytes(rng, max_bytes, writer),
197209
Payload::Static(ser) => ser.to_bytes(rng, max_bytes, writer),
210+
Payload::StaticLinesPerSecond(ser) => ser.to_bytes(rng, max_bytes, writer),
198211
Payload::Syslog(ser) => ser.to_bytes(rng, max_bytes, writer),
199212
Payload::OtelTraces(ser) => ser.to_bytes(rng, max_bytes, writer),
200213
Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer),
@@ -207,6 +220,7 @@ impl Serialize for Payload {
207220
fn data_points_generated(&self) -> Option<u64> {
208221
match self {
209222
Payload::OtelMetrics(ser) => ser.data_points_generated(),
223+
Payload::StaticLinesPerSecond(ser) => ser.data_points_generated(),
210224
// Other implementations use the default None
211225
_ => None,
212226
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
//! Static file payload that replays a limited number of lines per block.
2+
3+
use std::{
4+
fs::{self, OpenOptions},
5+
io::{BufRead, BufReader, Write},
6+
num::NonZeroU32,
7+
path::Path,
8+
};
9+
10+
use rand::{Rng, seq::IndexedMutRandom};
11+
use tracing::debug;
12+
13+
#[derive(Debug)]
14+
struct Source {
15+
lines: Vec<Vec<u8>>,
16+
next_idx: usize,
17+
}
18+
19+
#[derive(Debug)]
20+
/// Static payload that emits a fixed number of lines each time it is asked to
21+
/// serialize.
22+
pub struct StaticLinesPerSecond {
23+
sources: Vec<Source>,
24+
lines_per_block: NonZeroU32,
25+
last_lines_generated: u64,
26+
}
27+
28+
#[derive(thiserror::Error, Debug)]
29+
/// Errors produced by [`StaticLinesPerSecond`].
30+
pub enum Error {
31+
/// IO error
32+
#[error(transparent)]
33+
Io(#[from] std::io::Error),
34+
/// No lines were discovered in the provided path
35+
#[error("No lines found in static path")]
36+
NoLines,
37+
/// The provided lines_per_second value was zero
38+
#[error("lines_per_second must be greater than zero")]
39+
ZeroLinesPerSecond,
40+
}
41+
42+
impl StaticLinesPerSecond {
43+
/// Create a new instance of `StaticLinesPerSecond`
44+
///
45+
/// # Errors
46+
///
47+
/// See documentation for [`Error`]
48+
pub fn new(path: &Path, lines_per_second: u32) -> Result<Self, Error> {
49+
let lines_per_block =
50+
NonZeroU32::new(lines_per_second).ok_or(Error::ZeroLinesPerSecond)?;
51+
52+
let mut sources = Vec::with_capacity(16);
53+
54+
let metadata = fs::metadata(path)?;
55+
if metadata.is_file() {
56+
debug!("Static path {} is a file.", path.display());
57+
let lines = read_lines(path)?;
58+
sources.push(Source { next_idx: 0, lines });
59+
} else if metadata.is_dir() {
60+
debug!("Static path {} is a directory.", path.display());
61+
for entry in fs::read_dir(path)? {
62+
let entry = entry?;
63+
let entry_pth = entry.path();
64+
debug!("Attempting to open {} as file.", entry_pth.display());
65+
if let Ok(file) = OpenOptions::new().read(true).open(&entry_pth) {
66+
let lines = read_lines_from_reader(file)?;
67+
sources.push(Source { next_idx: 0, lines });
68+
}
69+
}
70+
}
71+
72+
if sources.iter().all(|s| s.lines.is_empty()) {
73+
return Err(Error::NoLines);
74+
}
75+
76+
Ok(Self {
77+
sources,
78+
lines_per_block,
79+
last_lines_generated: 0,
80+
})
81+
}
82+
}
83+
84+
impl crate::Serialize for StaticLinesPerSecond {
85+
fn to_bytes<W, R>(
86+
&mut self,
87+
mut rng: R,
88+
max_bytes: usize,
89+
writer: &mut W,
90+
) -> Result<(), crate::Error>
91+
where
92+
R: Rng + Sized,
93+
W: Write,
94+
{
95+
self.last_lines_generated = 0;
96+
97+
let Some(source) = self.sources.choose_mut(&mut rng) else {
98+
return Ok(());
99+
};
100+
if source.lines.is_empty() {
101+
return Ok(());
102+
}
103+
104+
let mut bytes_written = 0usize;
105+
for _ in 0..self.lines_per_block.get() {
106+
let line = &source.lines[source.next_idx % source.lines.len()];
107+
let needed = line.len() + 1; // newline
108+
if bytes_written + needed > max_bytes {
109+
break;
110+
}
111+
112+
writer.write_all(line)?;
113+
writer.write_all(b"\n")?;
114+
bytes_written += needed;
115+
self.last_lines_generated += 1;
116+
source.next_idx = (source.next_idx + 1) % source.lines.len();
117+
}
118+
119+
Ok(())
120+
}
121+
122+
fn data_points_generated(&self) -> Option<u64> {
123+
Some(self.last_lines_generated)
124+
}
125+
}
126+
127+
fn read_lines(path: &Path) -> Result<Vec<Vec<u8>>, std::io::Error> {
128+
let file = OpenOptions::new().read(true).open(path)?;
129+
read_lines_from_reader(file)
130+
}
131+
132+
fn read_lines_from_reader<R: std::io::Read>(reader: R) -> Result<Vec<Vec<u8>>, std::io::Error> {
133+
let mut out = Vec::new();
134+
let mut reader = BufReader::new(reader);
135+
let mut buf = String::new();
136+
while {
137+
buf.clear();
138+
reader.read_line(&mut buf)?
139+
} != 0
140+
{
141+
if buf.ends_with('\n') {
142+
buf.pop();
143+
if buf.ends_with('\r') {
144+
buf.pop();
145+
}
146+
}
147+
out.push(buf.as_bytes().to_vec());
148+
}
149+
Ok(out)
150+
}
151+
152+
#[cfg(test)]
153+
mod tests {
154+
use super::*;
155+
use rand::{SeedableRng, rngs::StdRng};
156+
use std::{env, fs::File, io::Write as IoWrite};
157+
158+
#[test]
159+
fn writes_requested_number_of_lines() {
160+
let mut path = env::temp_dir();
161+
path.push("static_line_rate_test.txt");
162+
{
163+
let mut f = File::create(&path).unwrap();
164+
writeln!(f, "alpha").unwrap();
165+
writeln!(f, "beta").unwrap();
166+
writeln!(f, "gamma").unwrap();
167+
}
168+
169+
let mut serializer = StaticLinesPerSecond::new(&path, 2).unwrap();
170+
let mut buf = Vec::new();
171+
let mut rng = StdRng::seed_from_u64(42);
172+
173+
serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap();
174+
assert_eq!(buf, b"alpha\nbeta\n");
175+
// Clean up
176+
let _ = std::fs::remove_file(&path);
177+
}
178+
}

0 commit comments

Comments
 (0)