Get the CI Working (#207)
Ouch - the CI was left broken for a very long time! This PR is just about getting it working again.

* Fix up flake8 errors (mostly ambiguous lambda argument names and over-long lines; see the note below)
* Clean up CI build - do not force-install `qastle` anymore (that was from a long time ago).
* Move to using Python 3.10 as our default check to make sure everything works.
gordonwatts authored Feb 14, 2023
1 parent ebee8e6 commit 45cafb0
Showing 3 changed files with 65 additions and 45 deletions.
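
A note on the flake8 fixes: the recurring change in the Python diffs below renames the single-letter lambda argument `l` to `line`. flake8's E741 rule flags `l` (along with `O` and `I`) as an ambiguous name, since it is easily misread as `1`; the accompanying line rewraps address the E501 length limit. A minimal before/after sketch (the names here are illustrative, not from the repo):

```python
text = "first line\nsecond line"

# Before: handler = lambda l: print(f"  {l}")  # flake8 E741: ambiguous name "l"
# After: spell the argument out as "line", exactly as the diffs below do.
def handler(line: str) -> None:
    print(f"  {line}")

for line in text.split("\n"):
    handler(line)
```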
8 changes: 3 additions & 5 deletions .github/workflows/ci.yaml
@@ -13,14 +13,13 @@ jobs:
 
     steps:
     - uses: actions/checkout@master
-    - name: Set up Python ${{ matrix.python-version }}
+    - name: Set up Python
       uses: actions/setup-python@v1
       with:
-        python-version: ${{ matrix.python-version }}
+        python-version: "3.10"
     - name: Install dependencies
       run: |
         python -m pip install --upgrade pip setuptools wheel
-        pip install git+https://github.com/iris-hep/ast-language.git@e6470deb68529e1885a4bc46f781e2fe43a6f4c8
         pip install --no-cache-dir -e .[test]
         pip list
     - name: Lint with Flake8

@@ -31,7 +30,7 @@ jobs:
     strategy:
       matrix:
         platform: [ubuntu-latest, macOS-latest, windows-latest]
-        python-version: [3.9]
+        python-version: ["3.10"]
     runs-on: ${{ matrix.platform }}
     needs: flake8

@@ -44,7 +43,6 @@
     - name: Install dependencies
       run: |
         python -m pip install --upgrade pip setuptools wheel
-        pip install git+https://github.com/iris-hep/ast-language.git@e6470deb68529e1885a4bc46f781e2fe43a6f4c8
         pip install --no-cache-dir -e .[test]
         pip list
     - name: Test with pytest
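
One detail worth calling out in the middle hunk above: `python-version` moves from `[3.9]` to `["3.10"]`, with quotes. Unquoted, a YAML 1.1 parser reads `3.10` as the float `3.1`, which would hand setup-python the wrong version string; `3.9` happened to survive unquoted because it round-trips cleanly. A quick check with PyYAML (assumed installed just for this illustration):

```python
import yaml  # PyYAML; used here only to demonstrate the parsing gotcha

print(yaml.safe_load("python-version: 3.10"))    # {'python-version': 3.1}  (a float!)
print(yaml.safe_load('python-version: "3.10"'))  # {'python-version': '3.10'}
print(yaml.safe_load("python-version: 3.9"))     # {'python-version': 3.9}  (survives by luck)
```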
6 changes: 4 additions & 2 deletions func_adl_xAOD/common/local_dataset.py
@@ -235,13 +235,15 @@ def _dump_info(
 
     lg.log(level, f"Docker image and tag: {docker_image}")
     lg.log(level, "Docker Output: ")
-    _dump_split_string(running_string, lambda l: lg.log(level, f" {l}"))
+    _dump_split_string(running_string, lambda line: lg.log(level, f" {line}"))
 
     for file in local_run_dir.glob("*"):
         if file.is_file() and (file.suffix != ".root"):
             lg.log(level, f"{file.name}:")
             with file.open("r") as f:
-                _dump_split_string(f.read(), lambda l: lg.log(level, f" {l}"))
+                _dump_split_string(
+                    f.read(), lambda line: lg.log(level, f" {line}")
+                )
 
 
 def _dump_split_string(s: str, dump: Callable[[str], None]):
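
For reference, the helper being touched above is tiny: it splits a text blob on newlines and feeds each line to a callback. A self-contained sketch of how `_dump_split_string` pairs with the logger in this file:

```python
import logging
from typing import Callable


def _dump_split_string(s: str, dump: Callable[[str], None]) -> None:
    # Same body as in local_dataset.py: one dump() call per line of text.
    for ll in s.split("\n"):
        dump(ll)


logging.basicConfig(level=logging.INFO)
lg = logging.getLogger(__name__)
level = logging.INFO

# The pattern from the diff: indent each line of captured output in the log.
_dump_split_string(
    "docker output line 1\nline 2", lambda line: lg.log(level, f" {line}")
)
```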
96 changes: 58 additions & 38 deletions tests/utils/base.py
@@ -22,7 +22,7 @@
 
 
 def _extract_result_TTree(rep: cpp_ttree_rep, run_dir):
-    '''Copy the final file into a place that is "safe", and return that as a path.
+    """Copy the final file into a place that is "safe", and return that as a path.
 
     The reason for this is that the temp directory we are using is about to be deleted!

@@ -32,25 +32,30 @@ def _extract_result_TTree(rep: cpp_ttree_rep, run_dir):
     Raises:
         Exception: [description]
-    '''
+    """
     current_path = run_dir / rep.filename
-    new_path = Path('.') / rep.filename
+    new_path = Path(".") / rep.filename
     shutil.copy(current_path, new_path)
     return new_path
 
 
 def _dump_split_string(s: str, dump: Callable[[str], None]):
-    for ll in s.split('\n'):
+    for ll in s.split("\n"):
         dump(ll)
 
 
 class LocalFile(EventDataset, ABC):
-    '''
+    """
     A dataset that is represented by a local file on disk. It will be processed in
     the back end via a docker container.
-    '''
+    """
 
-    def __init__(self, docker_image: str, source_file_name: str, local_files: Union[Path, List[Path]]):
+    def __init__(
+        self,
+        docker_image: str,
+        source_file_name: str,
+        local_files: Union[Path, List[Path]],
+    ):
         EventDataset.__init__(self)
         self._docker_image = docker_image
         self._source_file_name = source_file_name

@@ -68,69 +73,83 @@ def get_executor_obj(self) -> executor:
         pass
 
     async def execute_result_async(self, a: ast.AST, title: str) -> Any:
-        '''
+        """
         Run the file locally with docker
-        '''
+        """
         # Construct the files we will run.
         with tempfile.TemporaryDirectory() as local_run_dir_p:
 
             local_run_dir = Path(local_run_dir_p)
             local_run_dir.chmod(0o777)
 
             exe = self.get_executor_obj()
-            f_spec = exe.write_cpp_files(exe.apply_ast_transformations(a), local_run_dir)
+            f_spec = exe.write_cpp_files(
+                exe.apply_ast_transformations(a), local_run_dir
+            )
 
             # Write out a file with the mapped in directories.
             # Until we better figure out how to deal with this, there are some restrictions
             # on file locations.
             datafile_dir: Optional[Path] = None
-            with open(f'{local_run_dir}/filelist.txt', 'w') as flist_out:
+            with open(f"{local_run_dir}/filelist.txt", "w") as flist_out:
                 for u in self._files:
                     if not u.exists():
-                        self.raise_docker_exception(f'Cannot access (or find) file {u}')
+                        self.raise_docker_exception(f"Cannot access (or find) file {u}")
 
                     ds_path = u.parent
                     datafile = u.name
-                    flist_out.write(f'/data/{datafile}\n')
+                    flist_out.write(f"/data/{datafile}\n")
                     if datafile_dir is None:
                         datafile_dir = ds_path
                     else:
                         if ds_path != datafile_dir:
-                            raise Exception(f'Data files must be from the same directory. Have seen {ds_path} and {datafile_dir} so far.')
+                            raise Exception(
+                                f"Data files must be from the same directory. Have seen {ds_path} and {datafile_dir} so far."
+                            )
 
             # Build a docker command to run this.
-            datafile_mount = "" if datafile_dir is None else f'-v {datafile_dir}:/data'
-            docker_cmd = f'docker run --rm -v {f_spec.output_path}:/scripts -v {f_spec.output_path}:/results {datafile_mount} {self._docker_image} /scripts/{f_spec.main_script}'
-            proc = await asyncio.create_subprocess_shell(docker_cmd,
-                                                         stdout=asyncio.subprocess.PIPE,  # type: ignore
-                                                         stderr=asyncio.subprocess.PIPE)  # type: ignore
+            datafile_mount = "" if datafile_dir is None else f"-v {datafile_dir}:/data"
+            docker_cmd = f"docker run --rm -v {f_spec.output_path}:/scripts -v {f_spec.output_path}:/results {datafile_mount} {self._docker_image} /scripts/{f_spec.main_script}"
+            proc = await asyncio.create_subprocess_shell(
+                docker_cmd,
+                stdout=asyncio.subprocess.PIPE,  # type: ignore
+                stderr=asyncio.subprocess.PIPE,
+            )  # type: ignore
             p_stdout, p_stderr = await proc.communicate()
             if proc.returncode != 0 or dump_running_log:
                 lg = logging.getLogger(__name__)
                 level = logging.INFO if proc.returncode == 0 else logging.ERROR
                 lg.log(level, f"Result of run: {proc.returncode}")
-                lg.log(level, 'std Output: ')
-                _dump_split_string(p_stdout.decode(), lambda l: lg.log(level, f' {l}'))
-                lg.log(level, 'std Error: ')
-                _dump_split_string(p_stderr.decode(), lambda l: lg.log(level, f' {l}'))
+                lg.log(level, "std Output: ")
+                _dump_split_string(
+                    p_stdout.decode(), lambda line: lg.log(level, f" {line}")
+                )
+                lg.log(level, "std Error: ")
+                _dump_split_string(
+                    p_stderr.decode(), lambda line: lg.log(level, f" {line}")
+                )
             if dump_cpp or proc.returncode != 0:
                 level = logging.INFO if proc.returncode == 0 else logging.ERROR
                 lg = logging.getLogger(__name__)
-                for file in local_run_dir.glob('*'):
-                    if file.is_file() and (file.suffix != '.root'):
-                        lg.log(level, f'{file.name}:')
-                        with file.open('r') as f:
-                            _dump_split_string(f.read(), lambda l: lg.log(level, f' {l}'))
+                for file in local_run_dir.glob("*"):
+                    if file.is_file() and (file.suffix != ".root"):
+                        lg.log(level, f"{file.name}:")
+                        with file.open("r") as f:
+                            _dump_split_string(
+                                f.read(), lambda line: lg.log(level, f" {line}")
+                            )
             if proc.returncode != 0:
-                raise Exception(f"Docker command failed with error {proc.returncode} ({docker_cmd})")
+                raise Exception(
+                    f"Docker command failed with error {proc.returncode} ({docker_cmd})"
+                )
 
         # Now that we have run, we can pluck out the result.
-        assert isinstance(f_spec.result_rep, cpp_ttree_rep), 'Unknown return type'
+        assert isinstance(f_spec.result_rep, cpp_ttree_rep), "Unknown return type"
        return _extract_result_TTree(f_spec.result_rep, local_run_dir)
 
 
 class dummy_executor(ABC):
-    'Override the docker part of the execution engine'
+    "Override the docker part of the execution engine"
 
     def __init__(self):
         self.QueryVisitor = None

@@ -149,12 +168,11 @@ def evaluate(self, a: ast.AST):
         self.QueryVisitor = self.get_visitor_obj()
         # TODO: #126 query_ast_visitor needs proper arguments
         a_transformed = rnr.apply_ast_transformations(a)
-        self.ResultRep = \
-            self.QueryVisitor.get_as_ROOT(a_transformed)
+        self.ResultRep = self.QueryVisitor.get_as_ROOT(a_transformed)
         self._job_option_blocks = rnr._job_option_blocks
 
     def get_result(self, q_visitor, result_rep):
-        'Got the result. Cache for use in tests'
+        "Got the result. Cache for use in tests"
         self.QueryVisitor = q_visitor
         self.ResultRep = result_rep
         return self

@@ -175,17 +193,19 @@ def get_dummy_executor_obj(self) -> dummy_executor:
         pass
 
     async def execute_result_async(self, a: ast.AST, title: str) -> Any:
-        'Dummy executor that will return the ast properly rendered. If qastle_roundtrip is true, then we will round trip the ast via qastle first.'
+        "Dummy executor that will return the ast properly rendered. If qastle_roundtrip is true, then we will round trip the ast via qastle first."
         # Round trip qastle if requested.
         if self._q_roundtrip:
             import qastle
-            print(f'before: {ast.dump(a)}')
+
+            print(f"before: {ast.dump(a)}")
             a_text = qastle.python_ast_to_text_ast(a)
             a = qastle.text_ast_to_python_ast(a_text).body[0].value
-            print(f'after: {ast.dump(a)}')
+            print(f"after: {ast.dump(a)}")
 
         # Setup the rep for this dataset
         from func_adl import find_EventDataset
+
         file = find_EventDataset(a)
         iterator = cpp_variable("bogus-do-not-use", top_level_scope(), cpp_type=None)
         set_rep(file, cpp_sequence(iterator, iterator, top_level_scope()))
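
The heart of `execute_result_async` above is the asyncio subprocess pattern: launch the docker command in a shell, capture both streams, then branch on the return code. A stripped-down, runnable sketch of that pattern, with `echo` standing in for the docker command:

```python
import asyncio


async def run_shell(cmd: str) -> str:
    # Same call shape as in base.py: shell out, capture stdout/stderr.
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    p_stdout, p_stderr = await proc.communicate()
    if proc.returncode != 0:
        # base.py logs both streams via _dump_split_string before raising.
        raise Exception(f"Command failed with error {proc.returncode} ({cmd})")
    return p_stdout.decode()


print(asyncio.run(run_shell("echo hello")))
```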
