Skip to content

New I/O node#81

Open
rgov wants to merge 7 commits into
mainfrom
rzg/io
Open

New I/O node#81
rgov wants to merge 7 commits into
mainfrom
rzg/io

Conversation

@rgov

@rgov rgov commented Jun 26, 2025

Copy link
Copy Markdown
Member

This pull request replaces the network_data_capture node with a new io_node, which provides approximately a superset of functionality of network_data_capture and bridge_node from ds_util_nodes.

An io_node instance is comprised of a Transport, a Framer, and an Extractor:

  • Transport -- handles a connection to an external data source
    • UDPTransport -- for UDP ports
    • SerialTransport -- for serial ports
  • Framer -- splits the data stream into units
    • RawFramer -- each read from the data stream returns a single complete unit
    • DelimitedFramer -- buffers data and splits on a delimiter, such as a line ending
    • JsonFramer -- buffers data and splits on fully formed JSON objects
  • Extractor -- extracts fields from each unit for publication as separate messages
    • DelimitedExtractor -- splits a unit on a delimiter and extracts specified indices
    • JsonExtractor -- parses a unit as JSON and extract specified fields by path (a subset of JSONPath)

An example config:

rbr_io_node:
    transport:
        type: serial
        port: /dev/ttyUSB0
        baudrate: 115200
    framing:
        type: delimited
        pattern: "\\r\\n|\\r|\\n"
    extractor:
        type: delimited
        pattern: ","
        fields:
            - name: depth
              type: float
              selector: "3"

Note

Extractors are intended for ad hoc data collection purposes but are not a substitute for a proper driver node whenever possible.

Raw messages are published to ~in, while parsed fields publish to ~in/<field_name>. The node also subscribes to ~out for outbound traffic.

Only a single I/O stream is supported per node. This is a simplification so that the node does not have to loop over connections and shuttle around complex state objects. An uncaught exception in one stream cannot disrupt the others. Where multiple connections are needed, multiple nodes should be executed through the launch file. (Configuration improvements will help to make this easier for operators.)

Configuration validation is provided by Pydantic, which provides reasonably human-friendly error messages.

  • Clean up io_node implementation
  • Add stats feature for parity with network_data_capture
  • Add sane defaults: line buffering for serial and raw for UDP.
  • Write unit tests for framers and extractors
  • Update example configuration schema
  • Replace existing users of network_data_capture and bridge_node

Closes #78 and resolves a blocker for #68.

@rgov rgov mentioned this pull request Dec 26, 2025
@figuernd

Copy link
Copy Markdown
Contributor

I like this approach, especially launching separate nodes per stream. Makes sense to me.

I'm a little confused on the extractor bit though: In the old network_data_capture the subtopics available were determined by the parsing method - does extractor likewise depend on framer used?

@figuernd

Copy link
Copy Markdown
Contributor

Configuration examples or docs would help

@rgov

rgov commented Jan 12, 2026

Copy link
Copy Markdown
Member Author

does extractor likewise depend on framer used

They are related. For example the framer splits on CSV lines but not on individual fields which have to be selected separately. If that makes sense?

We could change it so that there's one thing that splits records and canonicalizes them into a JSON structure, and then provide a jq-compatible syntax for mapping individual fields to topics.

(I'd love a different word than 'frame'.)

@figuernd figuernd marked this pull request as ready for review January 12, 2026 22:27
@figuernd figuernd self-requested a review January 12, 2026 22:27

@figuernd figuernd left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation / usage examples are the main thing missing for me

else:
fr = RawFramer()
if isinstance(cfg.extractor, DelimitedExtractorConfig):
ex = DelimitedExtractor(cfg.extractor.pattern, cfg.extractor.fields)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah how would a delimited extractor work with a JsonFramer?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Evolve network_data_capture node

2 participants