Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Pandas Dataframe in Dataclass #3116

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Feb 7, 2025

Tracking issue

Why are the changes needed?

we want to improve UX.

import flytekit
from flytekit import task, workflow
from dataclasses import dataclass, field
import pandas as pd
# from pydantic import BaseModel
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)

@dataclass
class smallDC:
    df: pd.DataFrame = field(default_factory=lambda: pd.DataFrame([[1, 2, 3], [4, 5, 6]]))

@dataclass
class DC:
    dc: smallDC
    df: pd.DataFrame = field(default_factory=lambda: pd.DataFrame([[1, 2, 3], [4, 5, 6]]))

@task
def t1() -> DC:
    return DC(dc=smallDC())

if __name__ == "__main__":
    print(t1())

What changes were proposed in this pull request?

How was this patch tested?

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR enhances Flytekit's functionality in multiple areas: 1) Improves the DataclassTransformer with automatic Pandas DataFrame to StructuredDataset conversion and better type handling, 2) Enhances error handling and validation for msgpack data, authentication, and secret mount management, 3) Implements recursive transformation capabilities and improved JSON decoding, 4) Introduces configurable caching policies and strengthens type validation features.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 7, 2025

Code Review Agent Run #fedbf7

Actionable Suggestions - 2
  • flytekit/core/type_engine.py - 2
    • Consider dedicated transformer for DataFrame conversion · Line 882-884
    • Consider caching decoder with original type · Line 1028-1029
Review Details
  • Files reviewed - 2 · Commit Range: 28db576..28db576
    • flytekit/core/type_engine.py
    • flytekit/types/structured/structured_dataset.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 7, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Feature Improvement - Enhanced Dataclass Type Handling with Pandas Support

type_engine.py - Added support for Pandas DataFrame in dataclasses with automatic conversion to StructuredDataset and improved type transformation logic

Comment on lines +882 to +884
import pandas as pd
if isinstance(python_val, pd.DataFrame):
python_val = StructuredDataset(dataframe=python_val, file_format="parquet")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider dedicated transformer for DataFrame conversion

Consider moving the pandas DataFrame conversion logic to a dedicated transformer class instead of handling it in the _make_dataclass_serializable method. This would improve code organization and maintainability.

Code suggestion
Check the AI-generated fix before applying
Suggested change
import pandas as pd
if isinstance(python_val, pd.DataFrame):
python_val = StructuredDataset(dataframe=python_val, file_format="parquet")

Code Review Run #fedbf7


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +1028 to +1029
decoder = JSONDecoder(new_expected_python_type)
self._json_decoder[new_expected_python_type] = decoder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider caching decoder with original type

Consider storing the decoder in the original expected_python_type key instead of new_expected_python_type to avoid potential memory leaks from storing multiple decoders for the same logical type.

Code suggestion
Check the AI-generated fix before applying
Suggested change
decoder = JSONDecoder(new_expected_python_type)
self._json_decoder[new_expected_python_type] = decoder
decoder = JSONDecoder(new_expected_python_type)
self._json_decoder[expected_python_type] = decoder

Code Review Run #fedbf7


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 7, 2025

Code Review Agent Run #6cea0d

Actionable Suggestions - 0
Review Details
  • Files reviewed - 2 · Commit Range: 28db576..fc506fe
    • flytekit/core/type_engine.py
    • flytekit/types/structured/structured_dataset.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 7, 2025

Code Review Agent Run #b16268

Actionable Suggestions - 2
  • flytekit/core/type_engine.py - 2
Review Details
  • Files reviewed - 1 · Commit Range: fc506fe..de150b1
    • flytekit/core/type_engine.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

from flytekit.types.structured.structured_dataset import StructuredDataset
from typing import get_type_hints, Type, Dict

def convert_dataclass(instance, target_cls):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding type hints to function

The convert_dataclass function could benefit from type hints for better code maintainability and IDE support. Consider adding type annotations for instance and target_cls parameters.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def convert_dataclass(instance, target_cls):
def convert_dataclass(instance: Any, target_cls: Type[T]) -> T:

Code Review Run #b16268


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


dc = decoder.decode(json_str)

