diff --git a/CHANGES.rst b/CHANGES.rst
index ab499ff8..fc63929b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -12,8 +12,11 @@ Improvements
 
 Bug fixes
 ^^^^^^^^^
 
+* GH248 Fix an issue causing a ValueError to be raised when using `dask_index_on` on non-integer columns
 * GH255 Fix an issue causing the python interpreter to shut down when reading an empty file (see also https://issues.apache.org/jira/browse/ARROW-8142)
+* GH259 Fix an issue where timestamps with timezone information would produce a schema violation
+
 
 Version 3.8.0 (2020-03-12)
 ==========================
diff --git a/asv_bench/benchmarks/index.py b/asv_bench/benchmarks/index.py
index 44b1eafd..7ee7b92c 100644
--- a/asv_bench/benchmarks/index.py
+++ b/asv_bench/benchmarks/index.py
@@ -80,6 +80,9 @@ def time_as_series_partitions_as_index(
     ):
         self.ktk_index.as_flat_series(partitions_as_index=True)
 
+    def time_observed_values(self, number_values, number_partitions, arrow_type):
+        self.ktk_index.observed_values()
+
 
 class SerializeIndex(IndexBase):
     timeout = 180
diff --git a/kartothek/core/common_metadata.py b/kartothek/core/common_metadata.py
index 28f3e3de..3ee8458b 100644
--- a/kartothek/core/common_metadata.py
+++ b/kartothek/core/common_metadata.py
@@ -34,7 +34,6 @@ def __init__(self, schema, origin: Union[str, Set[str]]):
             origin = copy(origin)
         if not all(isinstance(s, str) for s in origin):
             raise TypeError("Schema origin elements must be strings.")
-
         self.__schema = schema
         self.__origin = origin
         self._schema_compat()
@@ -78,6 +77,31 @@ def _schema_compat(self):
         schema = schema.remove_metadata()
         md = {b"pandas": _dict_to_binary(pandas_metadata)}
 
+        # https://github.com/JDASoftwareGroup/kartothek/issues/259
+
+        if schema is not None:
+            fields = []
+            for f in schema:
+                if pa.types.is_timestamp(f.type):
+                    if ARROW_LARGER_EQ_0150:
+                        f = pa.field(f.name, pa.timestamp("us", tz=f.type.tz))
+                    else:
+                        # Older pyarrow versions drop the timezone here, so
+                        # recover it from the pandas metadata.
+                        pandas_field = [
+                            col
+                            for col in pandas_metadata["columns"]
+                            if col["name"] == f.name
+                        ]
+                        if len(pandas_field) == 1 and pandas_field[0].get("metadata"):
+                            tz = pandas_field[0].get("metadata", {}).get("timezone")
+                            f = pa.field(f.name, pa.timestamp("us", tz=tz))
+                        else:
+                            f = pa.field(f.name, pa.timestamp("us"))
+                fields.append(f)
+
+            schema = pa.schema(fields)
+
         if ARROW_LARGER_EQ_0150:
             schema = schema.with_metadata(md)
         else:
@@ -380,24 +408,14 @@ def store_schema_metadata(schema, dataset_uuid, store, table):
 
 def _schema2bytes(schema):
     buf = pa.BufferOutputStream()
-    pq.write_metadata(schema, buf, version="2.0", coerce_timestamps="us")
+    pq.write_metadata(schema, buf, version="2.0")
     return buf.getvalue().to_pybytes()
 
 
 def _bytes2schema(data):
     reader = pa.BufferReader(data)
     schema = pq.read_schema(reader)
-    fields = []
-    for idx in range(len(schema)):
-        f = schema[idx]
-
-        # schema data recovered from parquet always contains timestamp data in us-granularity, but pandas will use
-        # ns-granularity, so we re-align the two different worlds here
-        if f.type == pa.timestamp("us"):
-            f = pa.field(f.name, pa.timestamp("ns"))
-
-        fields.append(f)
-    return pa.schema(fields, schema.metadata)
+    return schema
 
 
 def _pandas_in_schemas(schemas):
diff --git a/kartothek/core/index.py b/kartothek/core/index.py
index bdb68472..a1b6f34a 100644
--- a/kartothek/core/index.py
+++ b/kartothek/core/index.py
@@ -132,15 +132,13 @@ def __repr__(self) -> str:
             class_=type(self).__name__, attrs=", ".join(repr_str)
         )
 
