
Flyte v1.15.0 milestone release

Released by @flyte-bot · 15 Feb 03:06 · ee919f8

Flyte 1.15.0 release notes

Added

Streaming Decks (#2779)

Decks is a Flyte feature that provides customizable visualization tools for tasks, offering both default and user-defined insights.
For use cases like hyperparameter optimization, seeing the metrics, data, logs, and links on the Flyte Deck while the task is running is extremely useful. Previously, however, the Flyte Deck HTML was uploaded to the remote bucket only after the task succeeded.
With this change, flytepropeller now sends the Deck URI to flyteadmin as soon as the task starts running. This is also helpful for debugging, identifying your task's dependencies, and even looking at the source code that is being executed.
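
For example, a task can opt into Decks with enable_deck=True and publish HTML while it runs. A minimal sketch (the deck name and HTML content are illustrative):

import flytekit
from flytekit import task

@task(enable_deck=True)
def train() -> float:
    # Publish a custom deck; with streaming decks its contents become
    # visible in the UI while the task is still running.
    flytekit.Deck("metrics", "<h2>Epoch 1</h2><p>loss=0.42</p>")
    return 0.42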


Define shared memory in the task definition

Several ML libraries make use of shared memory, and Flyte now makes it easier to request. The new shared_memory task argument accepts either a bool or a string: True means the memory-backed volume is sized to the node's allocatable memory, whereas a string value defines a volume of that size in the pod spec used to run the task. For example, the simple case:

import os
from flytekit import task

@task(shared_memory=True)
def t1() -> bool:
    # /dev/shm is available in the task
    ...

or a more complex case:

import os
from flytekit import task

@task(shared_memory="10Gi")
def t1() -> bool:
    # /dev/shm of size 10Gi is available in the task
    ...

Progress bars to help visualize package creation and upload

Flytekit now includes progress tracking, with progress bars shown during package creation/compression and again during upload.


Debug @dynamic tasks

Advancing the debug story we've been chipping away at, we now support debugging @dynamic tasks.
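
For reference, a @dynamic task of the kind that can now be debugged looks like this (a minimal, illustrative sketch; the task names are made up):

from typing import List

from flytekit import dynamic, task

@task
def double(x: int) -> int:
    return x * 2

@dynamic
def fan_out(xs: List[int]) -> List[int]:
    # The shape of this workflow is only known at run time; with this release
    # it can be stepped through with a debugger like a regular Python function.
    return [double(x=x) for x in xs]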

Changed

Better Secrets handling (#3048)

Flyte enables tasks to request secrets managed by Kubernetes or by external systems.
If you wanted to map and mount a Kubernetes secret to an environment variable, flytekit required you to export an environment variable named _FSEC_<GROUP>_<GROUP_VERSION>_<KEY>, with GROUP corresponding to the Secret name and KEY being the actual key to retrieve from the secret. This was inconvenient and too verbose.
Starting with this release, you can easily configure secrets from environment variables:

@task(secret_requests=[Secret(..., env_var="MY_ENVVAR", mount_requirement=Secret.MountType.ENV_VAR)])
def hello():
    ...

Or from files:

@task(secret_requests=[Secret(..., env_var="MY_TOKEN_PATH", mount_requirement=Secret.MountType.FILE)])
def hello():
    ...
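
Inside the task you then read the secret through the variable you configured. A minimal sketch, assuming a hypothetical secret group my-group with key my-key (with FILE mounting, the variable holds the path to the mounted secret file instead of the value):

import os

from flytekit import Secret, task

@task(secret_requests=[Secret(group="my-group", key="my-key", env_var="MY_ENVVAR",
                              mount_requirement=Secret.MountType.ENV_VAR)])
def hello():
    # With ENV_VAR mounting, the secret value itself is exposed in MY_ENVVAR.
    token = os.environ["MY_ENVVAR"]
    ...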

Override Pod template using .with_overrides (#2981)

Flyte lets you specify default resources for task executions to enable deterministic scheduling behavior.
The .with_overrides method allows you to override the global resource defaults dynamically:

@workflow
def my_pipeline(x: typing.List[int]) -> int:
    return square_1(x=count_unique_numbers_1(x=x)).with_overrides(limits=Resources(cpu="6", mem="500Mi"))

Until now, this method was available for Resources only. Starting with this release, you can override the Pod template too.

Similar to Resources, you can instruct flytepropeller to use a custom Pod template for task executions, setting the template at different levels, from namespace-wide down to the task configuration.
Now you can also dynamically override the Pod template when the task is invoked in the workflow:

from flytekit import PodTemplate, Resources, task, workflow
from kubernetes.client import V1Container, V1EnvVar, V1PodSpec

@task
def say_hello() -> str:
    return "Hello, World!"

@workflow
def hello_world_wf() -> str:
    res = say_hello().with_overrides(
        limits=Resources(cpu="2", mem="600Mi"),
        pod_template=PodTemplate(
            primary_container_name="primary-nelson",
            labels={"lKeyA": "lValA", "lKeyB": "lValB"},
            annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
            pod_spec=V1PodSpec(
                containers=[
                    V1Container(
                        name="primary-nelson",
                        image="arbaobao/flyte-test-images:pythonpath5",
                        env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                    ),
                    V1Container(
                        name="primary-nelson2",
                        image="arbaobao/flyte-test-images:pythonpath5",
                        env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                    ),
                ],
            ),
        ),
    )
    return res

This change provides more flexible pod configuration management while maintaining compatibility with existing resource override patterns.

Introducing Caching Policies (#3129)

Flyte provides robust caching mechanisms to optimize workflow performance. Two key features users can leverage today include:

a. Task Output Caching with Deterministic Inputs

Users can enable task-level caching by setting cache=True in the @task decorator. This ensures that tasks with identical inputs reuse previously computed outputs, saving time and resources.

Example:

@task(cache=True, cache_version="1.0")  
def square(n: int) -> int:  
    return n * n 

b. Custom Hashing for Offloaded Objects (e.g., DataFrames)

Flyte allows users to define custom hash functions for non-deterministic objects (like pandas DataFrames) using Annotated types and HashMethod. This ensures caching works even for complex data types.

Example:

import pandas as pd
from typing import Annotated

from flytekit import HashMethod, task

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))

@task(cache=True, cache_version="1.0")
def data_loader() -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    return pd.DataFrame(...)

This release brings a refactoring of Flyte's caching system into a unified Cache object, deprecating legacy parameters and adding advanced features:

1. Unified Cache Configuration Object

All caching parameters (existing and new) are now grouped under a single Cache class:

class Cache:
    version: Optional[str] = None                      # Replaces `cache_version`
    serialize: bool = False                            # Replaces `cache_serialize`
    ignored_inputs: Union[Tuple[str, ...], str] = ()   # New: exclude inputs from the cache key
    salt: str = ""                                     # New: add a unique salt to the cache key
    policies: Optional[Union[List[CachePolicy], CachePolicy]] = None  # New: dynamic versioning

Deprecation Notice: Legacy parameters (e.g., cache_version, cache_serialize) are deprecated but remain supported.

2. Advanced Cache Key Customization

  • Cache Policies: Define dynamic versioning logic via the CachePolicy protocol. Policies generate version strings through the get_version method; these are then concatenated and hashed (SHA-256) to form the final cache key. When a Cache object is defined, defining policies is required (see the sketch after this list).

  • Salt: Add a unique string to the cache key to avoid collisions (e.g., to differentiate between teams sharing the same task).

  • Ignored Inputs: Exclude specific inputs (e.g., request_id) from the cache key calculation using ignored_inputs.
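
As an illustration, a policy can derive the cache version from anything you choose; Flyte concatenates and hashes the strings returned by all policies. This is a minimal sketch: the class and values are made up, and the exact get_version signature in flytekit may differ slightly:

from flytekit import Cache, task

class ConfigTagPolicy:
    """Hypothetical policy implementing the CachePolicy protocol."""

    def get_version(self, salt: str, params) -> str:
        # Version strings returned by all policies are concatenated and
        # hashed (SHA-256) by Flyte to form the final cache key.
        return f"config-v2-{salt}"

@task(cache=Cache(salt="team-a", policies=[ConfigTagPolicy()]))
def square(n: int) -> int:
    return n * n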

3. Migration Path

Existing code using cache=True or cache_version will continue to work, but users are encouraged to migrate to the Cache object:

@task(cache=Cache(version="1.0", serialize=True, ignored_inputs=("debug_mode",)))  
def my_task(...) -> ...: 

Faster GitIgnore checks

Algorithmic speed-ups are always welcome, and that is exactly what happened in flyteorg/flytekit#3007. Enjoy the massive speedup!

Full changelog

New Contributors

Full Changelog: v1.14.1...v1.15.0