return self._fix_dataclass_int(expected_python_type, dc)
return convert_dataclass(self._fix_dataclass_int(new_expected_python_type, dc), expected_python_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating dataclass field compatibility

The conversion from new_expected_python_type to expected_python_type may lose data if the types have different field structures. Consider validating field compatibility before conversion.

Code suggestion
Check the AI-generated fix before applying
Suggested change
return convert_dataclass(self._fix_dataclass_int(new_expected_python_type, dc), expected_python_type)
fixed_dc = self._fix_dataclass_int(new_expected_python_type, dc)
# Validate field compatibility
if not all(f.name in [ef.name for ef in fields(expected_python_type)] for f in fields(fixed_dc.__class__)):
raise ValueError(f"Incompatible field structure between {new_expected_python_type.__name__} and {expected_python_type.__name__}")
return convert_dataclass(fixed_dc, expected_python_type)

Code Review Run #b16268


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 10, 2025

Code Review Agent Run #9c627a

Actionable Suggestions - 1
  • flytekit/core/type_engine.py - 1
    • Consider extracting duplicated transform_dataclass function · Line 689-710
Review Details
  • Files reviewed - 1 · Commit Range: de150b1..2a3bffe
    • flytekit/core/type_engine.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Comment on lines 689 to 710
def transform_dataclass(cls, memo=None):
if memo is None:
memo = {}

if cls in memo:
return memo[cls]

cls_hints = get_type_hints(cls)
new_field_defs = []
for field in fields(cls):
orig_type = cls_hints[field.name]
if orig_type == pd.DataFrame:
new_type = StructuredDataset
elif is_dataclass(orig_type):
new_type = transform_dataclass(orig_type, memo)
else:
new_type = orig_type
new_field_defs.append((field.name, new_type))

new_cls = make_dataclass("FlyteModified" + cls.__name__, new_field_defs)
memo[cls] = new_cls
return new_cls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting duplicated transform_dataclass function

Consider extracting the transform_dataclass function to avoid code duplication. This function appears multiple times in the codebase (lines 689-710, 764-785, and 1014-1035) with identical functionality.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def transform_dataclass(cls, memo=None):
if memo is None:
memo = {}
if cls in memo:
return memo[cls]
cls_hints = get_type_hints(cls)
new_field_defs = []
for field in fields(cls):
orig_type = cls_hints[field.name]
if orig_type == pd.DataFrame:
new_type = StructuredDataset
elif is_dataclass(orig_type):
new_type = transform_dataclass(orig_type, memo)
else:
new_type = orig_type
new_field_defs.append((field.name, new_type))
new_cls = make_dataclass("FlyteModified" + cls.__name__, new_field_defs)
memo[cls] = new_cls
return new_cls
from flytekit.core.utils import transform_dataclass

Code Review Run #9c627a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Future-Outlier <[email protected]>
Copy link

codecov bot commented Feb 14, 2025

Codecov Report

Attention: Patch coverage is 1.69492% with 116 lines in your changes missing coverage. Please review.

Project coverage is 41.34%. Comparing base (66d4aed) to head (b5f2a6f).

Files with missing lines Patch % Lines
flytekit/core/type_engine.py 1.69% 116 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #3116       +/-   ##
===========================================
- Coverage   76.85%   41.34%   -35.52%     
===========================================
  Files         206      206               
  Lines       21851    21955      +104     
  Branches     2837     2860       +23     
===========================================
- Hits        16794     9077     -7717     
- Misses       4269    12729     +8460     
+ Partials      788      149      -639     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 14, 2025

Code Review Agent Run #e17dbe

Actionable Suggestions - 3
  • flytekit/core/type_engine.py - 3
Review Details
  • Files reviewed - 1 · Commit Range: 2a3bffe..1625381
    • flytekit/core/type_engine.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Comment on lines +776 to +778
FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]
if cls in FLYTE_TYPES:
return cls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving constant to module level

Consider moving the FLYTE_TYPES list to a module-level constant since it's used in multiple places within the same file. This would improve maintainability and reduce duplication.

Code suggestion
Check the AI-generated fix before applying
Suggested change
FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]
if cls in FLYTE_TYPES:
return cls
FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]
if cls in FLYTE_TYPES: return cls

Code Review Run #e17dbe


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +776 to +778
FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]
if cls in FLYTE_TYPES:
return cls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using immutable type for constants

Consider using a tuple or frozenset instead of list for FLYTE_TYPES since it appears to be a constant collection that won't be modified. This provides better performance for lookups and makes the intent clearer.

Code suggestion
Check the AI-generated fix before applying
Suggested change
FLYTE_TYPES = [FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema]
if cls in FLYTE_TYPES:
return cls
FLYTE_TYPES = (FlyteFile, FlyteDirectory, StructuredDataset, FlyteSchema)
if cls in FLYTE_TYPES:
return cls

Code Review Run #e17dbe


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 14, 2025

Code Review Agent Run #c4bd83

Actionable Suggestions - 1
  • flytekit/core/type_engine.py - 1
