Skip to content

Commit

Permalink
#1093: Round up to 100% progress for frames that have completed. New …
Browse files Browse the repository at this point in the history
…column in pcm_batch.py view all to list all completed frames
  • Loading branch information
philipjyoon committed Feb 20, 2025
1 parent 75028ad commit 9bd11de
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
7 changes: 6 additions & 1 deletion data_subscriber/cslc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,21 @@ def get_nearest_sensing_datetime(frame_sensing_datetimes, sensing_time):

return len(frame_sensing_datetimes), frame_sensing_datetimes[-1]

def calculate_historical_progress(frame_states: dict, end_date, frame_to_bursts):
def calculate_historical_progress(frame_states: dict, end_date, frame_to_bursts, k=15):
'''Assumes start date of historical processing as the earliest date possible, which is really the only way it should be run'''

total_possible_sensingdates = 0
total_processed_sensingdates = 0
frame_completion = {}
last_processed_datetimes = {}

for frame, state in frame_states.items():
frame = int(frame)
num_sensing_times, _ = get_nearest_sensing_datetime(frame_to_bursts[frame].sensing_datetimes, end_date)

# Round down to the nearest k
num_sensing_times = num_sensing_times - (num_sensing_times % k)

total_possible_sensingdates += num_sensing_times
total_processed_sensingdates += state
frame_completion[str(frame)] = round(state / num_sensing_times * 100) if num_sensing_times > 0 else 0
Expand Down
47 changes: 41 additions & 6 deletions tools/pcm_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

from util.conf_util import SettingsConf

from data_subscriber.cslc_utils import localize_disp_frame_burst_hist
from data_subscriber.cslc_utils import localize_disp_frame_burst_hist, get_nearest_sensing_datetime

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
JOB_NAME_DATETIME_FORMAT = "%Y%m%dT%H%M%S"

SETTINGS = SettingsConf(file=str(Path("/export/home/hysdsops/.sds/config"))).cfg
GRQ_IP = SETTINGS["GRQ_PVT_IP"]
MOZART_IP = SETTINGS["MOZART_PVT_IP"]

ES_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
ES_INDEX = 'batch_proc'
Expand All @@ -33,7 +34,10 @@
LOGGER.setLevel(logging.INFO)

eu = ElasticsearchUtility('http://%s:9200' % GRQ_IP, LOGGER)
LOGGER.info("Connected to %s" % str(eu.es_url))
LOGGER.debug("Connected to %s" % str(eu.es_url))

eu_mzt = ElasticsearchUtility('http://%s:9200' % MOZART_IP, LOGGER)
LOGGER.debug("Connected to %s" % str(eu_mzt.es_url))

FILE_OPTION = '--file'

Expand All @@ -59,27 +63,58 @@ def view_proc(id):
for hit in procs['hits']['hits']:
proc = hit['_source']
if proc['job_type'] == "cslc_query_hist":

data_end_date = datetime.strptime(proc['data_end_date'], ES_DATETIME_FORMAT)

try:
pp = f"{proc['progress_percentage']}%"
except:
pp = "UNKNOWN"
try:
fcp = [f"{f}: {p}%" for f, p in proc["frame_completion_percentages"].items()]
fcp = [f"{frame}: {p}%" for frame, p in proc["frame_completion_percentages"].items()]

# Every frame that has 100% frame_completion_percentage, check in Mozart ES to see if the last SCIFLO has been completed
cf = []
job_id_prefixes = {}
for frame, p in proc["frame_completion_percentages"].items():
frame_state = proc['frame_states'][frame] - 1 # 1-based vs 0-based
acq_index = frames_to_bursts[int(frame)].sensing_datetime_days_index[frame_state]
job_id_prefix = f"job-WF-SCIFLO_L3_DISP_S1-frame-{frame}-latest_acq_index-{acq_index}_hist"
if p == 100:
job_id_prefixes[frame] = job_id_prefix
else:
# check to see if there is a last acq index job for this frame that exists
num_sensing_times, _ = get_nearest_sensing_datetime(frames_to_bursts[int(frame)].sensing_datetimes,
data_end_date)
num_sensing_times = num_sensing_times - (num_sensing_times % proc['k']) # Round down to the nearest k
if num_sensing_times == frame_state + 1:
job_id_prefixes[frame] = job_id_prefix

for frame, job_id_prefix in job_id_prefixes.items():
logging.debug(f"Checking for {job_id_prefix}")
query = {"query": {"bool": {"must": [{"prefix": {"job_id": job_id_prefix}}]}}}
sciflo_jobs = eu_mzt.query(body=query, index="job_status*")
for j in sciflo_jobs:
if j['_source']['status'] == "job-completed":
cf.append(int(frame))

except:
fcp = "UNKNOWN"
rows.append([hit['_id'], proc["label"], pp, proc["frames"], fcp])
cf = "UNKNOWN"

rows.append([hit['_id'], proc["label"], pp, proc["frames"], fcp, cf])
else:
# progress percentage is the ratio of last_successful_proc_data_date in the range between data_start_date and data_end_date
total_time = convert_datetime(proc["data_end_date"], ES_DATETIME_FORMAT) - convert_datetime(proc["data_start_date"], ES_DATETIME_FORMAT)
processed_time = convert_datetime(proc["last_successful_proc_data_date"], ES_DATETIME_FORMAT) - convert_datetime(proc["data_start_date"], ES_DATETIME_FORMAT)
progress_percentage = (processed_time / total_time) * 100
rows.append([hit['_id'], proc["label"], f"{progress_percentage:.0f}%", "N/A", "N/A"])
rows.append([hit['_id'], proc["label"], f"{progress_percentage:.0f}%", "N/A", "N/A", "N/A"])

print(" --- Showing Summary of Enabled Batch Procs --- ")
if len(rows) == 0:
print("No enabled batch procs found")
return
print(tabulate(rows, headers=["ID (showing enabled only)", "Label", "Progress", "Frames", "Frame Completion Percentages"], tablefmt="grid", maxcolwidths=[None,None, None, 30, 60]))
print(tabulate(rows, headers=["ID (showing enabled only)", "Label", "Progress", "Frames", "Frame Completion Percentages", "Completed Frames"], tablefmt="grid", maxcolwidths=[None,None, None, 30, 60, 30]))

return

Expand Down

0 comments on commit 9bd11de

Please sign in to comment.