Skip to content

feat(ucxx): single-chunk transfer + health-aware client rotation#695

Open
shuangwu wants to merge 2 commits into
nvidia-cosmos:mainfrom
shuangwu:feat/ucxx-single-chunk-rotation
Open

feat(ucxx): single-chunk transfer + health-aware client rotation#695
shuangwu wants to merge 2 commits into
nvidia-cosmos:mainfrom
shuangwu:feat/ucxx-single-chunk-rotation

Conversation

@shuangwu

@shuangwu shuangwu commented May 27, 2026

Copy link
Copy Markdown
Collaborator

Summary

Reworks the UCXX payload transport for correctness and simplicity, and removes a deprecated trainer-side mixin.

  • Single-chunk wire protocol per slot: one connection → send([slot])recv(status)recv(payload). The handler that does the slot's READY→READING transition uniquely owns its READING→{FREE|READY} finalisation — no refcounting, no shared cross-handler state.
  • Two structural invariants make orphan handlers harmless: (1) defensive slot-state transitions (mark_consumed/release_reading no-op with a warning on unexpected states); (2) single owner per read attempt (exactly one of mark_consumed/release_reading in finally).
  • Three-layer client retry stack: per-call port rotation + per-port quarantine on reset/truncate/timeout (UCXXClient.read); per-slot fresh-call retry with non-retryable StaleSlotError (_read_one); multi-round batch retry (_ucxx_dp_fetch_all).

Test plan

  • tests/test_ucxx_transport.py (single-chunk protocol, slot-state invariants, port quarantine/rotation)
  • tests/test_ucxx_data_packer_mixin.py

@shuangwu shuangwu force-pushed the feat/ucxx-single-chunk-rotation branch 2 times, most recently from 8170a0e to 7cb2df4 Compare June 2, 2026 21:19
shuangwu added 2 commits June 5, 2026 14:16
Add Slurm CI tooling and harden replica teardown so test_process_flow no
longer hangs for hours: fail-fast Redis stream polls during shutdown,
controller finalize on heartbeat-death reap (with a startup guard so SFT
controllers do not self-destruct before workers register), bounded HTTP
timeouts, teardown progress logging, and a bounded test-side wait with
faulthandler diagnostics.
Reworks the UCXX payload transport for correctness and
simplicity, and removes a deprecated trainer-side mixin.

**Wire protocol (single chunk per slot).**  One connection, one
``send([slot])``, one ``recv(status)``, one ``recv(payload)``.
The handler that performs the slot's ``READY -> READING``
transition is the unique owner of its
``READING -> {FREE | READY}`` finalisation for that read
attempt.  No refcounting, no shared cross-handler state, no
multi-chunk fan-out.

**Two structural invariants** make orphan handlers harmless:

1. *Defensive slot-state transitions.*
   ``SharedRingBuffer.mark_consumed`` and ``release_reading``
   no-op (with a warning) on unexpected slot states instead of
   clobbering them.  A late finalise from an orphan handler --
   e.g. one still mid-``send`` after the client rotated to a
   different port -- cannot silently undo a writer's recycle.

2. *Single owner per read attempt.*  Each handler calls exactly
   one of ``mark_consumed`` (success) or ``release_reading``
   (failure) in its ``finally`` block.

**Three-layer client retry stack.**

* ``UCXXClient.read``: per-call port rotation with on-timeout
  fallback, plus a per-port skip-list so ports that have just
  produced ``UCXXConnectionResetError`` /
  ``UCXXMessageTruncated`` / timeout are quarantined for
  ``_PORT_QUARANTINE_SEC`` and skipped on subsequent calls
  while at least one healthy port remains.
* ``UCXXDataPackerMixin._read_one``: per-slot fresh-call retry,
  with ``StaleSlotError`` flagged non-retryable so a recycled
  slot is not re-fetched in the same batch.
* ``UCXXDataPackerMixin._ucxx_dp_fetch_all``: multi-round batch
  retry (``_MAX_FETCH_ROUNDS``).

**Audit-pass fixes.**

* Stop server-thread busy-spin: shutdown watchdog now sleeps
  for 50 ms instead of yielding via ``asyncio.sleep(0)``.
* ``SharedRingBuffer.get_ready_count`` actually counts
  ``READY`` slots instead of returning the saturating
  ``entry_count`` header.
* ``UCXXClient`` preemptively closes pooled endpoints older
  than ``_POOL_ENDPOINT_MAX_AGE_S`` on checkout, replacing
  them with fresh connections so a client never tries to
  reuse an endpoint the server has already idle-evicted.
* Demote the per-request ``[UCXXBuffer] req slot=...`` log
  from INFO to DEBUG.

**Cleanup.**  Removes the deprecated ``UCXXTrainerMixin``
(superseded by ``UCXXDataPackerMixin``) and the
``ucxx_n_chunks`` config knob.  ``read_raw`` still tolerates
re-entry from ``READING`` -- not for chunking, but to let a
fresh handler serve a slot when the original is still
mid-``send``; the defensive guards above make the duplicate
finalise harmless.

**Docs.**  ``ucxx/__init__.py`` and ``rl-gym/docs/UCXX.md``
rewritten with the slot-lifecycle diagram, the three-layer
retry table, port skip-list semantics, and the
preemptive-eviction self-healing window.

Tests: 55 unit tests in ``tests/test_ucxx_transport.py`` and
``tests/test_ucxx_data_packer_mixin.py`` covering the
defensive primitive guards, ``get_ready_count`` correctness,
stale-pooled-endpoint replacement, and the three-layer retry
behaviour.
@shuangwu shuangwu force-pushed the feat/ucxx-single-chunk-rotation branch from 7cb2df4 to 0a8b64c Compare June 6, 2026 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant