Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4c60148
fixed inference skipping
guipenedo Nov 7, 2025
97bd686
nit
guipenedo Nov 7, 2025
00045c6
nit
guipenedo Nov 7, 2025
16d20e0
refactored inf runner
guipenedo Nov 11, 2025
8a2f019
style
guipenedo Nov 11, 2025
9819f29
drop documents with 0 successful rollouts
guipenedo Nov 11, 2025
f798ddd
add requests cache
guipenedo Nov 12, 2025
50f3157
nit
guipenedo Nov 12, 2025
c1e0d60
perf improvements (less aggressive fs hits)
guipenedo Nov 12, 2025
9358be6
improved writes with queue
guipenedo Nov 12, 2025
fb01939
aiosqlite
guipenedo Nov 12, 2025
fb8413e
tmp sync on cluster
hynky1999 Nov 14, 2025
f5d97af
working version for slurm
Nov 17, 2025
023c538
fix master node import
Nov 17, 2025
1876c8b
capture output of ray stop
Nov 17, 2025
4ea13d8
sync locally
Nov 19, 2025
5e81239
final polishes
hynky1999 Nov 19, 2025
f92057b
nit condition during distributed check
Nov 19, 2025
a7752bd
Merge with main
hynky1999 Nov 19, 2025
1914b1d
push ray
hynky1999 Nov 20, 2025
b20218c
fix issues with vllm and sglang on slurm
Nov 20, 2025
51c61e7
Merge branch 'multi-node-inference' of github.com:huggingface/datatro…
Nov 20, 2025
0282f60
get ray + sglang working
Nov 20, 2025
46e0d50
logging node in multinode, fixes from debugging + prettier
hynky1999 Nov 20, 2025
261f680
prettier
hynky1999 Nov 20, 2025
0d654af
removed auto restart and distributed coordinator + small nits
guipenedo Nov 21, 2025
5278c66
remove wal
guipenedo Nov 21, 2025
e212a7f
envs vars consistency + vllm master node tracking or nodes
Nov 24, 2025
45b5eaa
fmt
hynky1999 Nov 24, 2025
ba8ad67
add example
hynky1999 Nov 24, 2025
0715bff
make ray checks async and with longer timeout
Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ inference = [
"aiosqlite",
]
ray = [
"ray"
"ray[default]"
]
quality = [
"ruff>=0.1.5"
Expand Down
22 changes: 13 additions & 9 deletions src/datatrove/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,26 @@ def world_size(self) -> int:
"""
return 0

def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
def _run_for_rank(self, rank: int, local_rank: int = 0, node_rank: int = 0) -> PipelineStats:
"""
Main executor's method. Sets up logging, pipes data from each pipeline step to the next, saves statistics
and marks tasks as completed.
and marks tasks as completed. We assume node_rank == 0 is the master node.
Completion is only marked on the master node; all other nodes are ignored in terms of job completion, as we use a 1-master, many-workers mode.
In this case it is the master's responsibility to check for worker completion and mark the job as complete.
Args:
rank: the rank that we want to run the pipeline for
local_rank: at the moment this is only used for logging.
Any task with local_rank != 0 will not print logs to console.
node_rank: node rank/ID for logging prefix. Logs will be prefixed with [NODE X], we assume node_rank == 0 is the master node.

Returns: the stats for this task

"""
if self.is_rank_completed(rank):
logger.info(f"Skipping {rank=} as it has already been completed.")
return PipelineStats()
logfile = add_task_logger(self.logging_dir, rank, local_rank)

logfile = add_task_logger(self.logging_dir, rank, local_rank, node_rank=node_rank)
log_pipeline(self.pipeline)

if self.randomize_start_duration > 0:
Expand All @@ -97,13 +101,13 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:

logger.success(f"Processing done for {rank=}")

# stats
# stats - only save on master node in distributed setting
stats = PipelineStats(self.pipeline)
with self.logging_dir.open(f"stats/{rank:05d}.json", "w") as f:
stats.save_to_disk(f)
logger.info(stats.get_repr(f"Task {rank}"))
# completed
self.mark_rank_as_completed(rank)
if node_rank == 0:
with self.logging_dir.open(f"stats/{rank:05d}.json", "w") as f:
stats.save_to_disk(f)
logger.info(stats.get_repr(f"Task {rank}"))
self.mark_rank_as_completed(rank)
except Exception as e:
logger.exception(e)
raise e
Expand Down
Loading
Loading