
Commit 5ddd32c

authored by Abdel Jaidi
ci: upgrade Ray to 2.6 and fix security dependabots (#2403)
* deps: upgrade Ray to 2.6 and fix security dependabots
* fix pydantic
* tests: limit cpu and memory for each ray process
* [skip ci] revert env variables
* fix: mark failing Ray 2.6 tests
* fix: amend test and increase cpu count
* fix: adjust cpu count

Signed-off-by: Abdel Jaidi <[email protected]>
1 parent 31c4bd0 commit 5ddd32c

13 files changed: +337 -267 lines

awswrangler/distributed/ray/_executor.py

+1-1
@@ -26,7 +26,7 @@ def map(self, func: Callable[..., MapOutputType], _: Optional["BaseClient"], *ar
         return list(func(*arg) for arg in zip(itertools.repeat(None), *args))


-@ray.remote  # type: ignore[attr-defined]
+@ray.remote
 class AsyncActor:
     async def run_concurrent(self, func: Callable[..., MapOutputType], *args: Any) -> MapOutputType:
         return func(*args)
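
The only change here is dropping the `# type: ignore[attr-defined]` comment, presumably no longer needed against Ray 2.6's type annotations. For context, a minimal, self-contained sketch of how an async actor like `AsyncActor` can be driven (the driver code and the squaring function are illustrative, not part of this commit):

import ray

ray.init(ignore_reinit_error=True)  # local, single-node cluster for the example


@ray.remote
class AsyncActor:
    # An async method lets one actor interleave many in-flight calls on its event loop.
    async def run_concurrent(self, func, *args):
        return func(*args)


actor = AsyncActor.remote()
futures = [actor.run_concurrent.remote(lambda x: x * x, i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]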

awswrangler/distributed/ray/_utils.py

+2-2
@@ -18,8 +18,8 @@ def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
     Args:
         cur_pg: The current placement group, if any.
     """
-    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))  # type: ignore[attr-defined]
-    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))  # type: ignore[attr-defined]
+    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
+    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))

     # If we're in a placement group, we shouldn't assume the entire cluster's
     # resources are available for us to use. Estimate an upper bound on what's
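
As a rough illustration of what an estimate like `_estimate_avail_cpus` does with these values, here is a hedged sketch; the bundle arithmetic below is a plausible approximation, not the library's exact logic:

from typing import Optional

import ray
from ray.util.placement_group import PlacementGroup


def estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
    """Approximate the CPUs this job can use (illustrative only)."""
    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
    if cur_pg is None:
        return cluster_cpus
    # Inside a placement group, only the CPUs reserved by its bundles are ours.
    pg_cpus = sum(int(bundle.get("CPU", 0)) for bundle in cur_pg.bundle_specs)
    return max(1, min(cluster_cpus, pg_cpus))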

awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py

+2-2
@@ -209,7 +209,7 @@ def __init__(
         import ray
         from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

-        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)  # type: ignore[attr-defined]
+        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)

         dataset_kwargs = reader_args.pop("dataset_kwargs", {})
         try:
@@ -225,7 +225,7 @@ def __init__(
         # Try to infer dataset schema by passing dummy table through UDF.
         dummy_table = schema.empty_table()
         try:
-            inferred_schema = _block_udf(dummy_table).schema  # type: ignore[union-attr]
+            inferred_schema = _block_udf(dummy_table).schema
             inferred_schema = inferred_schema.with_metadata(schema.metadata)
         except Exception:  # pylint: disable=broad-except
             _logger.debug(
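
The datasource pins reads to the driver's node via `NodeAffinitySchedulingStrategy`. A minimal sketch of that pattern (the `read_fragment` task is made up for illustration):

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(ignore_reinit_error=True)

# Hard-pin tasks to the node running this driver (soft=False means "must", not "prefer").
local_only = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)


@ray.remote(scheduling_strategy=local_only)
def read_fragment(name: str) -> str:
    return f"{name} read on the driver's node"


print(ray.get(read_fragment.remote("part-0000.parquet")))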

awswrangler/s3/_read_deltalake.py

+2-2
@@ -33,7 +33,7 @@ def _set_default_storage_options_kwargs(
 @_utils.check_optional_dependency(deltalake, "deltalake")
 @apply_configs
 def read_deltalake(
-    path: Optional[str] = None,
+    path: str,
     version: Optional[int] = None,
     partitions: Optional[List[Tuple[str, str, Any]]] = None,
     columns: Optional[List[str]] = None,
@@ -54,7 +54,7 @@ def read_deltalake(

     Parameters
     ----------
-    path: Optional[str]
+    path: str
         The path of the DeltaTable.
     version: Optional[int]
         The version of the DeltaTable.
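
With `path` now a required argument, a call looks like the sketch below (the S3 location and column names are placeholders; the optional `deltalake` dependency must be installed):

import awswrangler as wr

df = wr.s3.read_deltalake(
    path="s3://my-bucket/delta/my_table/",  # placeholder location, now mandatory
    version=3,                              # optional: pin a specific table version
    columns=["id", "value"],                # optional: column pruning
)
print(df.head())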

poetry.lock

+269-238
Generated file; diff not rendered.

pyproject.toml

+4-4
@@ -30,7 +30,7 @@ python = ">=3.8, <4.0"
 # Required
 boto3 = "^1.20.32"
 botocore = "^1.23.32"
-pandas = ">=1.2.0,!=1.5.0,<3.0.0"  # Exclusion per: https://github.com/aws/aws-sdk-pandas/issues/1678
+pandas = ">=1.2.0,<3.0.0"
 numpy = "^1.18"
 pyarrow = ">=7.0.0"
 typing-extensions = "^4.4.0"
@@ -56,11 +56,11 @@ jsonpath-ng = { version = "^1.5.3", optional = true }
 # Other
 openpyxl = { version = "^3.0.0", optional = true }
 progressbar2 = { version = "^4.0.0", optional = true }
-deltalake = { version = ">=0.6.4,<0.10.0", optional = true }
+deltalake = { version = ">=0.6.4,<0.11.0", optional = true }

 # Distributed
-modin = { version = "^0.22.2", optional = true }
-ray = { version = ">=2.0.0,<2.6.0", extras = ["default", "data"], optional = true }
+modin = { version = "^0.23.0", optional = true }
+ray = { version = ">=2.0.0,<2.7.0", extras = ["default", "data"], optional = true }

 [tool.poetry.extras]
 redshift = ["redshift-connector"]
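
A quick way to confirm an environment matches the relaxed constraints above, assuming the optional Ray/Modin/Delta Lake dependencies plus the `packaging` helper are installed (purely a sanity-check sketch, not part of the commit):

from importlib.metadata import version

from packaging.specifiers import SpecifierSet

# Mirrors the pyproject.toml constraints after this commit.
assert version("ray") in SpecifierSet(">=2.0.0,<2.7.0")
assert version("modin") in SpecifierSet(">=0.23.0,<0.24.0")  # poetry's ^0.23.0
assert version("deltalake") in SpecifierSet(">=0.6.4,<0.11.0")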

tests/unit/test_athena.py

+17-10
@@ -136,6 +136,7 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database,
     assert len(wr.s3.list_objects(path=path3)) == 0


+@pytest.mark.modin_index
 def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database):
     df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
     wr.s3.to_parquet(
@@ -155,12 +156,14 @@ def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, gl
             bucketing_info=(["c0"], 1),
         ),
         s3_output=path2,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
     )
     df_no_ctas = wr.athena.read_sql_query(
         sql=f"SELECT * FROM {glue_table}",
         ctas_approach=False,
         database=glue_database,
         s3_output=path2,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
     )
     assert df_ctas.equals(df_no_ctas)

@@ -855,6 +858,7 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
 @pytest.mark.parametrize(
     "dtype",
@@ -907,12 +911,12 @@ def test_bucketing_parquet_dataset(path, glue_database, glue_table, bucketing_da
     if isinstance(bucketing_data[0], str):
         dtype = pd.StringDtype()

-    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([bucketing_data[0], bucketing_data[2]], dtype=dtype), first_bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo", "baz"], dtype=pd.StringDtype()), first_bucket_df["c1"])

-    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(second_bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]], dtype=dtype), second_bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"], dtype=pd.StringDtype()), second_bucket_df["c1"])
@@ -943,6 +947,7 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
 @pytest.mark.parametrize(
     "dtype",
@@ -988,12 +993,12 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
     assert r["paths"][0].endswith("bucket-00000.csv")
     assert r["paths"][1].endswith("bucket-00001.csv")

-    first_bucket_df = wr.s3.read_csv(path=[r["paths"][0]], header=None, names=["c0", "c1"])
+    first_bucket_df = wr.s3.read_csv(path=[r["paths"][0]], header=None, names=["c0", "c1"]).reset_index(drop=True)
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([bucketing_data[0], bucketing_data[2]]), first_bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo", "baz"]), first_bucket_df["c1"])

-    second_bucket_df = wr.s3.read_csv(path=[r["paths"][1]], header=None, names=["c0", "c1"])
+    second_bucket_df = wr.s3.read_csv(path=[r["paths"][1]], header=None, names=["c0", "c1"]).reset_index(drop=True)
     assert len(second_bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]]), second_bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"]), second_bucket_df["c1"])
@@ -1008,6 +1013,7 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
     assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]])
 def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data):
     nb_of_buckets = 2
@@ -1045,22 +1051,22 @@ def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, gl
     if isinstance(bucketing_data[0], str):
         dtype = pd.StringDtype()

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[0]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][2]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][2]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[2]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["baz"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][3]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][3]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[3]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["boo"], dtype=pd.StringDtype()), bucket_df["c1"])
@@ -1135,6 +1141,7 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t
     assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


+@pytest.mark.modin_index
 def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table):
     nb_of_buckets = 2
     df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]})
@@ -1152,13 +1159,13 @@ def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_ta
     assert r["paths"][0].endswith("bucket-00000.snappy.parquet")
     assert r["paths"][1].endswith("bucket-00001.snappy.parquet")

-    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([0, 3], dtype=pd.Int64Dtype()), first_bucket_df["c0"])
     assert pandas_equals(pd.Series([4, 7], dtype=pd.Int64Dtype()), first_bucket_df["c1"])
     assert pandas_equals(pd.Series(["foo", "boo"], dtype=pd.StringDtype()), first_bucket_df["c2"])

-    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(second_bucket_df) == 2
     assert pandas_equals(pd.Series([1, 2], dtype=pd.Int64Dtype()), second_bucket_df["c0"])
     assert pandas_equals(pd.Series([6, 5], dtype=pd.Int64Dtype()), second_bucket_df["c1"])
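
The recurring `pyarrow_additional_kwargs={"ignore_metadata": True}` tells the reader to discard the pandas index metadata stored in the Parquet footer, so frames read back under Modin on Ray compare cleanly against plain positional-index frames. A short sketch of the pattern (the bucket path is a placeholder):

import awswrangler as wr

df = wr.s3.read_parquet(
    path="s3://my-bucket/dataset/",                       # placeholder path
    pyarrow_additional_kwargs={"ignore_metadata": True},  # drop stored pandas index metadata
)
# With the stored index ignored, a plain positional index is enough for equality checks.
df = df.reset_index(drop=True)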

tests/unit/test_athena_csv.py

+2-1
@@ -372,6 +372,7 @@ def test_athena_csv_types(path, glue_database, glue_table):
     ensure_data_types_csv(df2)


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [True, False])
 @pytest.mark.parametrize("ctas_approach", [True, False])
 @pytest.mark.parametrize("line_count", [1, 2])
@@ -388,7 +389,7 @@ def test_skip_header(path, glue_database, glue_table, use_threads, ctas_approach
         skip_header_line_count=line_count,
     )
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads, ctas_approach=ctas_approach)
-    assert df.iloc[line_count - 1 :].reset_index(drop=True).equals(df2)
+    assert df.iloc[line_count - 1 :].reset_index(drop=True).equals(df2.reset_index(drop=True))


 @pytest.mark.parametrize("use_threads", [True, False])
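
`@pytest.mark.modin_index` is a custom marker used throughout these tests. If it were not registered, pytest would warn about an unknown mark; a typical registration looks like the conftest sketch below (an assumption about how the suite wires it up, not taken from this diff):

# conftest.py (illustrative)
def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "modin_index: tests whose index handling differs when running on Modin/Ray",
    )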

tests/unit/test_athena_parquet.py

+8-2
@@ -580,6 +580,7 @@ def test_schema_evolution_disabled(path, glue_table, glue_database):
     assert df2.c0.sum() == 3


+@pytest.mark.modin_index
 def test_date_cast(path, glue_table, glue_database):
     df = pd.DataFrame(
         {
@@ -614,9 +615,14 @@ def test_date_cast(path, glue_table, glue_database):
         }
     )
     wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table, dtype={"c0": "date"})
-    df2 = wr.s3.read_parquet(path=path)
+    df2 = wr.s3.read_parquet(path=path, pyarrow_additional_kwargs={"ignore_metadata": True})
     assert pandas_equals(df_expected, df2)
-    df3 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False)
+    df3 = wr.athena.read_sql_table(
+        database=glue_database,
+        table=glue_table,
+        ctas_approach=False,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
+    )
     assert pandas_equals(df_expected, df3)

tests/unit/test_s3_parquet.py

+16-4
@@ -433,6 +433,12 @@ def test_range_index_recovery_simple(path, use_threads):
     assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0))


+@pytest.mark.modin_index
+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37771",
+    condition=is_ray_modin,
+)
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 @pytest.mark.parametrize("name", [None, "foo"])
 def test_range_index_recovery_pandas(path, use_threads, name):
@@ -478,8 +484,9 @@ def test_multi_index_recovery_nameless(path, use_threads):
     assert_pandas_equals(df.reset_index(), df2.reset_index())


+@pytest.mark.modin_index
 @pytest.mark.xfail(
-    raises=wr.exceptions.InvalidArgumentCombination,
+    raises=(wr.exceptions.InvalidArgumentCombination, AssertionError),
     reason="Named index not working when partitioning to a single file",
     condition=is_ray_modin,
 )
@@ -544,23 +551,27 @@ def test_to_parquet_dataset_sanitize(path):
     assert df2.par.to_list() == ["a", "b"]


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [False, True, 2])
 def test_timezone_file(path, use_threads):
     file_path = f"{path}0.parquet"
     df = pd.DataFrame({"c0": [datetime.utcnow(), datetime.utcnow()]})
     df["c0"] = pd.DatetimeIndex(df.c0).tz_localize(tz="US/Eastern")
     df.to_parquet(file_path)
-    df2 = wr.s3.read_parquet(path, use_threads=use_threads)
+    df2 = wr.s3.read_parquet(path, use_threads=use_threads, pyarrow_additional_kwargs={"ignore_metadata": True})
     assert_pandas_equals(df, df2)


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 def test_timezone_file_columns(path, use_threads):
     file_path = f"{path}0.parquet"
     df = pd.DataFrame({"c0": [datetime.utcnow(), datetime.utcnow()], "c1": [1.1, 2.2]})
     df["c0"] = pd.DatetimeIndex(df.c0).tz_localize(tz="US/Eastern")
     df.to_parquet(file_path)
-    df2 = wr.s3.read_parquet(path, columns=["c1"], use_threads=use_threads)
+    df2 = wr.s3.read_parquet(
+        path, columns=["c1"], use_threads=use_threads, pyarrow_additional_kwargs={"ignore_metadata": True}
+    )
     assert_pandas_equals(df[["c1"]], df2)


@@ -620,12 +631,13 @@ def test_mixed_types_column(path) -> None:
     wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"])


+@pytest.mark.modin_index
 @pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"])
 def test_parquet_compression(path, compression) -> None:
     df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64")
     path_file = f"{path}0.parquet"
     wr.s3.to_parquet(df=df, path=path_file, compression=compression)
-    df2 = wr.s3.read_parquet([path_file])
+    df2 = wr.s3.read_parquet([path_file], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert_pandas_equals(df, df2)


tests/unit/test_s3_select.py

+5
@@ -67,6 +67,11 @@ def test_full_table(path, use_threads):
     assert df.shape == df4.shape


+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37928",
+    condition=is_ray_modin,
+)
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 def test_push_down(path, use_threads):
     df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "boo", "bar"], "c2": [4.0, 5.0, 6.0]})
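
These markers pass a tuple to `raises`, so either exception type counts as the expected failure, and `condition=is_ray_modin` applies the marker only in the distributed run. A self-contained sketch of that pattern (the flag here is stubbed; in the test suite it comes from the shared test helpers):

import pytest

is_ray_modin = True  # stub; the real flag reflects whether Modin on Ray is active


@pytest.mark.xfail(
    raises=(ValueError, AssertionError),  # either exception is an "expected" failure
    reason="illustrative only",
    condition=is_ray_modin,               # marker is a no-op when the flag is False
)
def test_expected_to_fail_under_modin():
    raise AssertionError("known gap under Modin on Ray")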

tests/unit/test_s3_text.py

+8-1
@@ -188,6 +188,12 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
     assert df_res.equals(dfs[-1])


+@pytest.mark.modin_index
+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37771",
+    condition=is_ray_modin,
+)
 def test_json(path):
     df0 = pd.DataFrame({"id": [1, 2, 3]})
     path0 = f"{path}test_json0.json"
@@ -354,6 +360,7 @@ def test_csv_line_terminator(path, line_terminator):
     assert df.equals(df2)


+@pytest.mark.modin_index
 def test_read_json_versioned(path) -> None:
     path_file = f"{path}0.json"
     dfs = [
@@ -368,7 +375,7 @@ def test_read_json_versioned(path) -> None:
         version_ids.append(version_id)

     for df, version_id in zip(dfs, version_ids):
-        df_temp = wr.s3.read_json(path_file, version_id=version_id)
+        df_temp = wr.s3.read_json(path_file, version_id=version_id).reset_index(drop=True)
         assert df_temp.equals(df)
         assert version_id == wr.s3.describe_objects(path=path_file, version_id=version_id)[path_file]["VersionId"]

tox.ini

+1
@@ -29,6 +29,7 @@ passenv =
     AWS_SESSION_TOKEN
 setenv =
     COV_FAIL_UNDER = 74.00
+    WR_CPU_COUNT = 16
 allowlist_externals = poetry
 commands_pre =
     poetry install --no-root --sync --all-extras
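
`WR_CPU_COUNT = 16` caps how many CPUs the test environment advertises to each Ray process. How awswrangler consumes the variable is not shown in this diff; a minimal sketch of reading such a cap and passing it to `ray.init` would look like:

import os

import ray

cpu_count = int(os.environ.get("WR_CPU_COUNT", "16"))
# Limit the local Ray instance to the configured number of CPUs.
ray.init(num_cpus=cpu_count, ignore_reinit_error=True)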
