
[Bug] JSONStreamDatasource locks first-batch schema and fails on later null -> concrete type evolution #936

@Mark-Wu2003

Description


Summary

Data-Juicer's custom JSON stream reader appears to lock schema from the first batch in data_juicer.core.data.ray_dataset.JSONStreamDatasource._read_stream.

When an early batch infers a nested field as null (for example meta.url = null) and a later batch introduces a concrete type for the same field (for example string), the reader does not reconcile schema across batches and can fail with errors such as:

Couldn't cast array of type ... to null

or an Arrow schema mismatch during ingestion.

This is a correctness + stability bug in DJ's custom JSON streaming ingestion path, not just a configuration issue.

  • Component: data_juicer.core.data.ray_dataset.JSONStreamDatasource._read_stream
  • Severity suggestion: P1
  • Impact: pipeline / ingestion stage can fail on otherwise valid mixed-null JSONL

Environment

  • OS: Ubuntu 24.04.2 LTS
  • Kernel: Linux 5.15.0-119-generic x86_64
  • Python: 3.12
  • Ray: 2.54.0
  • PyArrow: 23.0.1
  • Data-Juicer: 1.5.0
  • Reproduced in: standalone script

Why this looks like a DJ bug

In DJ source, RayDataset.read_json() is explicitly marked as a temporary solution for JSON stream reading and has a TODO to replace it with ray.data.read_json_stream once available.

In JSONStreamDatasource._read_stream, schema starts as None, then after the first batch DJ does:

schema = None
while True:
    batch = reader.read_next_batch()  # raises StopIteration at end of stream
    # On the first iteration schema is None, so it is inferred from batch 1.
    table = pyarrow.Table.from_batches([batch], schema=schema)
    if schema is None:
        schema = table.schema  # locked here; never reconciled for later batches
    yield table

This means the first batch alone determines the schema applied to every later batch. If the first batch infers a field as null, later batches with concrete types for that field can fail instead of being reconciled.

Expected behavior

Valid mixed-null JSONL should remain readable in stream mode.

Examples of acceptable behavior:

  • allow null -> string/int/struct evolution across batches when the data is otherwise valid;
  • reconcile / widen schema across batches;
  • or at minimum raise a clearer error that the custom stream path does not support this schema evolution.

Actual behavior

The custom DJ stream path can fail on inputs where:

  • earlier stream batches contain only null for a field;
  • later batches introduce a concrete type for the same field.

Observed errors include:

Couldn't cast array of type ... to null

and related Arrow schema mismatch failures.

Minimal reproduction

"""Minimal repro for Data-Juicer JSONStreamDatasource schema-lock bug.

Expected behavior on affected DJ versions:
- DJ read_json_stream fails on mixed null/non-null nested field values.
- Ray native read_json succeeds on the same file.
"""

from __future__ import annotations

import json
import tempfile
from pathlib import Path

import pyarrow.json as paj
import ray
from data_juicer.core.data.ray_dataset import read_json_stream


def _write_payload(path: Path) -> None:
    rows = []
    # First batches contain null-only nested field.
    for i in range(30):
        rows.append({"id": i, "meta": {"url": None}})
    # Later batch introduces concrete type.
    rows.append({"id": 999, "meta": {"url": "https://example.com"}})
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


def main() -> None:
    ray.init(ignore_reinit_error=True)
    try:
        tmp_dir = Path(tempfile.mkdtemp(prefix="dj_repro_"))
        payload = tmp_dir / "mixed_schema.jsonl"
        _write_payload(payload)
        print("payload =", payload)

        print("\n[DJ read_json_stream]")
        try:
            ds = read_json_stream(
                str(payload),
                override_num_blocks=1,
                # Small block size forces multiple read batches in stream mode.
                read_options=paj.ReadOptions(use_threads=False, block_size=256),
            )
            print("dj count =", ds.count())
            print("dj schema =", ds.schema())
        except Exception as exc:  # noqa: BLE001
            print("dj error =", type(exc).__name__)
            print(exc)

        print("\n[Ray native read_json]")
        ds2 = ray.data.read_json(str(payload), override_num_blocks=1)
        print("ray count =", ds2.count())
        print("ray schema =", ds2.schema())
    finally:
        ray.shutdown()


if __name__ == "__main__":
    main()

Reproduction scripts

  1. DJ bug repro (DJ fails, Ray native succeeds on same input):
    tmp/debug/repro_dj_jsonstream_schema_lock.py

Run:

conda run -n LLMData --no-capture-output python /Users/markwu/Pycharm/LLMdata/tmp/debug/repro_dj_jsonstream_schema_lock.py

  2. Ray _backfill_missing_fields TypeError repro:
    tmp/debug/repro_ray_backfill_typeerror.py

Run:

conda run -n LLMData --no-capture-output python /Users/markwu/Pycharm/LLMdata/tmp/debug/repro_ray_backfill_typeerror.py

Downstream mitigations currently used (workarounds, not root fix)

We currently apply multiple downstream mitigations in our pipeline wrapper to keep production stable:

  1. Bypass DJ read_json_stream to ray.data.read_json when possible, to avoid JSONStreamDatasource stream-read failures on mixed-schema JSONL.
  2. Auto-infer and sanitize explicit Arrow schema (drop null-only unstable paths, keep known stable fields typed), then pass it through parsing options.
  3. Patch BTS deduplicator map_batches kwargs to forward per-op num_cpus, so Ray task resource requests are honored.
  4. Patch ray.init caps and tune Ray Data context (read_op_min_num_blocks, min_parallelism, block sizing, streaming buffer) for stability under large runs.

These mitigations are implemented in:

finance_cleaning/utils/dj_process_capped.py

These workarounds unblock production jobs, but they do not fix the underlying schema-handling behavior in DJ's custom JSON stream reader.

Possible fix directions

  1. Avoid fixing schema from the first batch in JSONStreamDatasource._read_stream.
  2. Reconcile schemas across batches instead of reusing the first inferred schema unchanged.
  3. For valid cases such as null -> string/int, allow widening / permissive promotion across batches.
  4. Prefer the official Ray JSON reading path when equivalent functionality is available, to reduce behavior drift between DJ custom ingestion and Ray native ingestion.

Additional context

We currently see two distinct classes of failures in this area:

  1. DJ JSON stream schema-lock bug: first-batch schema fixation causes cast ... to null / schema mismatch failures on later concrete values.
  2. Ray Data struct backfill robustness bug: separate failure mode in _backfill_missing_fields for struct reconciliation.

This issue is specifically about the first one: DJ's custom JSON stream ingestion behavior.
