Return StructuredDataset which is a field in a dataclass #3071
base: master
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #63793c
Actionable Suggestions - 2
if isinstance(python_val._literal_sd, StructuredDataset):
    sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
    metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
    sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
    return Literal(scalar=Scalar(structured_dataset=sd_literal))
Accessing private member '_literal_sd'. Consider using a public interface or property to access this data.
Code suggestion
Check the AI-generated fix before applying
- if literal_type.structured_dataset_type is not None and self._literal_sd is not None:
- return self._literal_sd
- if literal_type.structured_dataset_type is not None and self._literal_sd is None:
+ if literal_type.structured_dataset_type is not None and self.literal_sd is not None:
+ return self.literal_sd
+ if literal_type.structured_dataset_type is not None and self.literal_sd is None:
Code Review Run #63793c
if isinstance(python_val._literal_sd, StructuredDataset):
    sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
    metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
    sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
    return Literal(scalar=Scalar(structured_dataset=sd_literal))
The code block for handling a StructuredDataset passed through a dataclass could be simplified by extracting the literal creation logic into a helper method. This would improve code readability and maintainability.
Code suggestion
Check the AI-generated fix before applying
- if isinstance(python_val._literal_sd, StructuredDataset):
-     sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
-     metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
-     sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
-     return Literal(scalar=Scalar(structured_dataset=sd_literal))
+ if isinstance(python_val._literal_sd, StructuredDataset):
+     return self._create_structured_dataset_literal(python_val._literal_sd.uri, python_val._literal_sd.file_format)
+
+ def _create_structured_dataset_literal(self, uri: str, file_format: str) -> Literal:
+     sdt = StructuredDatasetType(format=file_format)
+     metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
+     sd_literal = literals.StructuredDataset(uri=uri, metadata=metad)
+     return Literal(scalar=Scalar(structured_dataset=sd_literal))
Code Review Run #63793c
It looks correct. Can you:
- provide a screenshot
- add an example to the integration tests (test_remote.py) to test it properly?
Signed-off-by: Nelson Chen <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3071      +/-   ##
==========================================
+ Coverage   78.20%   79.58%   +1.38%
==========================================
  Files         292      203      -89
  Lines       25401    21599    -3802
  Branches     2779     2780       +1
==========================================
- Hits        19864    17190    -2674
+ Misses       4726     3633    -1093
+ Partials      811      776      -35
☔ View full report in Codecov by Sentry.
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #d93af6
Actionable Suggestions - 1
Additional Suggestions - 10
@task
def read_sd(dc: DC) -> StructuredDataset:
    """Read input StructuredDataset."""
    print("sd:", dc.sd.open(pd.DataFrame).all())
Consider adding error handling around the open() and all() calls to handle potential exceptions when accessing the structured dataset.
Code suggestion
Check the AI-generated fix before applying
- print("sd:", dc.sd.open(pd.DataFrame).all())
+ try:
+     df = dc.sd.open(pd.DataFrame).all()
+     print("sd:", df)
+ except Exception as e:
+     print(f"Error accessing structured dataset: {e}")
+     raise
Code Review Run #d93af6
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #0d1278
Actionable Suggestions - 0
Additional Suggestions - 10
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #7e2d92
Actionable Suggestions - 3
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")

execution_id = run("attr_access_dc_sd.py", "wf", "--uri", remote_file_path)
Consider adding validation for the remote_file_path parameter before passing it to the run() function. The URI parameter should be validated to ensure it's a valid S3/Minio path.
Code suggestion
Check the AI-generated fix before applying
@@ -893,4 +893,7 @@
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")
+ if not remote_file_path or not remote_file_path.startswith('s3://') or not remote_file_path.endswith('.parquet'):
+ raise ValueError(f'Invalid remote file path: {remote_file_path}')
+
execution_id = run("attr_access_dc_sd.py", "wf", "--uri", remote_file_path)
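A minimal sketch of the kind of check this suggestion describes, parsing the URI instead of relying on string prefixes. The function name `is_remote_parquet_uri` and its `schemes` parameter are hypothetical and not part of flytekit or of this PR; the sketch also assumes that `upload_file` returns an `s3://bucket/key` style string, which the thread does not confirm.

```python
from urllib.parse import urlparse


def is_remote_parquet_uri(uri: str, schemes: tuple = ("s3",)) -> bool:
    """Return True if uri has an allowed scheme, a bucket, and a .parquet path.

    Hypothetical helper for illustration only; not a flytekit API.
    """
    if not uri:
        return False
    parsed = urlparse(uri)
    return (
        parsed.scheme in schemes          # e.g. "s3" for S3 or Minio
        and bool(parsed.netloc)           # bucket name must be present
        and parsed.path.endswith(".parquet")
    )
```

In a test, such a helper could guard the call to run() by raising ValueError when it returns False, which is the same effect the suggested diff has.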
Code Review Run #7e2d92
    Returns:
        dc: A dataclass with a StructuredDataset attribute.
    """
    dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet"))
Consider adding validation for the file_format parameter. The code assumes parquet format, but it may be worth checking whether this format is supported by checking against the DEFAULT_FORMATS or ENCODERS dictionaries.
Code suggestion
Check the AI-generated fix before applying
-def create_dc(uri: str) -> DC:
-    dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet"))
-    return dc
+def create_dc(uri: str) -> DC:
+    file_format = "parquet"
+    if file_format not in DEFAULT_FORMATS.values() and file_format not in ENCODERS:
+        raise ValueError(f"File format {file_format} is not supported")
+    dc = DC(sd=StructuredDataset(uri=uri, file_format=file_format))
+    return dc
Code Review Run #7e2d92
if __name__ == "__main__":
    wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
Consider making the parquet file path configurable through environment variables or configuration files instead of hardcoding it. This would make the code more flexible and easier to maintain across different environments.
Code suggestion
Check the AI-generated fix before applying
- wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
+ import os
+ default_path = "tests/flytekit/integration/remote/workflows/basic/data/df.parquet"
+ wf(uri=os.getenv("PARQUET_FILE_PATH", default_path))
Code Review Run #7e2d92
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #d69fbd
Actionable Suggestions - 1
@workflow
def wf(uri: str) -> None:
    dc = create_dc(uri=uri)
    read_sd(dc=dc)
Consider storing the return value of read_sd(), since it returns a StructuredDataset. The returned value might be needed later in the workflow.
Code suggestion
Check the AI-generated fix before applying
- read_sd(dc=dc)
+ sd = read_sd(dc=dc)
Code Review Run #d69fbd
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #b95e33
Actionable Suggestions - 0
Additional Suggestions - 10
@Future-Outlier
Code Review Agent Run #425bc0
Actionable Suggestions - 0
Additional Suggestions - 6
Tracking issue
Related to #6117
Why are the changes needed?
If we wrap the StructuredDataset in a dataclass, it will fail during the to_flyte_idl conversion.
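The failure mode can be illustrated with a small sketch that uses stand-in classes rather than flytekit's real ones. `NativeSD`, `LiteralSD`, and `to_literal_sd` are hypothetical names invented for this example, and the AttributeError is an assumption about how the failure surfaces; the thread only states that the to_flyte_idl conversion fails. The sketch shows a Python-native object reaching a step that expects an IDL-level literal, and an isinstance check that converts it first.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class LiteralSD:
    """Stand-in for the IDL-level structured dataset literal (hypothetical)."""
    uri: str
    format: str

    def to_flyte_idl(self) -> dict:
        # The real class emits a protobuf message; a dict is enough here.
        return {"uri": self.uri, "format": self.format}


@dataclass
class NativeSD:
    """Stand-in for the Python-native StructuredDataset (hypothetical)."""
    uri: str
    file_format: str
    # Deliberately has no to_flyte_idl(): assumed cause of the failure.


def to_literal_sd(value: Union[NativeSD, LiteralSD]) -> LiteralSD:
    """Convert native values to literals; pass existing literals through."""
    if isinstance(value, NativeSD):
        return LiteralSD(uri=value.uri, format=value.file_format)
    return value


native = NativeSD(uri="s3://bucket/df.parquet", file_format="parquet")

# Without the check, serializing the native object fails.
try:
    native.to_flyte_idl()
    failed = False
except AttributeError:
    failed = True

# With the check, the value is converted first and serializes cleanly.
idl = to_literal_sd(native).to_flyte_idl()
```

Values that are already literals pass through `to_literal_sd` unchanged, so only the dataclass-wrapped case takes the conversion branch.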
What changes were proposed in this pull request?
Before returning Literals, we check the type of python_val._literal_sd. If it is a Python native StructuredDataset, we transform it into a Literals.StructuredDataset.
How was this patch tested?
As described in #6117, an error occurs when the extract task is executed.
Setup process
Screenshots
Check all the applicable boxes
Summary by Bito
This PR implements comprehensive improvements to Flytekit's core functionality, including StructuredDataset handling, caching system enhancements, and a new rate limiting system. The changes transition from in-memory DataFrame to file-based approach, integrate Kubernetes StatefulSet Data Service and Ray plugins, while improving dictionary and annotated type handling. Key improvements include enhanced error handling, better array node handling in remote execution, improved workflow execution tracking, and strengthened input validation across multiple components.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 5