Skip to content

Commit 00785c5

Browse files
authored
Merge branch 'master' into feat/add-tagging-util-tests
2 parents 0599df1 + b5fc5e7 commit 00785c5

11 files changed

Lines changed: 800 additions & 11 deletions

File tree

devtools/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,6 @@ ui: setup-tilt
346346
@echo "🔗 Opening Metaflow UI at http://localhost:3000"
347347
@open http://localhost:3000
348348

349-
.PHONY: install-helm setup-minikube setup-tilt teardown-minikube tunnel up down check-docker install-curl install-gum install-brew up down dashboard shell ui all-up help
349+
.PHONY: install-helm check-docker install-brew install-curl install-gum setup-minikube setup-tilt tunnel teardown-minikube dashboard up all-up down shell wait-until-ready create-dev-shell ui help
350350

351351
.DEFAULT_GOAL := help

metaflow/cli.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -475,9 +475,18 @@ def start(
475475
# be raised. For resume, since we ignore those options, we ignore the error.
476476
raise ctx.obj.delayed_config_exception
477477

478+
# Initialize the phase early so it can be used in the mutators
479+
# The phase is determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH,
480+
# "step" → TASK, "batch" → TRAMPOLINE).
481+
system_context._update(phase=_phase_from_cli_args(getattr(ctx, "saved_args", None)))
482+
478483
# Process config decorators (this is the pre_mutate phase for both flow mutators and
479484
# step mutators -- the mutate is called in init_step_decorators)
480485

486+
# Init all values in the flow mutators and then process them
487+
for decorator in ctx.obj.flow._flow_mutators:
488+
decorator.external_init()
489+
481490
new_cls = ctx.obj.flow._process_config_decorators(config_options)
482491
if new_cls:
483492
ctx.obj.flow = new_cls(use_cli=False)
@@ -561,13 +570,7 @@ def start(
561570
ctx.obj.monitor.start()
562571
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
563572

564-
# Populate the system context singleton for this process. The phase is
565-
# determined by which CLI subcommand is being invoked (e.g. "run" → LAUNCH,
566-
# "step" → TASK, "batch" → TRAMPOLINE).
567-
saved_args = getattr(ctx, "saved_args", None)
568-
phase = _phase_from_cli_args(saved_args)
569573
system_context._update(
570-
phase=phase,
571574
flow=ctx.obj.flow,
572575
graph=ctx.obj.graph,
573576
environment=ctx.obj.environment,

metaflow/debug.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,33 @@ class Debug(object):
2222
def __init__(self):
2323
import metaflow.metaflow_config as config
2424

25+
have_debug_options = False
2526
for typ in config.DEBUG_OPTIONS:
2627
if getattr(config, "DEBUG_%s" % typ.upper()):
2728
op = partial(self.log, typ)
29+
have_debug_options = True
2830
else:
2931
op = self.noop
3032
# use debug.$type_exec(args) to log command line for subprocesses
3133
# of type $type
3234
setattr(self, "%s_exec" % typ, op)
3335
# use the debug.$type flag to check if logging is enabled for $type
3436
setattr(self, typ, op != self.noop)
37+
# In some environments (I'm looking at you Bazel), the path to the filename is
38+
# super long and not very useful. We will print it once and truncate the rest.
39+
# This is not 100% accurate as each package is in a separate directory so the
40+
# prefix length may be a bit different but it's good enough and removes a lot
41+
# of noise while also keeping the cost low (instead of having to figure out
42+
# the prefix for each package)
43+
self.prefix_len = 0
44+
if have_debug_options:
45+
# Figure out the name of the current file and strip out everything before
46+
#
47+
self.prefix_len = len(inspect.stack()[0][1][: -len("metaflow/debug.py")])
48+
self.log(
49+
"options",
50+
"File prefix is: %s" % inspect.stack()[0][1][: self.prefix_len],
51+
)
3552

3653
def log(self, typ, args):
3754
if is_stringish(args):
@@ -40,7 +57,10 @@ def log(self, typ, args):
4057
s = " ".join(args)
4158
lineno = inspect.currentframe().f_back.f_lineno
4259
filename = inspect.stack()[1][1]
43-
print("debug[%s %s:%s]: %s" % (typ, filename, lineno, s), file=sys.stderr)
60+
print(
61+
"debug[%s %s:%s]: %s" % (typ, filename[self.prefix_len :], lineno, s),
62+
file=sys.stderr,
63+
)
4464

4565
def __getattr__(self, name):
4666
# Small piece of code to get pyright and other linters to recognize that there

metaflow/decorators.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,41 @@ def get_top_level_options(self):
272272
"""
273273
return []
274274

275+
def add_to_package(self):
276+
"""
277+
Called to add custom files needed by this flow decorator. This hook will be
278+
called in the `MetaflowPackage` class where metaflow compiles the code package
279+
tarball. This hook can return one of two things (the first is for backwards
280+
compatibility -- move to the second):
281+
- a generator yielding a tuple of `(file_path, arcname)` to add files to
282+
the code package. `file_path` is the path to the file on the local filesystem
283+
and `arcname` is the path relative to the packaged code.
284+
- a generator yielding a tuple of `(content, arcname, type)` where:
285+
- type is one of
286+
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
287+
- for USER_CONTENT:
288+
- the file will be included relative to the directory containing the
289+
user's flow file.
290+
- content: path to the file to include
291+
- arcname: path relative to the directory containing the user's flow file
292+
- for CODE_CONTENT:
293+
- the file will be included relative to the code directory in the package.
294+
This will be the directory containing `metaflow`.
295+
- content: path to the file to include
296+
- arcname: path relative to the code directory in the package
297+
- for MODULE_CONTENT:
298+
- the module will be added to the code package as a python module. It will
299+
be accessible as usual (import <module_name>)
300+
- content: name of the module
301+
- arcname: None (ignored)
302+
- for OTHER_CONTENT:
303+
- the file will be included relative to any other configuration/metadata
304+
files for the flow
305+
- content: path to the file to include
306+
- arcname: path relative to the config directory in the package
307+
"""
308+
return []
309+
275310

276311
# compare this to parameters.add_custom_parameters
277312
def add_decorator_options(cmd):

metaflow/package/__init__.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ def _module_selector(m) -> bool:
132132
self._blob_url = None
133133
self._blob = None
134134

135+
# USER_CONTENT files contributed by decorators/mutators via add_to_package.
136+
# Keyed by arcname (path relative to the flow directory) -> absolute file path.
137+
# These are merged with the files discovered by walking the flow directory;
138+
# duplicates (same arcname) are dropped to avoid conflicts.
139+
self._user_content_from_addl: Dict[str, str] = {}
140+
135141
# Update package in the system context -- it will be available
136142
# in all hooks going forward including the ones called in the
137143
# thread that is used to create the package asynchronously.
@@ -611,11 +617,33 @@ def _check_tuple(path_tuple):
611617
raise NonUniqueFileNameToFilePathMappingException(
612618
file_name, [deco_module_paths[file_name], file_path]
613619
)
620+
elif file_type == ContentType.USER_CONTENT:
621+
# USER_CONTENT files will be merged with the files discovered by
622+
# walking the flow directory. Track them here so we can:
623+
# 1. Include them in the package even if they live outside the
624+
# flow directory (or are excluded by the user_code_filter).
625+
# 2. Avoid duplicating files already picked up by the walker.
626+
real_path = os.path.realpath(path_tuple[0])
627+
path_tuple = (real_path, file_name, file_type)
628+
existing = self._user_content_from_addl.get(file_name)
629+
if existing is None:
630+
self._user_content_from_addl[file_name] = real_path
631+
elif existing != real_path:
632+
raise NonUniqueFileNameToFilePathMappingException(
633+
file_name, [existing, real_path]
634+
)
635+
else:
636+
return None # Already recorded for this arcname
614637
else:
615638
raise ValueError(f"Unknown file type: {file_type}")
616639
return path_tuple
617640

618641
def _add_tuple(path_tuple):
642+
# USER_CONTENT is intentionally NOT handled here: those files are
643+
# packaged alongside the user's flow code (see _user_code_tuples)
644+
# rather than under the mfcontent namespace, and are tracked in
645+
# self._user_content_from_addl by _check_tuple above. mfcontent
646+
# owns MODULE/CODE/OTHER only.
619647
file_path, file_name, file_type = path_tuple
620648
if file_type == ContentType.MODULE_CONTENT:
621649
# file_path is actually a module
@@ -625,6 +653,16 @@ def _add_tuple(path_tuple):
625653
elif file_type == ContentType.OTHER_CONTENT:
626654
self._mfcontent.add_other_file(file_path, file_name)
627655

656+
# flow decorators
657+
for decos in self._flow._flow_decorators.values():
658+
for deco in decos:
659+
for path_tuple in deco.add_to_package():
660+
path_tuple = _check_tuple(path_tuple)
661+
if path_tuple is None:
662+
continue
663+
_add_tuple(path_tuple)
664+
665+
# step decorators
628666
for step in self._flow:
629667
for deco in step.decorators:
630668
for path_tuple in deco.add_to_package():
@@ -640,16 +678,42 @@ def _add_tuple(path_tuple):
640678
continue
641679
_add_tuple(path_tuple)
642680

681+
# flow mutators
682+
for mutator in self._flow._flow_mutators:
683+
for path_tuple in mutator.add_to_package():
684+
path_tuple = _check_tuple(path_tuple)
685+
if path_tuple is None:
686+
continue
687+
_add_tuple(path_tuple)
688+
689+
# step mutators (deduplicated across steps)
690+
seen_step_mutators = set()
691+
for step in self._flow:
692+
for mutator in step.config_decorators:
693+
if id(mutator) in seen_step_mutators:
694+
continue
695+
seen_step_mutators.add(id(mutator))
696+
for path_tuple in mutator.add_to_package():
697+
path_tuple = _check_tuple(path_tuple)
698+
if path_tuple is None:
699+
continue
700+
_add_tuple(path_tuple)
701+
643702
def _user_code_tuples(self):
703+
# Track arcnames yielded by the directory walker so we can detect overlap
704+
# with USER_CONTENT files contributed via add_to_package hooks.
705+
seen_arcnames = set()
644706
if R.use_r():
645707
# the R working directory
646708
self._user_flow_dir = R.working_dir()
647709
for path_tuple in walk(
648710
"%s/" % R.working_dir(), file_filter=self._user_code_filter
649711
):
712+
seen_arcnames.add(path_tuple[1])
650713
yield path_tuple
651714
# the R package
652715
for path_tuple in R.package_paths():
716+
seen_arcnames.add(path_tuple[1])
653717
yield path_tuple
654718
else:
655719
# the user's working directory
@@ -660,10 +724,17 @@ def _user_code_tuples(self):
660724
file_filter=self._user_code_filter,
661725
exclude_tl_dirs=self._exclude_tl_dirs,
662726
):
663-
# TODO: This is where we will check if the file is already included
664-
# in the mfcontent portion
727+
seen_arcnames.add(path_tuple[1])
665728
yield path_tuple
666729

730+
# Emit USER_CONTENT files contributed by decorators/mutators that were not
731+
# already picked up by the directory walker (either because they live
732+
# outside the flow directory or were filtered out by the suffix/user filter).
733+
for arcname, file_path in self._user_content_from_addl.items():
734+
if arcname in seen_arcnames:
735+
continue
736+
yield (file_path, arcname)
737+
667738
def _make(self):
668739
backend = self._backend()
669740
with backend.create() as archive:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Example FlowMutator that extends the set of file suffixes included in the code
3+
package — mirrors the ``--package-suffixes`` CLI option as a decorator.
4+
5+
By default Metaflow's packaging walks the flow directory and includes files
6+
whose suffixes are in ``DEFAULT_PACKAGE_SUFFIXES`` (``.py,.R,.RDS``) plus
7+
whatever was passed via ``--package-suffixes``. This mutator lets a flow
8+
author declare additional suffixes directly on the flow class::
9+
10+
from metaflow import FlowSpec, step
11+
from metaflow.plugins.package_suffixes_mutator import package_suffixes
12+
13+
@package_suffixes([".yaml", ".json"])
14+
class MyFlow(FlowSpec):
15+
@step
16+
def start(self):
17+
...
18+
19+
The mutator walks the flow directory itself and yields every file with a
20+
matching suffix as ``USER_CONTENT``. The packaging layer deduplicates against
21+
the files that the default walker already picked up, so files that are already
22+
included (e.g. ``.py`` files) are not added twice.
23+
"""
24+
25+
import inspect
26+
import os
27+
from typing import List, Union
28+
29+
from metaflow.packaging_sys import ContentType
30+
from metaflow.packaging_sys.utils import walk
31+
from metaflow.user_decorators.user_flow_decorator import FlowMutator
32+
33+
34+
class package_suffixes(FlowMutator):
35+
"""Include additional file suffixes in the code package.
36+
37+
Parameters
38+
----------
39+
suffixes : list of str or comma-separated str
40+
Additional file suffixes to include (e.g. ``[".yaml", ".json"]`` or
41+
``".yaml,.json"``). Leading dots are optional; a suffix without a
42+
leading dot is treated as an extension (``yaml`` → ``.yaml``).
43+
"""
44+
45+
def init(self, suffixes: Union[List[str], str]):
46+
if isinstance(suffixes, str):
47+
suffixes = [s.strip() for s in suffixes.split(",") if s.strip()]
48+
self._suffixes = tuple(
49+
(s if s.startswith(".") else "." + s).lower() for s in suffixes
50+
)
51+
52+
def add_to_package(self):
53+
if not self._suffixes:
54+
return
55+
56+
try:
57+
flow_file = inspect.getfile(self._flow_cls)
58+
except (TypeError, OSError):
59+
return
60+
flow_dir = os.path.dirname(os.path.abspath(flow_file))
61+
if not flow_dir or not os.path.isdir(flow_dir):
62+
return
63+
64+
def _filter(fname: str) -> bool:
65+
lname = fname.lower()
66+
return any(lname.endswith(sfx) for sfx in self._suffixes)
67+
68+
for file_path, arcname in walk(flow_dir + os.sep, file_filter=_filter):
69+
yield (file_path, arcname, ContentType.USER_CONTENT)

metaflow/user_decorators/user_flow_decorator.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,38 @@ def mutate(
310310
A representation of this flow
311311
"""
312312
return None
313+
314+
def add_to_package(self):
315+
"""
316+
Called to add custom files needed by this flow mutator. This hook will be
317+
called in the `MetaflowPackage` class where metaflow compiles the code package
318+
tarball. This hook can return one of two things (the first is for backwards
319+
compatibility -- generally use the second when you implement your mutator):
320+
- a generator yielding a tuple of `(file_path, arcname)` to add files to
321+
the code package. `file_path` is the path to the file on the local filesystem
322+
and `arcname` is the path relative to the packaged code.
323+
- a generator yielding a tuple of `(content, arcname, type)` where:
324+
- type is one of
325+
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
326+
- for USER_CONTENT:
327+
- the file will be included relative to the directory containing the
328+
user's flow file.
329+
- content: path to the file to include
330+
- arcname: path relative to the directory containing the user's flow file
331+
- for CODE_CONTENT:
332+
- the file will be included relative to the code directory in the package.
333+
This will be the directory containing `metaflow`.
334+
- content: path to the file to include
335+
- arcname: path relative to the code directory in the package
336+
- for MODULE_CONTENT:
337+
- the module will be added to the code package as a python module. It will
338+
be accessible as usual (import <module_name>)
339+
- content: name of the module
340+
- arcname: None (ignored)
341+
- for OTHER_CONTENT:
342+
- the file will be included relative to any other configuration/metadata
343+
files for the flow
344+
- content: path to the file to include
345+
- arcname: path relative to the config directory in the package
346+
"""
347+
return []

0 commit comments

Comments
 (0)