Skip to content

Commit

Permalink
Misc cleaning up of ingest scripts (#2256)
Browse files Browse the repository at this point in the history
* Cleaning up ingest scripts

* Mistyped echo

---------

Co-authored-by: palindrome <[email protected]>
  • Loading branch information
mineralntl and hlgp authored Feb 12, 2025
1 parent 45b0194 commit 46af0e0
Show file tree
Hide file tree
Showing 33 changed files with 255 additions and 234 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/bash

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

. ./ingest-env.sh
BIN_DIR=$1
Expand All @@ -22,45 +23,45 @@ shopt -s extglob
while [ 1 == 1 ]; do

# first, let's start up any markers we have that are within the pipeline count that we have
declare -i MARKER_COUNT=`shopt -s extglob; find ${FLAG_DIR} -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\..*\.marker" 2>/dev/null | wc -l`
declare -i MARKER_COUNT=$(shopt -s extglob; find ${FLAG_DIR} -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\..*\.marker" 2>/dev/null | wc -l)
echo "Found $MARKER_COUNT marker files"
if [[ $((MARKER_COUNT)) -gt 0 ]]; then
for f in `shopt -s extglob; find ${FLAG_DIR} -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\..*\.marker" 2>/dev/null`; do
for f in $(shopt -s extglob; find ${FLAG_DIR} -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\..*\.marker" 2>/dev/null); do
# extract the pipeline id
declare -i PIPELINE=$(flagPipeline $f)
if [[ $((PIPELINE)) -gt 0 && $((PIPELINE)) -le $((PIPELINE_COUNT)) ]]; then
echo "`date` Executing job for marker file $f in pipeline $PIPELINE"
echo "$(date) Executing job for marker file $f in pipeline $PIPELINE"

inprogress_file=$(flagBasename $f).flag.inprogress
mv $f $inprogress_file
MAP_LOADER_INDEX=$(( MAP_LOADER_INDEX + 1 ))
if (( $MAP_LOADER_INDEX >= ${#MAP_LOADER_HDFS_NAME_NODES[@]} )); then
if (( MAP_LOADER_INDEX >= ${#MAP_LOADER_HDFS_NAME_NODES[@]} )); then
MAP_LOADER_INDEX=0
fi
EXTRA_ARGS="-destHdfs ${MAP_LOADER_HDFS_NAME_NODES[$MAP_LOADER_INDEX]}"
$BIN_DIR/bulk-execute.sh $BIN_DIR $inprogress_file $LOG_DIR $FLAG_DIR -pipelineId $PIPELINE $EXTRA_ARGS &
else
echo "`date` Resetting marker file for ingest pipeline $PIPELINE"
echo "$(date) Resetting marker file for ingest pipeline $PIPELINE"
mv $f $(flagBasename $f).flag
fi
done
fi

# now make sure we have a flag started for every pipeline
for (( pipeline=1; pipeline <= $((PIPELINE_COUNT)); pipeline=$((pipeline+1)) )); do
declare -i PIPELINE_JOB_COUNT=`ps -ef | grep bulk-execute.sh | grep " -pipelineId ${pipeline} " | wc -l`
declare -i PIPELINE_MARKER_COUNT=`shopt -s extglob; find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\.${pipeline}\.marker" 2>/dev/null | wc -l`
declare -i PIPELINE_JOB_COUNT=$(ps -ef | grep bulk-execute.sh | grep " -pipelineId ${pipeline} " | wc -l)
declare -i PIPELINE_MARKER_COUNT=$(shopt -s extglob; find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag\.${pipeline}\.marker" 2>/dev/null | wc -l)
declare -i PIPELINE_TOTAL_COUNT=$((PIPELINE_JOB_COUNT + PIPELINE_MARKER_COUNT))
echo "Found $PIPELINE_TOTAL_COUNT jobs for pipeline $pipeline"
if [[ $((PIPELINE_TOTAL_COUNT)) == 0 ]]; then
if [[ "$MAPRED_INGEST_OPTS" =~ "-markerFileLIFO" ]]; then
flag_files=`find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag" -printf "%AY%Aj%AT %p\n" | sort -r | head -1 | awk '{print $2}'`
flag_files=$(find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag" -printf "%AY%Aj%AT %p\n" | sort -r | head -1 | awk '{print $2}')
else
flag_files=`find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag" -printf "%AY%Aj%AT %p\n" | sort | head -1 | awk '{print $2}'`
flag_files=$(find ${FLAG_DIR}/ -regextype posix-egrep -regex ".*_(bulk)_.*\.flag" -printf "%AY%Aj%AT %p\n" | sort | head -1 | awk '{print $2}')
fi
for first_flag_file in $flag_files; do
if [[ -a $first_flag_file ]]; then
echo "`date` Executing job for $first_flag_file in pipeline $pipeline"
echo "$(date) Executing job for $first_flag_file in pipeline $pipeline"
mv $first_flag_file ${first_flag_file}.inprogress
MAP_LOADER_INDEX=$(( MAP_LOADER_INDEX + 1 ))
if (( $MAP_LOADER_INDEX >= ${#MAP_LOADER_HDFS_NAME_NODES[@]} )); then
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/bash

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

#
# Get the classpath
Expand All @@ -28,13 +29,13 @@ fi
declare -a INGEST_CONFIG
i=0
for f in ../../config/*-config.xml; do
INGEST_CONFIG[i++]=`basename $f`
INGEST_CONFIG[i++]=$(basename $f)
done

#
# Transform the classpath into a comma-separated list also
#
LIBJARS=`echo $CLASSPATH | sed 's/:/,/g'`
LIBJARS=$(echo $CLASSPATH | sed 's/:/,/g')

#
# Ingest parameters
Expand All @@ -47,10 +48,10 @@ if [[ "$BULK_CHILD_REDUCE_MAX_MEMORY_MB" == "" ]]; then
fi
# Tell Yarn that we're using 20% more memory than we've requested for the VM
# to account for off heap memory usage.
MAP_MEMORY_MB=$(( (($BULK_CHILD_MAP_MAX_MEMORY_MB*1048576*12)/10)/1048576 ))
REDUCE_MEMORY_MB=$(( (($BULK_CHILD_REDUCE_MAX_MEMORY_MB*1048576*12)/10)/1048576 ))
MAP_MEMORY_MB=$(( ((BULK_CHILD_MAP_MAX_MEMORY_MB*1048576*12)/10)/1048576 ))
REDUCE_MEMORY_MB=$(( ((BULK_CHILD_REDUCE_MAX_MEMORY_MB*1048576*12)/10)/1048576 ))

DATE=`date "+%Y%m%d%H%M%S"`
DATE=$(date "+%Y%m%d%H%M%S")
WORKDIR=${BASE_WORK_DIR}/${DATE}-$$/
# specifying no partitioning argument will default to the MultiTableRangePartitioner
PART_ARG=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/bin/bash

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
MKTEMP="mktemp -t -u `basename $0`"
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
MKTEMP="mktemp -t -u $(basename $0)"
else
THIS_SCRIPT=`readlink -f $0`
MKTEMP="mktemp -t `basename $0`.XXXXXXXX"
THIS_SCRIPT=$(readlink -f $0)
MKTEMP="mktemp -t $(basename $0).XXXXXXXX"
fi
THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

. ../ingest/ingest-env.sh
. ../ingest/job-cache-env.sh
Expand All @@ -17,14 +17,14 @@ cd $THIS_DIR
echo "Checking the consistency of $JOB_CACHE_DIR"

# list the hdfs job cache
hdfs_listing=`$MKTEMP`
hdfs_listing=$($MKTEMP)
trap 'rm -f "$hdfs_listing"; exit 1' INT TERM EXIT

$INGEST_HADOOP_HOME/bin/hadoop fs -fs $INGEST_HDFS_NAME_NODE -ls -R $INGEST_HDFS_NAME_NODE$JOB_CACHE_DIR > $hdfs_listing

# check the datawave libraries in the job cache for consistency with the libraries on disk
for c in ${CLASSPATH//:/ }; do
for f in `find -L $c -type f`; do
for f in $(find -L $c -type f); do

dir=${c%/}
bname=${dir/*\//}
Expand All @@ -40,16 +40,16 @@ for c in ${CLASSPATH//:/ }; do
fi

# get the local size
local_file=`/bin/ls -Ll $f`
local_file=$(/bin/ls -Ll $f)
if [[ $? != 0 ]]; then
echo "Cannot find $f"
exit 1
fi
local_size=`echo $local_file | awk '{print $5}'`
local_size=$(echo $local_file | awk '{print $5}')

# get the size in hadoop
hdfs_file=`grep -F /$name $hdfs_listing`
hdfs_size=`echo $hdfs_file | awk '{print $5}'`
hdfs_file=$(grep -F /$name $hdfs_listing)
hdfs_size=$(echo $hdfs_file | awk '{print $5}')

if [[ "$local_size" != "$hdfs_size" ]]; then
echo "$JOB_CACHE_DIR inconsistent for $name: $local_size != $hdfs_size"
Expand All @@ -61,8 +61,8 @@ done
# If the edge key version file was not created or is empty, then we want to retry updating it. So we fail the check here,
# which causes load-job-cache.sh to be run, which will update the edge key version file.
name="edge-key-version.txt"
hdfs_file=`grep -F /$name $hdfs_listing`
hdfs_size=`echo $hdfs_file | awk '{print $5}'`
hdfs_file=$(grep -F /$name $hdfs_listing)
hdfs_size=$(echo $hdfs_file | awk '{print $5}')
if [ "$hdfs_size" == "" ]; then
echo "$JOB_CACHE_DIR missing $name"
exit 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
#!/bin/bash

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

#
# Ensure force is set so we can run this without ingest running
#
if [[ ! "$@" =~ ".*-force.*" && ! "$@" =~ "-force" ]]; then
if [[ ! "$@" =~ .*-force.* && ! "$@" =~ "-force" ]]; then
$THIS_SCRIPT -force
exit $?
fi
Expand All @@ -36,7 +37,7 @@ fi
declare -a INGEST_CONFIG
i=0
for f in ../../config/*-config.xml; do
INGEST_CONFIG[i++]=`basename $f`
INGEST_CONFIG[i++]=$(basename $f)
done

export HADOOP_CLASSPATH=$CLASSPATH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@
# -d yyyyMMdd specifies the date to create a split for. Default is tomorrow.
# -n N specifies the number of days for which to create splits. Default is 1.

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

. ./ingest-libs.sh

# Default date is tomorrow
DATE=`date -d tomorrow +%Y%m%d`
DATE=$(date -d tomorrow +%Y%m%d)

# Default is one split
NUM_DAYS=1

# optional arguments: -d YYYYMMDD and -n N
while getopts d:n: FLAG; do
case $FLAG in
d) DATE=`echo $OPTARG | egrep '^[0-9]{8}$'`
d) DATE=$(echo $OPTARG | egrep '^[0-9]{8}$')
if [ "$DATE" == "" ]; then
echo "DATE must be in YYYYMMDD format, not $OPTARG"
exit 1
Expand All @@ -34,15 +35,18 @@ while getopts d:n: FLAG; do
exit $?
fi
;;
n) NUM_DAYS=`echo $OPTARG | egrep '^[0-9]+$'`
n) NUM_DAYS=$(echo $OPTARG | egrep '^[0-9]+$')
if [ "$NUM_DAYS" == "" ]; then
echo "NUM_DAYS must be a number, not $OPTARG"
exit 1
fi
;;
*) usage
exit
;;
esac
done

DAILY_SPLITS=`for ((i = 0; i < $NUM_DAYS; i++)); do for ((j = 0; j < $NUM_DATE_INDEX_SHARDS; j++)); do date -d "${DATE} + ${i} days" +%Y%m%d | awk "{print \\$1 \"_\" ${j}}" ; done ; done | tr "\n" " "`
DAILY_SPLITS=$(for ((i = 0; i < $NUM_DAYS; i++)); do for ((j = 0; j < $NUM_DATE_INDEX_SHARDS; j++)); do date -d "${DATE} + ${i} days" +%Y%m%d | awk "{print \\$1 \"_\" ${j}}" ; done ; done | tr "\n" " ")
echo "Creating the following splits for dateIndex: $DAILY_SPLITS"
$WAREHOUSE_ACCUMULO_HOME/bin/accumulo shell -u $USERNAME -p $PASSWORD -e "addsplits ${DAILY_SPLITS} -t dateIndex" -zi $WAREHOUSE_INSTANCE_NAME -zh $WAREHOUSE_ZOOKEEPERS
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@ function usage
\t--help\t print this message\n"
}



if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi
THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR || exit

RUN_OPS=""


#Parse the command line args
while [ "$1" != "" ]; do
case $1 in
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/bin/bash

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

. ./ingest-libs.sh

Expand All @@ -16,7 +17,7 @@ if [[ "$1" == "" ]]; then
exit 1
fi

date=`date --date="$1" +%Y%m%d`
date=$(date --date="$1" +%Y%m%d)
if [[ "$?" != "0" ]]; then
echo " Please specify a valid date (yyyyMMdd) for which to process data"
exit 1
Expand All @@ -39,7 +40,7 @@ dateDiff (){

if [[ "$2" == "" ]]; then
# set the count to the number of days up until today
end=`date +%Y%m%d`
end=$(date +%Y%m%d)
count=$(dateDiff $date $end)
else
count=$2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
# Generating 1/1 of 250 = 250 error table shards for 20230809
# Generating 1/1 of 100 = 100 query metric shards for 20230809

if [[ `uname` == "Darwin" ]]; then
THIS_SCRIPT=`python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0`
if [[ $(uname) == "Darwin" ]]; then
THIS_SCRIPT=$(python -c 'import os,sys;print os.path.realpath(sys.argv[1])' $0)
else
THIS_SCRIPT=`readlink -f $0`
THIS_SCRIPT=$(readlink -f "$0")
fi

THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
cd $THIS_DIR || exit

. ./ingest-libs.sh

Expand Down Expand Up @@ -51,7 +52,7 @@ shardTableNumShards=$((numshardsOrg * numerator / divisor))
errorTableNumShards=$((errorTableShards * numerator / divisor))
queryMetricTableNumShards=$((queryMetricShards * numerator / divisor))

DATE=`date -d tomorrow +%Y%m%d`
DATE=$(date -d tomorrow +%Y%m%d)

echo "Generating ${numerator}/${divisor} of ${numshardsOrg} = $shardTableNumShards shards for $DATE"
echo "Generating ${numerator}/${divisor} of ${errorTableShards} = $errorTableNumShards shards for $DATE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ export LOG_DIR=$2
export FLAG_DIR=$3
EXTRA_ARGS=${@:4}
export JOB_FILE_BASE=${JOB_FILE%%.inprogress}
export LOG_FILE_BASE=`basename $JOB_FILE .inprogress`
export LOG_FILE_BASE=$(basename $JOB_FILE .inprogress)
export LOG_FILE=${LOG_FILE_BASE}.log

CMD=`head -1 $JOB_FILE | envsubst`
CMD=$(head -1 $JOB_FILE | envsubst)
CMD="$CMD $EXTRA_ARGS -flagFile $JOB_FILE"
echo "Executed Command: $CMD" >> $LOG_DIR/$LOG_FILE 2>&1
$CMD >> $LOG_DIR/$LOG_FILE 2>&1
Expand Down
Loading

0 comments on commit 46af0e0

Please sign in to comment.