
Add optional alignment step to NMT jobs, temporary implementation of eflomal #169

Open · wants to merge 17 commits into main
3 changes: 3 additions & 0 deletions .devcontainer/dockerfile
@@ -20,6 +20,7 @@ RUN apt-get update && \
apt-get install --no-install-recommends -y \
python$PYTHON_VERSION \
python$PYTHON_VERSION-distutils \
python$PYTHON_VERSION-dev \
git vim curl gdb ca-certificates gnupg2 tar make gcc libssl-dev zlib1g-dev libncurses5-dev \
libbz2-dev libreadline-dev libreadline6-dev libxml2-dev xz-utils libgdbm-dev libgdbm-compat-dev tk-dev dirmngr \
libxmlsec1-dev libsqlite3-dev libffi-dev liblzma-dev lzma lzma-dev uuid-dev && \
@@ -39,4 +40,6 @@ RUN pip install -U pip setuptools \

COPY ./.devcontainer/clearml.conf /root/clearml.conf

ENV EFLOMAL_PATH=/workspaces/machine.py/.venv/lib/python${PYTHON_VERSION}/site-packages/eflomal/bin

CMD ["bash"]
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -59,6 +59,8 @@ jobs:
poetry run pyright
- name: Test with pytest
run: poetry run pytest --cov --cov-report=xml
env:
EFLOMAL_PATH: /home/runner/work/machine.py/machine.py/.venv/lib/python${{ matrix.python-version }}/site-packages/eflomal/bin
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
env:
6 changes: 3 additions & 3 deletions dockerfile
@@ -1,9 +1,7 @@
# syntax=docker/dockerfile:1.7-labs

ARG PYTHON_VERSION=3.12
ARG UBUNTU_VERSION=noble
ARG POETRY_VERSION=1.6.1
ARG CUDA_VERSION=12.6.1-base-ubuntu24.04

FROM python:$PYTHON_VERSION-slim AS builder
ARG POETRY_VERSION
@@ -25,7 +23,7 @@ COPY poetry.lock pyproject.toml /src
RUN poetry export --with=gpu --without-hashes -f requirements.txt > requirements.txt


FROM nvidia/cuda:$CUDA_VERSION
FROM python:$PYTHON_VERSION
ARG PYTHON_VERSION

ENV PIP_DISABLE_PIP_VERSION_CHECK=on
@@ -64,4 +62,6 @@ RUN --mount=type=cache,target=/root/.cache \
RUN python -m pip install --no-deps . && rm -r /root/*
ENV CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL=1

ENV EFLOMAL_PATH=/usr/local/lib/python${PYTHON_VERSION}/site-packages/eflomal/bin

CMD ["bash"]
2 changes: 2 additions & 0 deletions dockerfile.cpu_only
@@ -43,4 +43,6 @@ RUN --mount=type=cache,target=/root/.cache \
RUN python -m pip install --no-deps . && rm -r /root/*
ENV CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL=1

ENV EFLOMAL_PATH=/usr/local/lib/python${PYTHON_VERSION}/site-packages/eflomal/bin

CMD ["bash"]
167 changes: 167 additions & 0 deletions machine/jobs/eflomal_aligner.py
@@ -0,0 +1,167 @@
# NOTE: this is a temporary solution for using the eflomal aligner inside machine.py.
# The vast majority of this code is taken from the silnlp repository.

import os
import subprocess
from contextlib import ExitStack
from importlib.util import find_spec
from math import sqrt
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import IO, Iterable, List, Sequence, Tuple

from ..corpora import AlignedWordPair
from ..corpora.token_processors import escape_spaces, lowercase, normalize
from ..tokenization import LatinWordTokenizer
from ..translation import SymmetrizationHeuristic, WordAlignmentMatrix


# From silnlp.common.package_utils
def is_eflomal_available() -> bool:
return find_spec("eflomal") is not None


if is_eflomal_available():
from eflomal import read_text, write_text # type: ignore

EFLOMAL_PATH = Path(os.getenv("EFLOMAL_PATH", "."), "eflomal")
TOKENIZER = LatinWordTokenizer()


# From silnlp.alignment.tools
def execute_eflomal(
source_path: Path,
target_path: Path,
forward_links_path: Path,
reverse_links_path: Path,
n_iterations: Tuple[int, int, int],
) -> None:
if not is_eflomal_available():
raise RuntimeError("eflomal is not installed.")

args = [
str(EFLOMAL_PATH),
"-s",
str(source_path),
"-t",
str(target_path),
"-f",
str(forward_links_path),
"-r",
str(reverse_links_path),
# "-q",
"-m",
"3",
"-n",
"3",
"-N",
"0.2",
"-1",
str(n_iterations[0]),
"-2",
str(n_iterations[1]),
"-3",
str(n_iterations[2]),
]
subprocess.run(args, stderr=subprocess.DEVNULL)


# From silnlp.alignment.eflomal
def to_word_alignment_matrix(alignment_str: str) -> WordAlignmentMatrix:
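    # Parses a Pharaoh-style alignment string, e.g. "0-0 1-2 2-1", sizing the
    # matrix by the largest source and target indices seen.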
word_pairs = AlignedWordPair.from_string(alignment_str)
row_count = 0
column_count = 0
for pair in word_pairs:
if pair.source_index + 1 > row_count:
row_count = pair.source_index + 1
if pair.target_index + 1 > column_count:
column_count = pair.target_index + 1
return WordAlignmentMatrix.from_word_pairs(row_count, column_count, word_pairs)


# From silnlp.alignment.eflomal
def to_eflomal_text_file(input: Iterable[str], output_file: IO[bytes], prefix_len: int = 0, suffix_len: int = 0) -> int:
sents, index = read_text(input, True, prefix_len, suffix_len)
n_sents = len(sents)
voc_size = len(index)
write_text(output_file, tuple(sents), voc_size)
return n_sents


# From silnlp.alignment.eflomal
def prepare_files(
src_input: Iterable[str], src_output_file: IO[bytes], trg_input: Iterable[str], trg_output_file: IO[bytes]
) -> int:
n_src_sents = to_eflomal_text_file(src_input, src_output_file)
n_trg_sents = to_eflomal_text_file(trg_input, trg_output_file)
if n_src_sents != n_trg_sents:
raise ValueError("Mismatched file sizes")
return n_src_sents


def tokenize(sent: str) -> Sequence[str]:
return list(TOKENIZER.tokenize(sent))


def normalize_for_alignment(sent: Sequence[str]) -> str:
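    # Escape in-token spaces, NFC-normalize, and lowercase each token, then join
    # with single spaces into the line format the eflomal tooling expects.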
return " ".join(lowercase(normalize("NFC", escape_spaces(sent))))


# From silnlp.alignment.eflomal
class EflomalAligner:
def __init__(self, model_dir: Path) -> None:
self._model_dir = model_dir

def train(self, src_toks: Sequence[Sequence[str]], trg_toks: Sequence[Sequence[str]]) -> None:
self._model_dir.mkdir(exist_ok=True)
with TemporaryDirectory() as temp_dir:
src_eflomal_path = Path(temp_dir, "source")
trg_eflomal_path = Path(temp_dir, "target")
with ExitStack() as stack:
src_output_file = stack.enter_context(src_eflomal_path.open("wb"))
trg_output_file = stack.enter_context(trg_eflomal_path.open("wb"))
# Write input files for the eflomal binary
n_sentences = prepare_files(
[normalize_for_alignment(s) for s in src_toks],
src_output_file,
[normalize_for_alignment(s) for s in trg_toks],
trg_output_file,
)

iters = max(2, int(round(1.0 * 5000 / sqrt(n_sentences))))
iters4 = max(1, iters // 4)
n_iterations = (max(2, iters4), iters4, iters)
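            # e.g. for n_sentences = 10_000: iters = 50, iters4 = 12, n_iterations = (12, 12, 50)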

# Run wrapper for the eflomal binary
execute_eflomal(
src_eflomal_path,
trg_eflomal_path,
self._model_dir / "forward-align.txt",
self._model_dir / "reverse-align.txt",
n_iterations,
)

def align(self, sym_heuristic: str = "grow-diag-final-and") -> List[str]:
forward_align_path = self._model_dir / "forward-align.txt"
reverse_align_path = self._model_dir / "reverse-align.txt"

alignments = []
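        # e.g. "grow-diag-final-and" -> SymmetrizationHeuristic.GROW_DIAG_FINAL_AND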
heuristic = SymmetrizationHeuristic[sym_heuristic.upper().replace("-", "_")]
with ExitStack() as stack:
forward_file = stack.enter_context(forward_align_path.open("r", encoding="utf-8-sig"))
reverse_file = stack.enter_context(reverse_align_path.open("r", encoding="utf-8-sig"))

for forward_line, reverse_line in zip(forward_file, reverse_file):
forward_matrix = to_word_alignment_matrix(forward_line.strip())
reverse_matrix = to_word_alignment_matrix(reverse_line.strip())
src_len = max(forward_matrix.row_count, reverse_matrix.row_count)
trg_len = max(forward_matrix.column_count, reverse_matrix.column_count)

forward_matrix.resize(src_len, trg_len)
reverse_matrix.resize(src_len, trg_len)

forward_matrix.symmetrize_with(reverse_matrix, heuristic)

alignments.append(str(forward_matrix))

return alignments
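For reviewers, a minimal usage sketch of the new aligner (hypothetical sentences; assumes the eflomal package is installed and EFLOMAL_PATH is set as in the dockerfiles above):

from pathlib import Path
from tempfile import TemporaryDirectory

from machine.jobs.eflomal_aligner import EflomalAligner, tokenize

src_toks = [tokenize(s) for s in ["The house is blue.", "I see the dog."]]
trg_toks = [tokenize(s) for s in ["La casa es azul.", "Veo al perro."]]

with TemporaryDirectory() as model_dir:
    aligner = EflomalAligner(Path(model_dir))
    aligner.train(src_toks, trg_toks)  # writes forward-align.txt and reverse-align.txt
    # One Pharaoh-style alignment string per sentence pair, e.g. "0-0 1-1 2-2"
    for alignment in aligner.align("grow-diag-final-and"):
        print(alignment)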
97 changes: 74 additions & 23 deletions machine/jobs/nmt_engine_build_job.py
@@ -1,15 +1,16 @@
import logging
from contextlib import ExitStack
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Callable, Optional, Sequence, Tuple

from ..corpora.corpora_utils import batch
from ..corpora.parallel_text_corpus import ParallelTextCorpus
from ..corpora.text_corpus import TextCorpus
from ..translation.translation_engine import TranslationEngine
from ..utils.phased_progress_reporter import Phase, PhasedProgressReporter
from ..utils.progress_status import ProgressStatus
from .eflomal_aligner import EflomalAligner, is_eflomal_available, tokenize
from .nmt_model_factory import NmtModelFactory
from .shared_file_service_base import DictToJsonWriter
from .translation_engine_build_job import TranslationEngineBuildJob
from .translation_file_service import PretranslationInfo, TranslationFileService

@@ -28,12 +29,25 @@ def _get_progress_reporter(
self, progress: Optional[Callable[[ProgressStatus], None]], corpus_size: int
) -> PhasedProgressReporter:
if corpus_size > 0:
phases = [
Phase(message="Training NMT model", percentage=0.9),
Phase(message="Pretranslating segments", percentage=0.1),
]
if self._config.align_pretranslations:
phases = [
Phase(message="Training NMT model", percentage=0.8),
Phase(message="Pretranslating segments", percentage=0.1),
Phase(message="Aligning segments", percentage=0.1, report_steps=False),
]
else:
phases = [
Phase(message="Training NMT model", percentage=0.9),
Phase(message="Pretranslating segments", percentage=0.1),
]
else:
phases = [Phase(message="Pretranslating segments", percentage=1.0)]
if self._config.align_pretranslations:
phases = [
Phase(message="Pretranslating segments", percentage=0.9),
Phase(message="Aligning segments", percentage=0.1, report_steps=False),
]
else:
phases = [Phase(message="Pretranslating segments", percentage=1.0)]
return PhasedProgressReporter(progress, phases)

def _respond_to_no_training_corpus(self) -> Tuple[int, float]:
@@ -89,33 +103,70 @@ def _batch_inference(
with ExitStack() as stack:
phase_progress = stack.enter_context(progress_reporter.start_next_phase())
engine = stack.enter_context(self._nmt_model_factory.create_engine())
src_pretranslations = stack.enter_context(self._translation_file_service.get_source_pretranslations())
writer = stack.enter_context(self._translation_file_service.open_target_pretranslation_writer())
pretranslations = [
pt_info for pt_info in stack.enter_context(self._translation_file_service.get_source_pretranslations())
]
src_segments = [pt_info["translation"] for pt_info in pretranslations]
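            # The full set of pretranslations is buffered in memory so the optional
            # alignment pass can run over the whole corpus before anything is written.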
current_inference_step = 0
phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count))
batch_size = self._config["inference_batch_size"]
for pi_batch in batch(src_pretranslations, batch_size):
for seg_batch in batch(iter(src_segments), batch_size):
if check_canceled is not None:
check_canceled()
_translate_batch(engine, pi_batch, writer)
current_inference_step += len(pi_batch)
for i, result in enumerate(engine.translate_batch(seg_batch)):
pretranslations[current_inference_step + i]["translation"] = result.translation
current_inference_step += len(seg_batch)
phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count))

if self._config.align_pretranslations and is_eflomal_available():
logger.info("Aligning source to pretranslations")
pretranslations = self._align(src_segments, pretranslations, progress_reporter, check_canceled)

writer = stack.enter_context(self._translation_file_service.open_target_pretranslation_writer())
for pretranslation in pretranslations:
writer.write(pretranslation)

def _align(
self,
src_segments: Sequence[str],
pretranslations: Sequence[PretranslationInfo],
progress_reporter: PhasedProgressReporter,
check_canceled: Optional[Callable[[], None]],
) -> Sequence[PretranslationInfo]:
if check_canceled is not None:
check_canceled()

logger.info("Aligning source to pretranslations")
progress_reporter.start_next_phase()

src_tokenized = [tokenize(s) for s in src_segments]
trg_tokenized = [tokenize(pt_info["translation"]) for pt_info in pretranslations]

with TemporaryDirectory() as td:
aligner = EflomalAligner(Path(td))
logger.info("Training aligner")
aligner.train(src_tokenized, trg_tokenized)

if check_canceled is not None:
check_canceled()

logger.info("Aligning pretranslations")
alignments = aligner.align()

if check_canceled is not None:
check_canceled()

for i in range(len(pretranslations)):
pretranslations[i]["source_toks"] = list(src_tokenized[i])
pretranslations[i]["translation_toks"] = list(trg_tokenized[i])
pretranslations[i]["alignment"] = alignments[i]

return pretranslations

def _save_model(self) -> None:
if "save_model" in self._config and self._config.save_model is not None:
logger.info("Saving model")
model_path = self._nmt_model_factory.save_model()
self._translation_file_service.save_model(
model_path, f"models/{self._config.save_model + ''.join(model_path.suffixes)}"
)


def _translate_batch(
engine: TranslationEngine,
batch: Sequence[PretranslationInfo],
writer: DictToJsonWriter,
) -> None:
source_segments = [pi["translation"] for pi in batch]
for i, result in enumerate(engine.translate_batch(source_segments)):
batch[i]["translation"] = result.translation
writer.write(batch[i])
1 change: 1 addition & 0 deletions machine/jobs/settings.yaml
@@ -3,6 +3,7 @@ default:
shared_file_uri: s3:/silnlp/
shared_file_folder: production
inference_batch_size: 1024
align_pretranslations: false
huggingface:
parent_model_name: facebook/nllb-200-distilled-1.3B
train_params:
6 changes: 6 additions & 0 deletions machine/jobs/translation_file_service.py
@@ -16,6 +16,9 @@ class PretranslationInfo(TypedDict):
textId: str # noqa: N815
refs: List[str]
translation: str
source_toks: List[str]
translation_toks: List[str]
alignment: str


SOURCE_FILENAME = "train.src.txt"
@@ -62,6 +65,9 @@ def generator() -> Generator[PretranslationInfo, None, None]:
textId=pi["textId"],
refs=list(pi["refs"]),
translation=pi["translation"],
source_toks=list(pi["source_toks"]),
translation_toks=list(pi["translation_toks"]),
alignment=pi["alignment"],
)

return ContextManagedGenerator(generator())
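With the three new fields populated, a target pretranslation record carries the token lists and a Pharaoh-style alignment alongside the translation. A sketch with hypothetical values, showing only the fields visible in this diff:

record: PretranslationInfo = {
    "textId": "MAT",
    "refs": ["MAT 1:1"],
    "translation": "El libro de la genealogía de Jesucristo",
    "source_toks": ["The", "book", "of", "the", "genealogy", "of", "Jesus", "Christ"],
    "translation_toks": ["El", "libro", "de", "la", "genealogía", "de", "Jesucristo"],
    "alignment": "0-0 1-1 2-2 3-3 4-4 5-5 6-6 7-6",
}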