Skip to content

feat: add streaming I/O pipeline to daemon#15

Merged
mostafa merged 4 commits intomainfrom
feat/streaming-pipeline-level-1
Mar 30, 2026
Merged

feat: add streaming I/O pipeline to daemon#15
mostafa merged 4 commits intomainfrom
feat/streaming-pipeline-level-1

Conversation

@mostafa
Copy link
Copy Markdown
Member

@mostafa mostafa commented Mar 30, 2026

Summary

  • Refactor daemon from hardcoded stdin/stdout to a pluggable streaming pipeline via EventSource trait and Sink enum dispatch
  • Add --input (stdin, http, nats://) and --output (stdout, file://, nats://) CLI flags with fan-out support
  • New daemon-nats feature flag for NATS JetStream source/sink behind async-nats

Pipeline

[Source] --mpsc<String>--> [Engine] --mpsc<ProcessResult>--> [Sink]

Test plan

  • 4 integration tests: stdin->stdout, file output, fan-out, no-match silence
  • cargo clippy --all-features -- -D warnings clean
  • All 693 tests pass on both --features daemon and --all-features

Refactor the daemon from hardcoded stdin/stdout I/O to a pluggable
streaming architecture with EventSource trait and Sink enum dispatch.

Sources: StdinSource, HTTP POST /api/v1/events, NATS JetStream consumer
Sinks: StdoutSink, FileSink (NDJSON append), NatsSink (JetStream publish)
Fan-out: multiple --output flags via Sink::FanOut(Vec<Sink>)

Pipeline: source -> mpsc channel -> engine task -> mpsc channel -> sink task

Key changes:
- process_line() returns ProcessResult instead of writing to stdout
- New streaming module with EventSource trait and Sink enum
- --input flag (stdin, http, nats://) and --output flag (stdout, file://, nats://)
- daemon-nats feature flag with async-nats + tokio-stream dependencies
- 4 integration tests (stdin->stdout, file output, fan-out, no-match)
- README updated with streaming usage examples
@mostafa mostafa self-assigned this Mar 30, 2026
@mostafa mostafa changed the title feat: add streaming I/O pipeline to daemon (Level 1) feat: add streaming I/O pipeline to daemon Mar 30, 2026
mostafa added 3 commits March 31, 2026 00:16
rustls-webpki 0.102.x CRL matching bug is a transitive dependency of
async-nats 0.46.0 (latest). Cannot upgrade until async-nats releases
a version that depends on rustls-webpki 0.103.x.
Wrap synchronous sinks in block_in_place to avoid blocking the Tokio
runtime, skip empty ProcessResults before sending to sink channel,
use batch inc_by for metrics, derive NATS names from subject instead
of hardcoding, and store Subject as Arc<str> to avoid per-publish
String clones.
The sink task was spawned but never awaited, causing the process to
exit before the sink finished writing buffered results to file. This
fixes a race condition in the daemon_streaming_file_output CI test.
@mostafa mostafa force-pushed the feat/streaming-pipeline-level-1 branch from 6a7b4d1 to a32edf9 Compare March 30, 2026 22:42
@mostafa mostafa merged commit 99049f3 into main Mar 30, 2026
8 checks passed
@mostafa mostafa deleted the feat/streaming-pipeline-level-1 branch March 30, 2026 22:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant