“ValueError: If using all scalar values, you must pass an index” when using Dask DataFrame.apply()


I have some data that I'd like to expand from a single column into multiple columns using Dask. I'm doing this with Dask's DataFrame.apply method and a custom function that returns a new pandas DataFrame. However, when I do, I get "ValueError: If using all scalar values, you must pass an index". Here's a minimal reproduction case:



import dask.dataframe as dd
import pandas as pd

pd_df = pd.DataFrame({'a': [1, 2, 3, 4, 5]}, dtype=float)
df = dd.from_pandas(pd_df, npartitions=2)

def custom_fn(row):
    num = row['a']
    frame = pd.DataFrame({
        'squared': [num * num],
        'x2': [num * 2],
    }, dtype=float, index=[0])
    return frame

new_frame = df.apply(custom_fn, axis=1, meta={
    'squared': float,
    'x2': float,
}, result_type='expand')

new_frame.head()


And the stacktrace:



---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-38-6aaf3a5d32b2> in <module>()
12 }, result_type='expand')
13
---> 14 new_frame.head()

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in head(self, n, npartitions, compute)
896
897 if compute:
--> 898 result = result.compute()
899 return result
900

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
458 _execute_task(task, data) # Re-execute locally
459 else:
--> 460 raise_exception(exc, tb)
461 res, worker_id = loads(res_info)
462 state['cache'][key] = res

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/optimization.pyc in __call__(self, *args)
940 % (len(self.inkeys), len(args)))
941 return core.get(self.dsk, self.outkey,
--> 942 dict(zip(self.inkeys, args)))
943
944 def __reduce__(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in apply_and_enforce(*args, **kwargs)
3792 func = kwargs.pop('_func')
3793 meta = kwargs.pop('_meta')
-> 3794 df = func(*args, **kwargs)
3795 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
3796 if not len(df):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/utils.pyc in __call__(self, obj, *args, **kwargs)
714
715 def __call__(self, obj, *args, **kwargs):
--> 716 return getattr(obj, self.method)(*args, **kwargs)
717
718 def __reduce__(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in apply(self, func, axis, broadcast, raw, reduce, result_type, args, **kwds)
6485 args=args,
6486 kwds=kwds)
-> 6487 return op.get_result()
6488
6489 def applymap(self, func):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in get_result(self)
149 return self.apply_raw()
150
--> 151 return self.apply_standard()
152
153 def apply_empty_result(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in apply_standard(self)
258
259 # wrap results
--> 260 return self.wrap_results()
261
262 def apply_series_generator(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results(self)
306 if len(results) > 0 and is_sequence(results[0]):
307
--> 308 return self.wrap_results_for_axis()
309
310 # dict of scalars

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results_for_axis(self)
382 # we have requested to expand
383 if self.result_type == 'expand':
--> 384 result = self.infer_to_same_shape()
385
386 # we have a non-series and don't want inference

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in infer_to_same_shape(self)
400 results = self.results
401
--> 402 result = self.obj._constructor(data=results)
403 result = result.T
404

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in __init__(self, data, index, columns, dtype, copy)
390 dtype=dtype, copy=copy)
391 elif isinstance(data, dict):
--> 392 mgr = init_dict(data, index, columns, dtype=dtype)
393 elif isinstance(data, ma.MaskedArray):
394 import numpy.ma.mrecords as mrecords

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in init_dict(data, index, columns, dtype)
210 arrays = [data[k] for k in keys]
211
--> 212 return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
213
214

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in arrays_to_mgr(arrays, arr_names, index, columns, dtype)
49 # figure out the index, if necessary
50 if index is None:
---> 51 index = extract_index(arrays)
52 else:
53 index = ensure_index(index)

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in extract_index(data)
306
307 if not indexes and not raw_lengths:
--> 308 raise ValueError('If using all scalar values, you must pass'
309 ' an index')
310

ValueError: If using all scalar values, you must pass an index
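
The last frames of the trace are inside the pandas DataFrame constructor, which raises this message when none of the values it receives is recognized as array-like. As a minimal sketch of where the message comes from, independent of Dask and of the apply call above, a dict of plain scalars triggers the same error, and passing an index avoids it. The column names are reused from the question only for illustration.

```python
import pandas as pd

# A dict of bare scalars gives pandas nothing to infer a row index from.
try:
    pd.DataFrame({'squared': 1.0, 'x2': 2.0})
    message = None
except ValueError as exc:
    message = str(exc)

print(message)

# Supplying an explicit index tells pandas how many rows to build.
fixed = pd.DataFrame({'squared': 1.0, 'x2': 2.0}, index=[0])
print(fixed)
```

This only illustrates the message itself. In the question the values reaching the constructor are the per-row results collected by apply, not literal scalars.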


If I omit result_type='expand' and the meta kwarg, I get a DataFrame full of the DataFrames returned by my function, but I want the result expanded inline into columns. I'm using Dask 1.1.4 and Pandas 0.24.1 on Python 2.7.6.
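
One possible way to get the inline expansion is to have the row function return a Series rather than a one-row DataFrame. The sketch below uses plain pandas, because the traceback shows each partition being handed to pandas' DataFrame.apply. `custom_fn_series` is a hypothetical variant of `custom_fn`, and this has not been checked against Dask 1.1.4 or pandas 0.24.1 specifically.

```python
import pandas as pd

pd_df = pd.DataFrame({'a': [1, 2, 3, 4, 5]}, dtype=float)

def custom_fn_series(row):
    # Hypothetical variant of custom_fn: one Series per row, whose labels
    # become the column names of the expanded result.
    num = row['a']
    return pd.Series({'squared': num * num, 'x2': num * 2})

expanded = pd_df.apply(custom_fn_series, axis=1, result_type='expand')
print(expanded)
```

If this behaves the same way under Dask, the same function could presumably be passed to df.apply with the meta dict from the question, but that part is an assumption.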



EDIT: I've found that I can expand the rows later like so:



new_frame = df.apply(custom_fn, axis=1)

dd.concat([
    data for _, data in new_frame.iteritems()
], interleave_partitions=True).head()


It's a little messy, but it seems to work for now at least.
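
For this particular example, where both new columns are simple arithmetic on column a, a per-partition function might avoid the row-wise apply and the later concat altogether. The sketch below is an assumption rather than something from the question: `expand_partition` is a hypothetical helper, it is exercised on a plain pandas frame standing in for one partition, and the Dask call appears only as a comment.

```python
import pandas as pd

def expand_partition(part):
    """Build both derived columns for one partition (a plain pandas DataFrame)."""
    return pd.DataFrame({
        'squared': part['a'] * part['a'],
        'x2': part['a'] * 2,
    })

# Checked here on a plain pandas frame standing in for a single partition.
sample = pd.DataFrame({'a': [1, 2, 3, 4, 5]}, dtype=float)
result = expand_partition(sample)
print(result)

# Assumed Dask wiring (hypothetical, mirrors the meta dict from the question):
# new_frame = df.map_partitions(expand_partition,
#                               meta={'squared': float, 'x2': float})
```

Because the function works on whole columns, the result keeps the partition's original index instead of the index=[0] used in custom_fn.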










  • stackoverflow.com/questions/17839973/…

    – Poete Maudit
    May 31 at 11:03



















149 return self.apply_raw()
150
--> 151 return self.apply_standard()
152
153 def apply_empty_result(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in apply_standard(self)
258
259 # wrap results
--> 260 return self.wrap_results()
261
262 def apply_series_generator(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results(self)
306 if len(results) > 0 and is_sequence(results[0]):
307
--> 308 return self.wrap_results_for_axis()
309
310 # dict of scalars

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results_for_axis(self)
382 # we have requested to expand
383 if self.result_type == 'expand':
--> 384 result = self.infer_to_same_shape()
385
386 # we have a non-series and don't want inference

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in infer_to_same_shape(self)
400 results = self.results
401
--> 402 result = self.obj._constructor(data=results)
403 result = result.T
404

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in __init__(self, data, index, columns, dtype, copy)
390 dtype=dtype, copy=copy)
391 elif isinstance(data, dict):
--> 392 mgr = init_dict(data, index, columns, dtype=dtype)
393 elif isinstance(data, ma.MaskedArray):
394 import numpy.ma.mrecords as mrecords

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in init_dict(data, index, columns, dtype)
210 arrays = [data[k] for k in keys]
211
--> 212 return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
213
214

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in arrays_to_mgr(arrays, arr_names, index, columns, dtype)
49 # figure out the index, if necessary
50 if index is None:
---> 51 index = extract_index(arrays)
52 else:
53 index = ensure_index(index)

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in extract_index(data)
306
307 if not indexes and not raw_lengths:
--> 308 raise ValueError('If using all scalar values, you must pass'
309 ' an index')
310

ValueError: If using all scalar values, you must pass an index
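
For reference, the last frames of the traceback show pandas building a DataFrame from a dict (`init_dict` then `extract_index`) whose values are all scalars. The following is a minimal sketch that reproduces the same message with pandas alone, outside Dask; the `build_frame` helper and the sample dict are hypothetical and only illustrate the constructor behavior, not what `custom_fn` actually returns.

```python
import pandas as pd


def build_frame(values, index=None):
    """Build a DataFrame from a dict, optionally with an explicit index."""
    return pd.DataFrame(values, index=index)


# Hypothetical data: every value is a scalar, so pandas cannot infer a length.
scalars = {"a": 1, "b": 2}

try:
    build_frame(scalars)
    message = None
except ValueError as exc:
    # pandas refuses to guess how many rows the frame should have.
    message = str(exc)

# Passing an index tells pandas the frame has exactly one row.
fixed = build_frame(scalars, index=[0])
```

If this is what happens inside `apply`, the collected per-row results likely reach the constructor as a dict of values that pandas treats as scalars, which would explain why `result_type='expand'` fails here.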


If I omit result_type='expand' and the meta kwarg, I get a DataFrame full of the DataFrames returned by my function, but I want the result expanded inline. I'm using Dask 1.1.4 and Pandas 0.24.1 on Python 2.7.6.



EDIT: I've found that I can expand the rows later like so:



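import dask.dataframe as dd

# Each element of new_frame is the DataFrame returned by custom_fn for one row.
new_frame = df.apply(custom_fn, axis=1)

# Concatenate the per-row results into one frame and preview the first rows.
dd.concat([
    data for _, data in new_frame.iteritems()
], interleave_partitions=True).head()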


It's a little messy, but it seems to work for now at least.
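
The same "one frame per row, then concatenate" idea can be sketched in plain pandas. This is only an illustration under assumptions: `custom_fn` below is a hypothetical stand-in that turns each input row into two output rows, and `expand_rows` is a made-up helper name.

```python
import pandas as pd


def custom_fn(row):
    # Hypothetical stand-in: one input row becomes a two-row DataFrame.
    return pd.DataFrame({
        "key": [row["key"], row["key"]],
        "value": [row["value"], row["value"] * 10],
    })


def expand_rows(pdf):
    """Call custom_fn on every row and stack the resulting frames."""
    pieces = [custom_fn(row) for _, row in pdf.iterrows()]
    # pd.concat raises ValueError on an empty list, so pdf must have rows.
    return pd.concat(pieces, ignore_index=True)


pdf = pd.DataFrame({"key": ["a", "b"], "value": [1, 2]})
expanded = expand_rows(pdf)
```

Because `expand_rows` takes and returns a pandas DataFrame, a function of this shape could presumably be passed to Dask's `map_partitions` with an explicit `meta`, which would avoid iterating over the whole Dask result. I have not tested that here.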







python pandas dask














edited Mar 25 at 21:30







Shawn Walton

















asked Mar 25 at 21:13









Shawn Walton












  • stackoverflow.com/questions/17839973/…

    – Poete Maudit
    May 31 at 11:03




