-    def observed_values(self) -> np.array:
+    def observed_values(self, date_as_object=True) -> np.array:
         """
         Return an array of all observed values
         """
-        return np.fromiter(
-            (self.normalize_value(self.dtype, x) for x in self.index_dct.keys()),
-            count=len(self.index_dct),
-            dtype=self.dtype.to_pandas_dtype(),
-        )
+        keys = np.array(list(self.index_dct.keys()))
+        labeled_array = pa.array(keys, type=self.dtype)
+        return np.array(labeled_array.to_pandas(date_as_object=date_as_object))
 
     @staticmethod
     def normalize_value(dtype: pa.DataType, value: Any) -> Any:
diff --git a/kartothek/core/testing.py b/kartothek/core/testing.py
index 6d32edfd..65b57305 100644
--- a/kartothek/core/testing.py
+++ b/kartothek/core/testing.py
@@ -60,6 +60,12 @@ def get_dataframe_not_nested():
             "float64": pd.Series([1.0], dtype=np.float64),
             "date": pd.Series([date(2018, 1, 1)], dtype=object),
             "datetime64": pd.Series(["2018-01-01"], dtype="datetime64[ns]"),
+            "datetime64_tz": pd.Series(
+                pd.date_range("2012-1-1 1:30", periods=1, freq="min", tz="US/Eastern")
+            ),
+            "datetime64_utc": pd.Series(
+                pd.date_range("2012-1-1 1:30", periods=1, freq="min", tz="UTC")
+            ),
             "unicode": pd.Series(["Ö"], dtype=np.unicode),
             "null": pd.Series([None], dtype=object),
             # Adding a byte type with value as byte sequence which can not be encoded as UTF8
diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py
index 642b1374..38a94a4b 100644
--- a/kartothek/io/eager.py
+++ b/kartothek/io/eager.py
@@ -49,6 +49,8 @@
 @default_docs
 def delete_dataset(dataset_uuid=None, store=None, factory=None):
     """
+    Delete the entire dataset from the store.
+
     Parameters
     ----------
     """
diff --git a/kartothek/io/testing/read.py b/kartothek/io/testing/read.py
index d9a3a5ab..b475ca30 100644
--- a/kartothek/io/testing/read.py
+++ b/kartothek/io/testing/read.py
@@ -32,11 +32,13 @@
 
 import datetime
+from distutils.version import LooseVersion
 from functools import wraps
 from itertools import permutations
 
 import pandas as pd
 import pandas.testing as pdt
+import pyarrow as pa
 import pytest
 
 from kartothek.core.uuid import gen_uuid
 
@@ -726,3 +728,35 @@ def test_binary_column_metadata(store_factory, bound_load_dataframes):
 
     # Assert column names are of type `str`, instead of `bytes` objects
     assert set(df.columns.map(type)) == {str}
+
+
+@pytest.mark.xfail(
+    LooseVersion(pa.__version__) < "0.16.1.dev308",
+    reason="pa.Schema.from_pandas cannot deal with ExtensionDtype",
+)
+def test_extensiondtype_roundtrip(store_factory, bound_load_dataframes):
+    table_name = SINGLE_TABLE
+    df = {
+        "label": "part1",
+        "data": [
+            (table_name, pd.DataFrame({"str": pd.Series(["a", "b"], dtype="string")}))
+        ],
+    }
+
+    store_dataframes_as_dataset(
+        dfs=[df], store=store_factory, dataset_uuid="dataset_uuid"
+    )
+
+    result = bound_load_dataframes(
+        dataset_uuid="dataset_uuid", store=store_factory, tables=table_name
+    )
+
+    probe = result[0]
+    if isinstance(probe, MetaPartition):
+        result_dfs = [mp.data[table_name] for mp in result]
+    elif isinstance(probe, dict):
+        result_dfs = [mp[table_name] for mp in result]
+    else:
+        result_dfs = result
+    result_df = pd.concat(result_dfs).reset_index(drop=True)
+    pdt.assert_frame_equal(df["data"][0][1], result_df)
diff --git a/tests/core/test_common_metadata.py b/tests/core/test_common_metadata.py
index 5a3b5347..2e591872 100644
--- a/tests/core/test_common_metadata.py
+++ b/tests/core/test_common_metadata.py
@@ -45,8 +45,10 @@ def test_store_schema_metadata(store, df_all_types):
 
     key = "some_uuid/some_table/_common_metadata"
     assert key in store.keys()
-    pq_file = pq.ParquetFile(store.open(key))
-    actual_schema = pq_file.schema.to_arrow_schema()
+
+    actual_schema = read_schema_metadata(
+        dataset_uuid="some_uuid", store=store, table="some_table"
+    )
     fields = [
         pa.field("array_float32", pa.list_(pa.float64())),
         pa.field("array_float64", pa.list_(pa.float64())),
@@ -63,6 +65,8 @@ def test_store_schema_metadata(store, df_all_types):
         pa.field("byte", pa.binary()),
         pa.field("date", pa.date32()),
         pa.field("datetime64", pa.timestamp("us")),
+        pa.field("datetime64_tz", pa.timestamp("us", tz="US/Eastern")),
+        pa.field("datetime64_utc", pa.timestamp("us", tz="UTC")),
         pa.field("float32", pa.float64()),
         pa.field("float64", pa.float64()),
         pa.field("int16", pa.int64()),
@@ -96,7 +100,7 @@ def test_pickle(df_all_types):
     obj1 = make_meta(df_all_types, origin="df_all_types")
     s = pickle.dumps(obj1)
     obj2 = pickle.loads(s)
-    assert obj1 == obj2
+    assert SchemaWrapper(obj1, "obj1") == SchemaWrapper(obj2, "obj2")
 
 
 def test_wrapper(df_all_types):
@@ -445,9 +449,9 @@ def test_diff_schemas(df_all_types):
  array_float64: list
    child 0, item: double
  array_int16: list
-@@ -26,10 +24,11 @@
+@@ -28,10 +26,11 @@
 
- datetime64: timestamp[ns]
+ datetime64_utc: timestamp[us, tz=UTC]
  float32: double
  float64: double
 -int16: int64
@@ -477,7 +481,7 @@ def test_diff_schemas(df_all_types):
   'metadata': None,
   'name': 'array_float64',
   'numpy_type': 'object',
-@@ -91,8 +86,8 @@
+@@ -101,8 +96,8 @@
 
  {'field_name': 'int16',
   'metadata': None,
@@ -489,7 +493,7 @@
  {'field_name': 'int32',
   'metadata': None,
   'name': 'int32',
-@@ -108,6 +103,11 @@
+@@ -118,6 +113,11 @@
 
   'name': 'int8',
   'numpy_type': 'int64',
diff --git a/tests/core/test_index.py b/tests/core/test_index.py
index 75f48df8..c11cc1b7 100644
--- a/tests/core/test_index.py
+++ b/tests/core/test_index.py
@@ -14,6 +14,7 @@
 from hypothesis import assume, given
 from pandas.testing import assert_series_equal
 
+from kartothek.core._compat import ARROW_LARGER_EQ_0150
 from kartothek.core.index import ExplicitSecondaryIndex, IndexBase, merge_indices
 from kartothek.core.testing import get_numpy_array_strategy
 
@@ -472,6 +473,50 @@ def test_index_raises_null_dtype():
     assert str(exc.value) == "Indices w/ null/NA type are not supported"
 
 
+@pytest.mark.parametrize(
+    "dtype,value",
+    [
+        (pa.bool_(), True),
+        (pa.int64(), 1),
+        (pa.float64(), 1.1),
+        (pa.binary(), b"x"),
+        (pa.string(), "x"),
+        (pa.timestamp("ns"), pd.Timestamp("2018-01-01").to_datetime64()),
+        (pa.date32(), datetime.date(2018, 1, 1)),
+        pytest.param(
+            pa.timestamp("ns", tz=pytz.timezone("Europe/Berlin")),
+            pd.Timestamp("2018-01-01", tzinfo=pytz.timezone("Europe/Berlin")),
+            marks=pytest.mark.xfail(
+                not ARROW_LARGER_EQ_0150,
+                reason="Timezone roundtrips not supported in older versions",
+            ),
+        ),
+    ],
+)
+def test_observed_values_plain(dtype, value):
+    ind = ExplicitSecondaryIndex(
+        column="col", dtype=dtype, index_dct={value: ["part_label"]}
+    )
+    observed = ind.observed_values()
+    assert len(observed) == 1
+    assert list(observed) == [value]
+
+
+@pytest.mark.parametrize("date_as_object", [None, True, False])
+def test_observed_values_date_as_object(date_as_object):
+    value = datetime.date(2020, 1, 1)
+    ind = ExplicitSecondaryIndex(
+        column="col", dtype=pa.date32(), index_dct={value: ["part_label"]}
+    )
+    observed = ind.observed_values(date_as_object=date_as_object)
+    if date_as_object:
+        expected = value
+    else:
+        expected = pd.Timestamp(value).to_datetime64()
+    assert len(observed) == 1
+    assert observed[0] == expected
+
+
 @pytest.mark.parametrize(
"dtype,value,expected", [ diff --git a/tests/io/dask/dataframe/test_read.py b/tests/io/dask/dataframe/test_read.py index 752d7462..542b12a7 100644 --- a/tests/io/dask/dataframe/test_read.py +++ b/tests/io/dask/dataframe/test_read.py @@ -10,6 +10,7 @@ from pandas import testing as pdt from pandas.testing import assert_frame_equal +from kartothek.core.testing import get_dataframe_not_nested from kartothek.io.dask.dataframe import read_dataset_as_ddf from kartothek.io.eager import store_dataframes_as_dataset from kartothek.io.testing.read import * # noqa @@ -169,6 +170,34 @@ def test_reconstruct_dask_index(store_factory, index_type, monkeypatch): assert_frame_equal(ddf_expected_simple.compute(), ddf.compute()) +@pytest.fixture() +def setup_reconstruct_dask_index_types(store_factory, df_not_nested): + indices = list(df_not_nested.columns) + indices.remove("null") + return store_dataframes_as_dataset( + store=store_factory, + dataset_uuid="dataset_uuid", + dfs=[df_not_nested], + secondary_indices=indices, + ) + + +@pytest.mark.parametrize("col", get_dataframe_not_nested().columns) +def test_reconstruct_dask_index_types( + store_factory, setup_reconstruct_dask_index_types, col +): + if col == "null": + pytest.xfail(reason="Cannot index null column") + ddf = read_dataset_as_ddf( + dataset_uuid=setup_reconstruct_dask_index_types.uuid, + store=store_factory, + table="table", + dask_index_on=col, + ) + assert ddf.known_divisions + assert ddf.index.name == col + + def test_reconstruct_dask_index_sorting(store_factory, monkeypatch): # Make sure we're not shuffling anything diff --git a/tests/serialization/test_arrow_compat.py b/tests/serialization/test_arrow_compat.py index 226133de..f35d193d 100644 --- a/tests/serialization/test_arrow_compat.py +++ b/tests/serialization/test_arrow_compat.py @@ -45,7 +45,7 @@ def test_arrow_compat(arrow_version, reference_store, mocker): bytes=b"\x82\xd6\xc1\x06Z\x08\x11\xe9\x85eJ\x00\x07\xf8\n\x10" ) - orig = get_dataframe_alltypes() + orig = get_dataframe_alltypes().drop(columns=["datetime64_tz", "datetime64_utc"]) restored = ParquetSerializer().restore_dataframe( store=reference_store, key=arrow_version + ".parquet", date_as_object=True )