read_dataset_as_ddf raises when loading a dataset partitioned on a string column #479

Closed

mlondschien opened this issue Jun 14, 2021 · 1 comment

mlondschien (Contributor) commented Jun 14, 2021

```python
from functools import partial

import minimalkv
import pandas as pd

from kartothek.io.dask.dataframe import read_dataset_as_ddf
from kartothek.io.eager import store_dataframes_as_dataset

store = partial(minimalkv.get_store_from_url, "hfs:///tmp?create_if_missing=False")

df = pd.DataFrame({"x": ["A", "B"], "y": ["a", "b"]}, dtype="string")

store_dataframes_as_dataset(
    dfs=[df],
    dataset_uuid="test",
    store=store,
    partition_on=["x"],
    overwrite=True,
)

ddf = read_dataset_as_ddf(store=store, dataset_uuid="test")
ddf.compute(scheduler="single-threaded")
```

raises

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-2f1841c50a53> in <module>
     18
     19 ddf = read_dataset_as_ddf(store=store, dataset_uuid="test")
---> 20 ddf.compute(scheduler="single-threaded")

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    283         dask.base.compute
    284         """
--> 285         (result,) = compute(self, traverse=False, **kwargs)
    286         return result
    287

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    565         postcomputes.append(x.__dask_postcompute__())
    566
--> 567     results = schedule(dsk, keys, **kwargs)
    568     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    569

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    558     """
    559     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 560     return get_async(
    561         synchronous_executor.submit,
    562         synchronous_executor._max_workers,

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

/opt/conda/envs/sd-pricing-r/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438
    439                 self._condition.wait(timeout)

/opt/conda/envs/sd-pricing-r/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    543         fut = Future()
    544         try:
--> 545             fut.set_result(fn(*args, **kwargs))
    546         except BaseException as e:
    547             fut.set_exception(e)

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238
    239

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238
    239

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/dataframe/utils.py in check_meta(x, meta, funcname, numeric_equal)
    656     elif is_dataframe_like(meta):
    657         dtypes = pd.concat([x.dtypes, meta.dtypes], axis=1, sort=True)
--> 658         bad_dtypes = [
    659             (repr(col), a, b)
    660             for col, a, b in dtypes.fillna("-").itertuples()

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/dataframe/utils.py in <listcomp>(.0)
    659             (repr(col), a, b)
    660             for col, a, b in dtypes.fillna("-").itertuples()
--> 661             if not equal_dtypes(a, b)
    662         ]
    663         if bad_dtypes:

/opt/conda/envs/sd-pricing-r/lib/python3.8/site-packages/dask/dataframe/utils.py in equal_dtypes(a, b)
    637                 return True
    638             return a == b
--> 639         return (a.kind in eq_types and b.kind in eq_types) or (a == b)
    640
    641     if not (

TypeError: Cannot interpret 'StringDtype' as a data type
```

This is related to dask/dask#7805.
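The last traceback frame shows why: dask's `equal_dtypes` ends up comparing the partition's plain numpy dtype against the extension `StringDtype` from meta, and with the numpy/pandas versions above that comparison itself raises rather than returning `False`. A minimal sketch of the failing comparison:

```python
import numpy as np
import pandas as pd

actual = np.dtype("object")  # dtype of "x" in the loaded partition
expected = pd.StringDtype()  # dtype of "x" in the dask meta

# With the versions from the traceback this raises instead of returning False:
actual == expected  # TypeError: Cannot interpret 'StringDtype' as a data type
```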

In `read_dataset_as_ddf`:

```python
meta = _get_dask_meta_for_dataset(
    ds_factory, table, columns, categoricals, dates_as_object
)
```

the function `_get_dask_meta_for_dataset` (correctly) returns a dataframe with dtype `string` for both "x" and "y". However, `read_table_as_delayed` (incorrectly) returns a dataframe with dtype `object` for "x". This then raises at

```python
return dd.from_delayed(delayed_partitions, meta=meta)
```

once `compute` is called and dask checks the partition dtypes against `meta` (see the dask issue above).
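
The mismatch can be observed without dask at all; a sketch, assuming the eager reader reconstructs the partition column the same way as `read_table_as_delayed`:

```python
from kartothek.io.eager import read_table

# Reading the dataset back eagerly: "x" is reconstructed from the partition
# keys and comes back as object, while "y" round-trips through parquet and
# keeps its string dtype.
df_roundtrip = read_table(store=store, dataset_uuid="test")
print(df_roundtrip.dtypes)
# x    object   <- mismatches the string dtype in the dask meta
# y    string
```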

Also related: #425.
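
Until this is fixed in dask, a possible workaround (my assumption, not something verified in this issue) is to avoid the `string` extension dtype on the partition column, so meta and the reconstructed partitions agree:

```python
# Hypothetical workaround: cast the partition column to plain object dtype
# before writing. Both the dask meta and the reconstructed partitions then
# report object for "x", so dask's dtype check passes.
df["x"] = df["x"].astype(object)
```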

mlondschien (Author) commented:
Closed by dask/dask#7813.
