Skip to content

Commit

Permalink
clean: run discarding documents instead of tagging
Browse files Browse the repository at this point in the history
Refactor cli for clean jobs, use getopts to add cli options.
Pass parameters to job scripts through the environment.
  • Loading branch information
ZJaume committed Nov 30, 2023
1 parent 03b9c1d commit 2251780
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
9 changes: 5 additions & 4 deletions 30.clean
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ set -euo pipefail
L=$1
input_dir=dedup
output_dir=clean
external=false
if [ $# -ge 2 ] && [ "$2" == "external" ]; then
if [ "$EXTERNAL" = true ]; then
input_dir=external
output_dir=external_clean
external=true
fi
INPUT=$WORKSPACE/$input_dir/$L/${L}_$SLURM_ARRAY_TASK_ID.jsonl.zst
OUTPUT=$WORKSPACE/$output_dir/$L/${L}_$SLURM_ARRAY_TASK_ID.jsonl.zst
Expand All @@ -37,9 +35,12 @@ case "$L" in
*)
FILTER_PARAMS="-a";;
esac
if [ "$external" = true ]; then
if [ "$EXTERNAL" = true ]; then
FILTER_PARAMS="-w -m";
fi
if [ "$DISCARD" = true ]; then
FILTER_PARAMS="$FILTER_PARAMS -f";
fi

# Do not use -k, we don't care if documents appear in
# different order
Expand Down
77 changes: 60 additions & 17 deletions 30.clean.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,91 @@
source .env
source .checks
set -euo pipefail

usage () {
echo "Usage: `basename $0` [options] <lang>"
echo "Options:"
echo " -d Discard documents instead of tagging"
echo " -e Process external contributions"
echo " -f FAILED_JOBID Re-run failed jobs of previous job array"
echo " -i INDEX Job array index instead of run 'all'"
echo " -h Shows this message"
}


MAX_JOBS=120
NUM_JOBS=60
TIME_RETRY=10m

L=$1
external=""
input_dir="dedup"
DISCARD=false
EXTERNAL=false
FAILED=""
INDEX=""
while getopts "df:i:eh" options
do
case "${options}" in
d) DISCARD=true;;
f) FAILED=$OPTARG;;
i) INDEX=$OPTARG;;
e) EXTERNAL=true;;
h) usage
exit 0;;
\?) usage >&2
exit 1;;
esac
done

L=${@:$OPTIND:1}

if [ $# -lt 2 ] || [ "$2" == "all" ]; then
if [ "$INDEX" = "" ]; then
# List all the batches that need to be processed (size of the job array)
if [ $# -ge 3 ] && [ "$3" == "external" ]; then
if [ "$EXTERNAL" = true ]; then
external="external"
input_dir=external
fi
INDEX=1-$(ls -1 $WORKSPACE/$input_dir/$L/${L}_*.zst | sed -E 's#.*/\w{2,3}_([0-9]+)\.jsonl\.zst#\1#' | sort -n | tail -1)
elif [ $# -gt 2 ] && [ "$2" == "failed" ]; then
# Select only failed jobs (timeout, oom and failed status)
# Create a list of batch id's separated by comma
JOB=$3
else
INDEX=$2
fi

if ! [ "$FAILED" = "" ]; then
# Select only failed jobs (timeout, oom and failed status)
# Create a list of batch id's separated by comma
INDEX=$(\
sacct -j $JOB --parsable -s oom,f,to -n \
sacct -j $FAILED --parsable -s oom,f,to -n \
| grep -v '.batch' \
| sed -E 's/[0-9]+_([0-9]+)\|.*/\1/g' \
| paste -sd','
)
else
INDEX=$2
fi

if [ "$DISCARD" = true ]; then
echo "Discarding documents instead of tagging"
fi
echo "Job array of index $INDEX for $L"
read -p "Confirm? [y/n] " -n 1 -r
if [[ ! $REPLY =~ [Yy] ]]; then echo; exit 1; fi
echo

# Send parameters to the job scripts through the env
export EXTERNAL
export DISCARD
JOBNAME="$L"
if [ "$EXTERNAL" = true ]; then
JOBNAME="external-$L"
fi

JOB_ID=$(\
SBATCH_OUTPUT="$SLURM_LOGS_DIR/%x-%A_%a.out" \
sbatch --array=$INDEX \
-J $external$L-clean --parsable \
30.clean $L $external)
-J $JOBNAME-clean --parsable \
30.clean $L)
echo Submitted batch job $JOB_ID

SBATCH_OUTPUT="$SLURM_LOGS_DIR/%x.out" \
sbatch -J $external$L-stats \
-d afterok:$JOB_ID \
30.stats $L $external
if [ "$DISCARD" = false ]; then
SBATCH_OUTPUT="$SLURM_LOGS_DIR/%x.out" \
sbatch -J $JOBNAME-stats \
-d afterok:$JOB_ID \
30.stats $L
fi
4 changes: 1 addition & 3 deletions 30.stats
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ set -euo pipefail

L=$1
input_dir=clean
if [ $# -ge 2 ] && [ "$2" == "external" ]; then
if [ $EXTERNAL = true ]; then
input_dir=external_clean
fi
INPUT=$WORKSPACE/$input_dir/$L/
OUTPUT=$WORKSPACE/$input_dir/$L/${L}_stats

# Do not use -k, we don't care if documents appear in
# different order
zstdcat $INPUT/${L}_*.jsonl.zst \
| python scripts/filter-stats.py \
>$OUTPUT.tmp
Expand Down
14 changes: 11 additions & 3 deletions filter-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
parser.add_argument('-m','--minimum', action='store_true', help="Remove docs that do not meet the minimum size")
parser.add_argument('-l','--language', action='store_true', help="Remove docs that do not meet the minimum correct language pct")
parser.add_argument('-z','--cjk', action='store_true', help="Process CJK language")
parser.add_argument('-s','--stats', action='store_true', help="Do not filter just print stats+docs for debugging")
parser.add_argument('-f','--filter', action='store_true', help="Discard documents instead of adding filter metadata.")

args = parser.parse_args()
if args.all:
args.explicit = True
Expand Down Expand Up @@ -101,5 +102,12 @@ def filter_doc(args, doc):
for line in sys.stdin:
doc = orjson.loads(line)
reason = filter_doc(args, doc)
doc["filter"] = reason
print(orjson.dumps(doc).decode('utf-8'))

# If not discarding documents, just add metadata to the json and print
# otherwise just print the document in case "keep" reason
# with other filter reasons, just do nothing
if not args.filter:
doc["filter"] = reason
print(orjson.dumps(doc).decode('utf-8'))
elif reason == "keep":
print(line, end='')

0 comments on commit 2251780

Please sign in to comment.