Skip to content

Commit ce2aff2

Browse files
author
Vladyslav Moisieienkov
committed
job-status-consumer, remove job cache logic
- cache was not working and was disabled a long time; - it doesnt make sense to keep the logic.
1 parent 9a943cd commit ce2aff2

File tree

1 file changed

+1
-50
lines changed

1 file changed

+1
-50
lines changed

reana_workflow_controller/consumer.py

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,10 @@
2525
)
2626
from reana_commons.k8s.secrets import REANAUserSecretsStore
2727
from reana_commons.utils import (
28-
calculate_file_access_time,
29-
calculate_hash_of_dir,
30-
calculate_job_input_hash,
3128
build_unique_component_name,
3229
)
3330
from reana_db.database import Session
34-
from reana_db.models import Job, JobCache, Workflow, RunStatus
31+
from reana_db.models import Job, Workflow, RunStatus
3532
from sqlalchemy.exc import SQLAlchemyError
3633
from sqlalchemy.orm.attributes import flag_modified
3734

@@ -109,9 +106,6 @@ def on_message(self, body, message):
109106
if "progress" in msg:
110107
_update_run_progress(workflow_uuid, msg)
111108
_update_job_progress(workflow_uuid, msg)
112-
# Caching: calculate input hash and store in JobCache
113-
if "caching_info" in msg:
114-
_update_job_cache(msg)
115109
Session.commit()
116110
else:
117111
logging.error(
@@ -193,10 +187,7 @@ def _update_commit_status(workflow, status):
193187
def _update_run_progress(workflow_uuid, msg):
194188
"""Register succeeded Jobs to DB."""
195189
workflow = Session.query(Workflow).filter_by(id_=workflow_uuid).one_or_none()
196-
cached_jobs = None
197190
job_progress = workflow.job_progress
198-
if "cached" in msg["progress"]:
199-
cached_jobs = msg["progress"]["cached"] # noqa: F841
200191
for status, _ in PROGRESS_STATUSES:
201192
if status in msg["progress"]:
202193
previous_status = workflow.job_progress.get(status)
@@ -240,46 +231,6 @@ def _update_job_progress(workflow_uuid, msg):
240231
job.status = job_status
241232

242233

243-
def _update_job_cache(msg):
244-
"""Update caching information for finished job."""
245-
cached_job = (
246-
Session.query(JobCache)
247-
.filter_by(job_id=msg["caching_info"].get("job_id"))
248-
.first()
249-
)
250-
251-
input_files = []
252-
if cached_job:
253-
file_access_times = calculate_file_access_time(
254-
msg["caching_info"].get("workflow_workspace")
255-
)
256-
for filename in cached_job.access_times:
257-
if filename in file_access_times:
258-
input_files.append(filename)
259-
else:
260-
return
261-
cmd = msg["caching_info"]["job_spec"]["cmd"]
262-
# removes cd to workspace, to be refactored
263-
clean_cmd = ";".join(cmd.split(";")[1:])
264-
msg["caching_info"]["job_spec"]["cmd"] = clean_cmd
265-
266-
if "workflow_workspace" in msg["caching_info"]["job_spec"]:
267-
del msg["caching_info"]["job_spec"]["workflow_workspace"]
268-
input_hash = calculate_job_input_hash(
269-
msg["caching_info"]["job_spec"], msg["caching_info"]["workflow_json"]
270-
)
271-
workspace_hash = calculate_hash_of_dir(
272-
msg["caching_info"].get("workflow_workspace"), input_files
273-
)
274-
if workspace_hash == -1:
275-
return
276-
277-
cached_job.parameters = input_hash
278-
cached_job.result_path = msg["caching_info"].get("result_path")
279-
cached_job.workspace_hash = workspace_hash
280-
Session.add(cached_job)
281-
282-
283234
def _delete_workflow_job(workflow: Workflow) -> None:
284235
job_name = build_unique_component_name("run-batch", workflow.id_)
285236
try:

0 commit comments

Comments
 (0)