Review Details
  • Files reviewed - 1 · Commit Range: 1625381..d9d438e
    • flytekit/core/type_engine.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@@ -702,7 +737,7 @@ def to_generic_literal(
encoder = self._json_encoder[python_type]
except KeyError:
encoder = JSONEncoder(python_type)
self._json_encoder[python_type] = encoder
self._json_encoder[new_python_type] = encoder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential encoder type mismatch issue

Consider using python_type instead of new_python_type when storing the encoder in self._json_encoder. The encoder is created using python_type but stored with new_python_type, which could lead to inconsistencies in encoder lookup.

Code suggestion
Check the AI-generated fix before applying
Suggested change
self._json_encoder[new_python_type] = encoder
self._json_encoder[python_type] = encoder

Code Review Run #c4bd83


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Future-Outlier <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 17, 2025

Code Review Agent Run #8760de

Actionable Suggestions - 0
Additional Suggestions - 10
  • flytekit/core/python_auto_container.py - 1
    • Consider adding shared memory param validation · Line 55-55
  • flytekit/core/python_function_task.py - 1
    • Consider using context for signal handlers · Line 631-634
  • tests/flytekit/unit/utils/test_rate_limiter.py - 2
    • Consider consolidating duplicate rate limiter tests · Line 27-31
    • Consider adding edge case tests · Line 27-31
  • flytekit/configuration/plugin.py - 1
    • Consider default implementation for cache policies · Line 58-59
  • flytekit/remote/remote.py - 5
Review Details
  • Files reviewed - 65 · Commit Range: d9d438e..b5f2a6f
    • flytekit/__init__.py
    • flytekit/clients/auth_helper.py
    • flytekit/clients/grpc_utils/auth_interceptor.py
    • flytekit/clis/sdk_in_container/init.py
    • flytekit/clis/sdk_in_container/run.py
    • flytekit/configuration/plugin.py
    • flytekit/core/cache.py
    • flytekit/core/constants.py
    • flytekit/core/node.py
    • flytekit/core/promise.py
    • flytekit/core/python_auto_container.py
    • flytekit/core/python_function_task.py
    • flytekit/core/resources.py
    • flytekit/core/task.py
    • flytekit/core/type_engine.py
    • flytekit/core/type_match_checking.py
    • flytekit/core/worker_queue.py
    • flytekit/core/workflow.py
    • flytekit/image_spec/default_builder.py
    • flytekit/image_spec/image_spec.py
    • flytekit/interaction/click_types.py
    • flytekit/interaction/string_literals.py
    • flytekit/loggers.py
    • flytekit/models/core/workflow.py
    • flytekit/models/execution.py
    • flytekit/models/security.py
    • flytekit/models/task.py
    • flytekit/remote/data.py
    • flytekit/remote/remote.py
    • flytekit/utils/rate_limiter.py
    • plugins/flytekit-aws-sagemaker/tests/test_boto3_agent.py
    • plugins/flytekit-onnx-pytorch/dev-requirements.txt
    • plugins/flytekit-openai/tests/openai_batch/test_agent.py
    • plugins/flytekit-pandera/flytekitplugins/pandera/pandas_transformer.py
    • plugins/flytekit-pandera/setup.py
    • pydoclint-errors-baseline.txt
    • pyproject.toml
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/dataclass_wf.py
    • tests/flytekit/unit/bin/test_python_entrypoint.py
    • tests/flytekit/unit/cli/pyflyte/test_run.py
    • tests/flytekit/unit/cli/pyflyte/test_run_lps.py
    • tests/flytekit/unit/clients/auth/test_keyring_store.py
    • tests/flytekit/unit/clients/test_auth_helper.py
    • tests/flytekit/unit/clients/test_friendly.py
    • tests/flytekit/unit/clients/test_raw.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/image_spec/test_image_spec.py
    • tests/flytekit/unit/core/test_annotated_bindings.py
    • tests/flytekit/unit/core/test_array_node_map_task.py
    • tests/flytekit/unit/core/test_cache.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_node_creation.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/core/test_type_engine.py
    • tests/flytekit/unit/core/test_type_match_checking.py
    • tests/flytekit/unit/core/test_worker_queue.py
    • tests/flytekit/unit/interaction/test_click_types.py
    • tests/flytekit/unit/interaction/test_string_literals.py
    • tests/flytekit/unit/models/core/test_security.py
    • tests/flytekit/unit/models/core/test_workflow.py
    • tests/flytekit/unit/models/test_execution.py
    • tests/flytekit/unit/models/test_tasks.py
    • tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py
    • tests/flytekit/unit/utils/test_rate_limiter.py
  • Files skipped - 1
    • .gitignore - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants