fixed schema violation for timestamp with tz #260

Open

wants to merge 24 commits into base: master

Changes from 20 commits

Commits (24)
861a8d5
Allow dask_index_on for all col types
fjetter Mar 17, 2020
c9cc1fa
Merge branch 'master' into bugfix/gh248
fjetter Mar 18, 2020
cd430fa
fixed schema violation for timestamp with tz
ged-steponavicius Mar 19, 2020
04147fd
add benchmark for observed values
fjetter Mar 19, 2020
c29bf98
Merge branch 'bugfix/gh248' of github.com:fjetter/kartothek into bugf…
fjetter Mar 19, 2020
f9fe85d
tz bug fix: isinstace instead of type check
ged-steponavicius Mar 19, 2020
a13c258
tz bug fix: added comment to Changes and refactored for loop
ged-steponavicius Mar 19, 2020
63f4163
Merge branch 'master' into tz_bug_fix
ged-steponavicius Mar 19, 2020
4d4f9e0
Merge pull request #253 from fjetter/bugfix/gh248
fjetter Mar 19, 2020
6a17b6e
test failures
ged-steponavicius Mar 19, 2020
3306b71
Merge branch 'tz_bug_fix' of https://github.com/ged-steponavicius/kar…
ged-steponavicius Mar 19, 2020
b0d3f54
Update release date of 3.8.1
fjetter Mar 20, 2020
98daf2f
test failures 2, some hardcoded expected values needed changes after …
ged-steponavicius Mar 22, 2020
7cf4191
iterating over schema directly only supported from arrow 15.0+
ged-steponavicius Mar 22, 2020
7e80f27
Add Docstrings to Delete Dataset API
Kshitij68 Mar 22, 2020
3199052
Merge pull request #261 from Kshitij68/add_docstrings_to_delete_datas…
fjetter Mar 23, 2020
1b90da7
Add a test for roundtripping ExtensionArrays
xhochy Mar 19, 2020
5cba248
Merge pull request #256 from xhochy/extenionarray-roundtrip
fjetter Mar 24, 2020
83746de
tz fix moved to _schema_compat
Apr 7, 2020
d070094
Merge branch 'tz_bug_fix' of https://github.com/ged-steponavicius/kar…
Apr 7, 2020
62284aa
Update kartothek/core/common_metadata.py
ged-steponavicius Apr 8, 2020
51397b1
Update CHANGES.rst
ged-steponavicius Apr 8, 2020
1618fac
Update common_metadata.py
ged-steponavicius Apr 8, 2020
d2faed5
Update test_common_metadata.py
ged-steponavicius Apr 8, 2020
5 changes: 4 additions & 1 deletion CHANGES.rst
@@ -2,7 +2,7 @@
Changelog
=========

Version 3.8.1 (2020-03-XX)
Version 3.8.1 (2020-03-20)
==========================

Improvements
@@ -12,8 +12,11 @@ Improvements

Bug fixes
^^^^^^^^^
* GH248 Fix an issue causing a ValueError to be raised when using `dask_index_on` on non-integer columns
* GH255 Fix an issue causing the python interpreter to shut down when reading an
empty file (see also https://issues.apache.org/jira/browse/ARROW-8142)
* GH259 Fix an issue where timestamps with timezone produce a schema violation


Version 3.8.0 (2020-03-12)
==========================
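The GH259 entry above is easiest to see with a small illustration. This is a sketch in plain pandas/pyarrow rather than the kartothek API, and the exact comparison path inside kartothek is an assumption: pandas represents timezone-aware data with nanosecond precision, while the schema persisted next to the Parquet files uses microsecond precision, so a naive comparison of the two schemas reports a violation unless timezone-aware fields are normalized first.

import pandas as pd
import pyarrow as pa

# In-memory schema derived from a tz-aware pandas column: timestamp[ns, tz=UTC]
df = pd.DataFrame({"ts": pd.date_range("2020-01-01", periods=2, tz="UTC")})
in_memory = pa.Schema.from_pandas(df, preserve_index=False)

# Schema as persisted alongside the Parquet files: microsecond precision
stored = pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))])

print(in_memory[0].type)                    # timestamp[ns, tz=UTC]
print(in_memory[0].type == stored[0].type)  # False -> reported as a schema violation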
3 changes: 3 additions & 0 deletions asv_bench/benchmarks/index.py
@@ -80,6 +80,9 @@ def time_as_series_partitions_as_index(
):
self.ktk_index.as_flat_series(partitions_as_index=True)

def time_observed_values(self, number_values, number_partitions, arrow_type):
self.ktk_index.observed_values()


class SerializeIndex(IndexBase):
timeout = 180
47 changes: 34 additions & 13 deletions kartothek/core/common_metadata.py
@@ -34,9 +34,10 @@ def __init__(self, schema, origin: Union[str, Set[str]]):
origin = copy(origin)
if not all(isinstance(s, str) for s in origin):
raise TypeError("Schema origin elements must be strings.")

self.__schema = schema
self.__origin = origin
self._schema_compat()

def with_origin(self, origin: Union[str, Set[str]]) -> "SchemaWrapper":
@@ -78,6 +79,36 @@ def _schema_compat(self):

schema = schema.remove_metadata()
md = {b"pandas": _dict_to_binary(pandas_metadata)}
# https://github.com/JDASoftwareGroup/kartothek/issues/259

if schema is not None:
fields = []
for f in schema:
if pa.types.is_timestamp(f.type):
if ARROW_LARGER_EQ_0150:
f = pa.field(f.name, pa.timestamp("us", tz=f.type.tz))
else:
# use the pandas metadata to recover the timezone
pandas_field = list(
filter(
lambda x: x["name"] == f.name,
pandas_metadata["columns"],
)
)
if (
len(pandas_field) == 1
and pandas_field[0].get("metadata")
):
tz = pandas_field[0].get("metadata", {}).get("timezone")
f = pa.field(f.name, pa.timestamp("us", tz=tz))
else:
f = pa.field(f.name, pa.timestamp("us"))
fields.append(f)

schema = pa.schema(fields)

if ARROW_LARGER_EQ_0150:
schema = schema.with_metadata(md)
else:
@@ -380,24 +411,14 @@ def store_schema_metadata(schema, dataset_uuid, store, table):

def _schema2bytes(schema):
buf = pa.BufferOutputStream()
pq.write_metadata(schema, buf, version="2.0", coerce_timestamps="us")
pq.write_metadata(schema, buf, version="2.0")
return buf.getvalue().to_pybytes()


def _bytes2schema(data):
reader = pa.BufferReader(data)
schema = pq.read_schema(reader)
fields = []
for idx in range(len(schema)):
f = schema[idx]

# schema data recovered from parquet always contains timestamp data in us-granularity, but pandas will use
# ns-granularity, so we re-align the two different worlds here
if f.type == pa.timestamp("us"):
f = pa.field(f.name, pa.timestamp("ns"))

fields.append(f)
return pa.schema(fields, schema.metadata)
return schema


def _pandas_in_schemas(schemas):
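The block added to _schema_compat above boils down to the following normalization. A minimal standalone sketch, assuming arrow >= 0.15 where schemas are iterable and timestamp fields expose their timezone via .tz; the field names are made up:

import pyarrow as pa

original = pa.schema(
    [
        pa.field("ts", pa.timestamp("ns", tz="US/Eastern")),
        pa.field("value", pa.float64()),
    ]
)

# Coerce timestamps to microseconds but keep the timezone; leave other fields untouched
normalized = pa.schema(
    [
        pa.field(f.name, pa.timestamp("us", tz=f.type.tz))
        if pa.types.is_timestamp(f.type)
        else f
        for f in original
    ]
)

print(normalized[0].type)  # timestamp[us, tz=US/Eastern]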
10 changes: 4 additions & 6 deletions kartothek/core/index.py
@@ -132,15 +132,13 @@ def __repr__(self) -> str:
class_=type(self).__name__, attrs=", ".join(repr_str)
)

def observed_values(self) -> np.array:
def observed_values(self, date_as_object=True) -> np.array:
"""
Return an array of all observed values
"""
return np.fromiter(
(self.normalize_value(self.dtype, x) for x in self.index_dct.keys()),
count=len(self.index_dct),
dtype=self.dtype.to_pandas_dtype(),
)
keys = np.array(list(self.index_dct.keys()))
labeled_array = pa.array(keys, type=self.dtype)
return np.array(labeled_array.to_pandas(date_as_object=date_as_object))
Comment on lines +139 to +141

Collaborator: This should already be on master. Is github fooling me or what's happening here?

Author: I have pulled the latest master before committing, I think GitHub is confused

Collaborator: Seems like the rebase got a bit messed up. @ged-steponavicius could you perhaps try rebasing again on master?

@staticmethod
def normalize_value(dtype: pa.DataType, value: Any) -> Any:
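observed_values now routes through pyarrow instead of np.fromiter so the caller can choose between python date objects and datetime64 values. A standalone sketch of the same conversion; the index_dct contents and dtype are made-up stand-ins for an ExplicitSecondaryIndex on a date column:

import datetime
import numpy as np
import pyarrow as pa

# Stand-ins for ExplicitSecondaryIndex.index_dct and .dtype
index_dct = {
    datetime.date(2020, 1, 1): ["part_1"],
    datetime.date(2020, 1, 2): ["part_2"],
}
dtype = pa.date32()

# Same conversion as the reworked observed_values implementation
keys = np.array(list(index_dct.keys()))
labeled_array = pa.array(keys, type=dtype)

print(np.array(labeled_array.to_pandas(date_as_object=True)))   # datetime.date objects
print(np.array(labeled_array.to_pandas(date_as_object=False)))  # datetime64[ns] values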
6 changes: 6 additions & 0 deletions kartothek/core/testing.py
@@ -60,6 +60,12 @@ def get_dataframe_not_nested():
"float64": pd.Series([1.0], dtype=np.float64),
"date": pd.Series([date(2018, 1, 1)], dtype=object),
"datetime64": pd.Series(["2018-01-01"], dtype="datetime64[ns]"),
"datetime64_tz": pd.Series(
pd.date_range("2012-1-1 1:30", periods=1, freq="min", tz="US/Eastern")
),
"datetime64_utc": pd.Series(
pd.date_range("2012-1-1 1:30", periods=1, freq="min", tz="UTC")
),
"unicode": pd.Series(["Ö"], dtype=np.unicode),
"null": pd.Series([None], dtype=object),
# Adding a byte type with value as byte sequence which can not be encoded as UTF8
2 changes: 2 additions & 0 deletions kartothek/io/eager.py
@@ -49,6 +49,8 @@
@default_docs
def delete_dataset(dataset_uuid=None, store=None, factory=None):
"""
Delete the entire dataset from the store.

Parameters
----------
"""
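The docstring added above is still only a stub, so here is a hedged usage sketch. The storefact helper, the store URL, and the dataset uuid are placeholders in the style of the usual kartothek examples, not values taken from this PR:

from functools import partial

from storefact import get_store_from_url

from kartothek.io.eager import delete_dataset

# Placeholder store location and dataset uuid
store_factory = partial(get_store_from_url, "hfs:///tmp/kartothek_data")

# Removes the dataset's metadata, indices and partition files from the store
delete_dataset(dataset_uuid="my_dataset", store=store_factory)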
34 changes: 34 additions & 0 deletions kartothek/io/testing/read.py
@@ -32,11 +32,13 @@


import datetime
from distutils.version import LooseVersion
from functools import wraps
from itertools import permutations

import pandas as pd
import pandas.testing as pdt
import pyarrow as pa
import pytest

from kartothek.core.uuid import gen_uuid
@@ -726,3 +728,35 @@ def test_binary_column_metadata(store_factory, bound_load_dataframes):

# Assert column names are of type `str`, instead of `bytes` objects
assert set(df.columns.map(type)) == {str}


@pytest.mark.xfail(
LooseVersion(pa.__version__) < "0.16.1.dev308",
reason="pa.Schema.from_pandas cannot deal with ExtensionDtype",
)
def test_extensiondtype_rountrip(store_factory, bound_load_dataframes):
table_name = SINGLE_TABLE
df = {
"label": "part1",
"data": [
(table_name, pd.DataFrame({"str": pd.Series(["a", "b"], dtype="string")}))
],
}

store_dataframes_as_dataset(
dfs=[df], store=store_factory, dataset_uuid="dataset_uuid"
)

result = bound_load_dataframes(
dataset_uuid="dataset_uuid", store=store_factory, tables=table_name
)

probe = result[0]
if isinstance(probe, MetaPartition):
result_dfs = [mp.data[table_name] for mp in result]
elif isinstance(probe, dict):
result_dfs = [mp[table_name] for mp in result]
else:
result_dfs = result
result_df = pd.concat(result_dfs).reset_index(drop=True)
pdt.assert_frame_equal(df["data"][0][1], result_df)
19 changes: 12 additions & 7 deletions tests/core/test_common_metadata.py
@@ -45,8 +45,11 @@ def test_store_schema_metadata(store, df_all_types):

key = "some_uuid/some_table/_common_metadata"
assert key in store.keys()
pq_file = pq.ParquetFile(store.open(key))
actual_schema = pq_file.schema.to_arrow_schema()
# pq_file = pq.ParquetFile(store.open(key))
# actual_schema = pq_file.schema.to_arrow_schema()
actual_schema = read_schema_metadata(
dataset_uuid="some_uuid", store=store, table="some_table"
)
fields = [
pa.field("array_float32", pa.list_(pa.float64())),
pa.field("array_float64", pa.list_(pa.float64())),
@@ -63,6 +66,8 @@
pa.field("byte", pa.binary()),
pa.field("date", pa.date32()),
pa.field("datetime64", pa.timestamp("us")),
pa.field("datetime64_tz", pa.timestamp("us", tz="US/Eastern")),
pa.field("datetime64_utc", pa.timestamp("us", tz="UTC")),
pa.field("float32", pa.float64()),
pa.field("float64", pa.float64()),
pa.field("int16", pa.int64()),
@@ -96,7 +101,7 @@ def test_pickle(df_all_types):
obj1 = make_meta(df_all_types, origin="df_all_types")
s = pickle.dumps(obj1)
obj2 = pickle.loads(s)
assert obj1 == obj2
assert SchemaWrapper(obj1, "obj1") == SchemaWrapper(obj2, "obj2")


def test_wrapper(df_all_types):
@@ -445,9 +450,9 @@ def test_diff_schemas(df_all_types):
array_float64: list<item: double>
child 0, item: double
array_int16: list<item: int64>
@@ -26,10 +24,11 @@
@@ -28,10 +26,11 @@

datetime64: timestamp[ns]
datetime64_utc: timestamp[us, tz=UTC]
float32: double
float64: double
-int16: int64
@@ -477,7 +482,7 @@
'metadata': None,
'name': 'array_float64',
'numpy_type': 'object',
@@ -91,8 +86,8 @@
@@ -101,8 +96,8 @@

{'field_name': 'int16',
'metadata': None,
@@ -489,7 +494,7 @@
{'field_name': 'int32',
'metadata': None,
'name': 'int32',
@@ -108,6 +103,11 @@
@@ -118,6 +113,11 @@

'name': 'int8',
'numpy_type': 'int64',
45 changes: 45 additions & 0 deletions tests/core/test_index.py
@@ -14,6 +14,7 @@
from hypothesis import assume, given
from pandas.testing import assert_series_equal

from kartothek.core._compat import ARROW_LARGER_EQ_0150
from kartothek.core.index import ExplicitSecondaryIndex, IndexBase, merge_indices
from kartothek.core.testing import get_numpy_array_strategy

@@ -472,6 +473,50 @@ def test_index_raises_null_dtype():
assert str(exc.value) == "Indices w/ null/NA type are not supported"


@pytest.mark.parametrize(
"dtype,value",
[
(pa.bool_(), True),
(pa.int64(), 1),
(pa.float64(), 1.1),
(pa.binary(), b"x"),
(pa.string(), "x"),
(pa.timestamp("ns"), pd.Timestamp("2018-01-01").to_datetime64()),
(pa.date32(), datetime.date(2018, 1, 1)),
pytest.param(
pa.timestamp("ns", tz=pytz.timezone("Europe/Berlin")),
pd.Timestamp("2018-01-01", tzinfo=pytz.timezone("Europe/Berlin")),
marks=pytest.mark.xfail(
not ARROW_LARGER_EQ_0150,
reason="Timezone reoundtrips not supported in older versions",
),
),
],
)
def test_observed_values_plain(dtype, value):
ind = ExplicitSecondaryIndex(
column="col", dtype=dtype, index_dct={value: ["part_label"]}
)
observed = ind.observed_values()
assert len(observed) == 1
assert list(observed) == [value]


@pytest.mark.parametrize("date_as_object", [None, True, False])
def test_observed_values_date_as_object(date_as_object):
value = datetime.date(2020, 1, 1)
ind = ExplicitSecondaryIndex(
column="col", dtype=pa.date32(), index_dct={value: ["part_label"]}
)
observed = ind.observed_values(date_as_object=date_as_object)
if date_as_object:
expected = value
else:
expected = pd.Timestamp(value).to_datetime64()
assert len(observed) == 1
assert observed[0] == expected


@pytest.mark.parametrize(
"dtype,value,expected",
[
29 changes: 29 additions & 0 deletions tests/io/dask/dataframe/test_read.py
@@ -10,6 +10,7 @@
from pandas import testing as pdt
from pandas.testing import assert_frame_equal

from kartothek.core.testing import get_dataframe_not_nested
from kartothek.io.dask.dataframe import read_dataset_as_ddf
from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.io.testing.read import * # noqa
@@ -169,6 +170,34 @@ def test_reconstruct_dask_index(store_factory, index_type, monkeypatch):
assert_frame_equal(ddf_expected_simple.compute(), ddf.compute())


@pytest.fixture()
def setup_reconstruct_dask_index_types(store_factory, df_not_nested):
indices = list(df_not_nested.columns)
indices.remove("null")
return store_dataframes_as_dataset(
store=store_factory,
dataset_uuid="dataset_uuid",
dfs=[df_not_nested],
secondary_indices=indices,
)


@pytest.mark.parametrize("col", get_dataframe_not_nested().columns)
def test_reconstruct_dask_index_types(
store_factory, setup_reconstruct_dask_index_types, col
):
if col == "null":
pytest.xfail(reason="Cannot index null column")
ddf = read_dataset_as_ddf(
dataset_uuid=setup_reconstruct_dask_index_types.uuid,
store=store_factory,
table="table",
dask_index_on=col,
)
assert ddf.known_divisions
assert ddf.index.name == col


def test_reconstruct_dask_index_sorting(store_factory, monkeypatch):

# Make sure we're not shuffling anything
2 changes: 1 addition & 1 deletion tests/serialization/test_arrow_compat.py
@@ -45,7 +45,7 @@ def test_arrow_compat(arrow_version, reference_store, mocker):
bytes=b"\x82\xd6\xc1\x06Z\x08\x11\xe9\x85eJ\x00\x07\xf8\n\x10"
)

orig = get_dataframe_alltypes()
orig = get_dataframe_alltypes().drop(columns=["datetime64_tz", "datetime64_utc"])
restored = ParquetSerializer().restore_dataframe(
store=reference_store, key=arrow_version + ".parquet", date_as_object=True
)