“ValueError: If using all scalar values, you must pass an index” when using Dask DataFrame.apply()
I have some data that I'd like to expand from a single column into multiple columns using Dask. I'm doing this with Dask's DataFrame.apply method and a custom function that returns a new pandas DataFrame. However, when I try it, I get "ValueError: If using all scalar values, you must pass an index". Here's a minimal reproduction case:
import dask.dataframe as dd
import pandas as pd

pd_df = pd.DataFrame({'a': [1, 2, 3, 4, 5]}, dtype=float)
df = dd.from_pandas(pd_df, npartitions=2)

def custom_fn(row):
    num = row['a']
    frame = pd.DataFrame({
        'squared': [num * num],
        'x2': [num * 2],
    }, dtype=float, index=[0])
    return frame

new_frame = df.apply(custom_fn, axis=1, meta={
    'squared': float,
    'x2': float,
}, result_type='expand')
new_frame.head()
And the stacktrace:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-38-6aaf3a5d32b2> in <module>()
12 }, result_type='expand')
13
---> 14 new_frame.head()
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in head(self, n, npartitions, compute)
896
897 if compute:
--> 898 result = result.compute()
899 return result
900
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
458 _execute_task(task, data) # Re-execute locally
459 else:
--> 460 raise_exception(exc, tb)
461 res, worker_id = loads(res_info)
462 state['cache'][key] = res
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/optimization.pyc in __call__(self, *args)
940 % (len(self.inkeys), len(args)))
941 return core.get(self.dsk, self.outkey,
--> 942 dict(zip(self.inkeys, args)))
943
944 def __reduce__(self):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in apply_and_enforce(*args, **kwargs)
3792 func = kwargs.pop('_func')
3793 meta = kwargs.pop('_meta')
-> 3794 df = func(*args, **kwargs)
3795 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
3796 if not len(df):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/utils.pyc in __call__(self, obj, *args, **kwargs)
714
715 def __call__(self, obj, *args, **kwargs):
--> 716 return getattr(obj, self.method)(*args, **kwargs)
717
718 def __reduce__(self):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in apply(self, func, axis, broadcast, raw, reduce, result_type, args, **kwds)
6485 args=args,
6486 kwds=kwds)
-> 6487 return op.get_result()
6488
6489 def applymap(self, func):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in get_result(self)
149 return self.apply_raw()
150
--> 151 return self.apply_standard()
152
153 def apply_empty_result(self):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in apply_standard(self)
258
259 # wrap results
--> 260 return self.wrap_results()
261
262 def apply_series_generator(self):
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results(self)
306 if len(results) > 0 and is_sequence(results[0]):
307
--> 308 return self.wrap_results_for_axis()
309
310 # dict of scalars
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results_for_axis(self)
382 # we have requested to expand
383 if self.result_type == 'expand':
--> 384 result = self.infer_to_same_shape()
385
386 # we have a non-series and don't want inference
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in infer_to_same_shape(self)
400 results = self.results
401
--> 402 result = self.obj._constructor(data=results)
403 result = result.T
404
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in __init__(self, data, index, columns, dtype, copy)
390 dtype=dtype, copy=copy)
391 elif isinstance(data, dict):
--> 392 mgr = init_dict(data, index, columns, dtype=dtype)
393 elif isinstance(data, ma.MaskedArray):
394 import numpy.ma.mrecords as mrecords
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in init_dict(data, index, columns, dtype)
210 arrays = [data[k] for k in keys]
211
--> 212 return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
213
214
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in arrays_to_mgr(arrays, arr_names, index, columns, dtype)
49 # figure out the index, if necessary
50 if index is None:
---> 51 index = extract_index(arrays)
52 else:
53 index = ensure_index(index)
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in extract_index(data)
306
307 if not indexes and not raw_lengths:
--> 308 raise ValueError('If using all scalar values, you must pass'
309 ' an index')
310
ValueError: If using all scalar values, you must pass an index
I can omit result_type='expand' and the meta kwarg to get a DataFrame full of the DataFrames I returned from the function, but I do want it to expand inline. I'm using Dask 1.1.4 and Pandas 0.24.1 on Python 2.7.6.
EDIT: I've found that I can expand the rows later like so:
new_frame = df.apply(custom_fn, axis=1)
dd.concat([
    data for _, data in new_frame.iteritems()
], interleave_partitions=True).head()
It's a little messy, but it seems to work for now at least.
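EDIT 2: A possibly cleaner sketch. Since dask's row-wise apply delegates to pandas within each partition, the error seems to come from pandas trying to assemble the one-row DataFrames I return into a new frame. Returning a pd.Series from the function instead (the usual shape for result_type='expand') avoids that; shown here with plain pandas, and I assume the same change carries over to the dask version with meta={'squared': float, 'x2': float}:

```python
import pandas as pd

pd_df = pd.DataFrame({'a': [1.0, 2.0, 3.0]})

# Return a Series, not a one-row DataFrame: apply(..., axis=1) expands
# a returned Series into columns named by its index. With dask, the
# same function should work via
#   df.apply(custom_fn, axis=1, meta={'squared': float, 'x2': float})
def custom_fn(row):
    num = row['a']
    return pd.Series({'squared': num * num, 'x2': num * 2})

out = pd_df.apply(custom_fn, axis=1)
print(out)
```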
python pandas dask
Comment: stackoverflow.com/questions/17839973/… – Poete Maudit, May 31 at 11:03
307
--> 308 return self.wrap_results_for_axis()
309
310 # dict of scalars
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results_for_axis(self)
382 # we have requested to expand
383 if self.result_type == 'expand':
--> 384 result = self.infer_to_same_shape()
385
386 # we have a non-series and don't want inference
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in infer_to_same_shape(self)
400 results = self.results
401
--> 402 result = self.obj._constructor(data=results)
403 result = result.T
404
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in __init__(self, data, index, columns, dtype, copy)
390 dtype=dtype, copy=copy)
391 elif isinstance(data, dict):
--> 392 mgr = init_dict(data, index, columns, dtype=dtype)
393 elif isinstance(data, ma.MaskedArray):
394 import numpy.ma.mrecords as mrecords
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in init_dict(data, index, columns, dtype)
210 arrays = [data[k] for k in keys]
211
--> 212 return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
213
214
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in arrays_to_mgr(arrays, arr_names, index, columns, dtype)
49 # figure out the index, if necessary
50 if index is None:
---> 51 index = extract_index(arrays)
52 else:
53 index = ensure_index(index)
/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in extract_index(data)
306
307 if not indexes and not raw_lengths:
--> 308 raise ValueError('If using all scalar values, you must pass'
309 ' an index')
310
ValueError: If using all scalar values, you must pass an index
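The bottom frames show pandas' DataFrame constructor rejecting a dict of bare scalars with no index. That underlying error can be reproduced directly at the pandas level, independent of Dask (a minimal sketch):

```python
import pandas as pd

# A dict of bare scalars gives the constructor no way to infer
# a row length, so pandas raises the same ValueError as above.
try:
    pd.DataFrame({'a': 1, 'b': 2})
except ValueError as e:
    print(e)  # If using all scalar values, you must pass an index

# Passing an index (or wrapping the values in lists) resolves it.
ok = pd.DataFrame({'a': 1, 'b': 2}, index=[0])
print(ok)
```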
I can omit result_type='expand' and the meta kwarg to get a DataFrame full of the DataFrames I returned in the method, but I do want it to expand inline. I'm using Dask 1.1.4 and Pandas 0.24.1 on Python 2.7.6.
EDIT: I've found that I can expand the rows later like so:
new_frame = df.apply(custom_fn, axis=1)
dd.concat([
data for _, data in new_frame.iteritems()
], interleave_partitions=True).head()
It's a little messy, but it seems to work for now at least.
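For comparison, result_type='expand' does work when the per-row function returns a Series rather than a DataFrame, because pandas turns the Series index into columns. A minimal pandas-level sketch (Dask's apply forwards these kwargs to pandas; this custom_fn is a hypothetical stand-in, not the original function):

```python
import pandas as pd

pdf = pd.DataFrame({'x': [1, 2, 3]})

# Hypothetical per-row function: returning a Series (not a
# DataFrame) lets 'expand' map the Series index to columns.
def custom_fn(row):
    return pd.Series({'double': row['x'] * 2, 'square': row['x'] ** 2})

out = pdf.apply(custom_fn, axis=1, result_type='expand')
print(out)
```

Under Dask the same call would additionally take a meta kwarg describing the output columns, e.g. meta={'double': 'i8', 'square': 'i8'}.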
python pandas dask
edited Mar 25 at 21:30
Shawn Walton
asked Mar 25 at 21:13
stackoverflow.com/questions/17839973/…
– Poete Maudit
May 31 at 11:03
0 answers