Skip to content

Commit

Permalink
ref: migrate to dcor_shared.RQJob
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Nov 26, 2024
1 parent d389f90 commit 56ecf93
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 43 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ jobs:
checks:
runs-on: macos-12
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@main
- name: Set up Python 3.8
uses: actions/setup-python@v4
uses: actions/setup-python@main
with:
python-version: "3.8"
- name: Install flake8 dependencies
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/deploy_pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@main
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@main
with:
python-version: "3.9"
- name: Install dependencies
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
0.11.0
- ref: migrate to dcor_shared.RQJob
0.10.2
- setup: fix ckanext namespace
0.10.1
Expand Down
20 changes: 15 additions & 5 deletions ckanext/dc_view/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import time
import traceback

import ckan.model as model

Expand Down Expand Up @@ -35,21 +36,30 @@ def run_jobs_dc_view(modified_days=-1, force=False):
past_str = time.strftime("%Y-%m-%d", past.timetuple())
datasets = datasets.filter(model.Package.metadata_modified >= past_str)

job_list = jobs.RQJob.get_all_job_methods_in_order(
ckanext="dc_view")

nl = False # new line character
for dataset in datasets:
nl = False
click.echo(f"Checking dataset {dataset.id}\r", nl=False)

for resource in dataset.resources:
res_dict = resource.as_dict()
try:
if jobs.create_preview_job(res_dict, override=force):
click_echo(f"Created preview for {resource.name}", nl)
nl = True
for job in job_list:
if job.method(res_dict, override=force):
if not nl:
click.echo("")
nl = True
click.echo(f"OK: {job.title} for {resource.name}")
except KeyboardInterrupt:
raise
except BaseException as e:
click_echo(
f"{e.__class__.__name__}: {e} for {res_dict['name']}", nl)
click.echo(
f"\n{e.__class__.__name__} for {res_dict['name']}!",
err=True)
click.echo(traceback.format_exc(), err=True)
nl = True
if not nl:
click.echo("")
Expand Down
32 changes: 27 additions & 5 deletions ckanext/dc_view/jobs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import atexit
from collections import OrderedDict
import logging
import os
import pathlib
import shutil
import tempfile

import dclab
from dcor_shared import DC_MIME_TYPES, s3cc, get_dc_instance, wait_for_resource
from dcor_shared import (
DC_MIME_TYPES, get_dc_instance, rqjob_register, s3, s3cc, wait_for_resource
)
from dcor_shared import RQJob # noqa: F401

import numpy as np


log = logging.getLogger(__name__)

# Create a matplotlib config directory, so we can import and use matplotlib
mpldir = "/tmp/matplotlib"
pathlib.Path(mpldir).mkdir(exist_ok=True)
Expand All @@ -25,12 +32,27 @@ def admin_context():
return {'ignore_auth': True, 'user': 'default'}


def create_preview_job(resource, override=False):
@rqjob_register(ckanext="dc_view",
queue="dcor-normal",
timeout=3600,
)
def job_create_preview(resource, override=False):
"""Generate a *_preview.png file for a DC resource"""
if not s3.is_available():
log.info("S3 not available, not computing condensed resource")
return False

# make sure mimetype is defined
if "mimetype" not in resource:
suffix = "." + resource["name"].rsplit(".", 1)[-1]
for mt in DC_MIME_TYPES:
if suffix in DC_MIME_TYPES[mt]:
resource["mimetype"] = mt
break

rid = resource["id"]
wait_for_resource(rid)
mtype = resource.get('mimetype', '')
if (mtype in DC_MIME_TYPES
if (resource.get('mimetype', '') in DC_MIME_TYPES
# Check whether the file already exists on S3
and (override
or not s3cc.artifact_exists(resource_id=rid,
Expand Down
4 changes: 2 additions & 2 deletions ckanext/dc_view/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ def meta_html_table(meta, sec):
value += " " + units

html_code += [
f'<tr>',
'<tr>',
f'<th class="dataset-labels">{html.escape(name)}</th>',
f'<td class="dataset-details">{html.escape(value)}</td>',
f'</tr>',
'</tr>',
]
html_code.append("</table>")
return html_code
31 changes: 4 additions & 27 deletions ckanext/dc_view/plugin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import copy

from flask import Blueprint
from ckan import common
import ckan.lib.datapreview as datapreview
from ckan.lib.jobs import _connect as ckan_redis_connect
import ckan.plugins.toolkit as toolkit
import ckan.plugins as plugins

from dcor_shared import DC_MIME_TYPES, s3
from rq.job import Job

from .cli import get_commands
from .jobs import create_preview_job
from . import jobs
from .meta import render_metadata_html
from .route_funcs import dcpreview

Expand Down Expand Up @@ -55,26 +49,9 @@ def after_resource_create(self, context, resource):
"""Generate preview image and upload to S3"""
# We only create the preview and upload it to S3 if the file is
# a DC file and if S3 is available.
if resource.get('mimetype') in DC_MIME_TYPES and s3.is_available():
pkg_job_id = f"{resource['package_id']}_{resource['position']}_"
depends_on = []
extensions = [common.config.get("ckan.plugins")]
# Are we waiting for symlinking (ckanext-dcor_depot)?
# (This makes wait_for_resource really fast ;)
if "dcor_depot" in extensions:
# Wait for the resource to be moved to the depot.
jid_sl = pkg_job_id + "symlink"
depends_on.append(jid_sl)
jid_preview = pkg_job_id + "previews3"
if not Job.exists(jid_preview, connection=ckan_redis_connect()):
toolkit.enqueue_job(create_preview_job,
[resource],
title="Create resource preview image",
queue="dcor-normal",
rq_kwargs={
"timeout": 3600,
"job_id": jid_preview,
"depends_on": copy.copy(depends_on)})
if not context.get("is_background_job") and s3.is_available():
# All jobs are defined via decorators in jobs.py
jobs.RQJob.enqueue_all_jobs(resource, ckanext="dc_view")

# IResourceView
def info(self):
Expand Down

0 comments on commit 56ecf93

Please sign in to comment.