Source code for holoviews.core.data.dask

from __future__ import absolute_import

import sys
try:
    import itertools.izip as zip
except ImportError:
    pass

import numpy as np
import pandas as pd

from .. import util
from ..dimension import Dimension
from ..element import Element
from ..ndmapping import NdMapping, item_check, OrderedDict, sorted_context
from .interface import Interface
from .pandas import PandasInterface


[docs]class DaskInterface(PandasInterface): """ The DaskInterface allows a Dataset objects to wrap a dask DataFrame object. Using dask allows loading data lazily and performing out-of-core operations on the data, making it possible to work on datasets larger than memory. The DaskInterface covers almost the complete API exposed by the PandasInterface with two notable exceptions: 1) Sorting is not supported and any attempt at sorting will be ignored with an warning. 2) Dask does not easily support adding a new column to an existing dataframe unless it is a scalar, add_dimension will therefore error when supplied a non-scalar value. 4) Not all functions can be easily applied to a dask dataframe so some functions applied with aggregate and reduce will not work. """ types = () datatype = 'dask' default_partitions = 100
[docs] @classmethod def loaded(cls): return 'dask.dataframe' in sys.modules and 'pandas' in sys.modules
[docs] @classmethod def applies(cls, obj): if not cls.loaded(): return False import dask.dataframe as dd return isinstance(obj, (dd.DataFrame, dd.Series))
@classmethod def init(cls, eltype, data, kdims, vdims): import dask.dataframe as dd data, dims, extra = PandasInterface.init(eltype, data, kdims, vdims) if not isinstance(data, dd.DataFrame): data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False) kdims = [d.name if isinstance(d, Dimension) else d for d in dims['kdims']] # If a key dimension can be found, speculatively reset index # to work around lacking dask support for MultiIndex if any(d for d in kdims if d not in data.columns): reset = data.reset_index() if all(d for d in kdims if d in reset.columns): data = reset return data, dims, extra @classmethod def shape(cls, dataset): return (len(dataset.data), len(dataset.data.columns)) @classmethod def range(cls, dataset, dimension): import dask.dataframe as dd column = dataset.data[dataset.get_dimension(dimension).name] if column.dtype.kind == 'O': column = np.sort(column[column.notnull()].compute()) return (column[0], column[-1]) if len(column) else (None, None) else: return dd.compute(column.min(), column.max()) @classmethod def sort(cls, dataset, by=[], reverse=False): dataset.param.warning('Dask dataframes do not support sorting') return dataset.data @classmethod def values(cls, dataset, dim, expanded=True, flat=True, compute=True, keep_index=False): dim = dataset.get_dimension(dim) data = dataset.data[dim.name] if not expanded: data = data.unique() if keep_index: return data.compute() if compute else data else: return data.compute().values if compute else data.values
[docs] @classmethod def select_mask(cls, dataset, selection): """ Given a Dataset object and a dictionary with dimension keys and selection keys (i.e tuple ranges, slices, sets, lists or literals) return a boolean mask over the rows in the Dataset object that have been selected. """ select_mask = None for dim, k in selection.items(): if isinstance(k, tuple): k = slice(*k) masks = [] alias = dataset.get_dimension(dim).name series = dataset.data[alias] if isinstance(k, slice): if k.start is not None: # Workaround for dask issue #3392 kval = util.numpy_scalar_to_python(k.start) masks.append(kval <= series) if k.stop is not None: kval = util.numpy_scalar_to_python(k.stop) masks.append(series < kval) elif isinstance(k, (set, list)): iter_slc = None for ik in k: mask = series == ik if iter_slc is None: iter_slc = mask else: iter_slc |= mask masks.append(iter_slc) elif callable(k): masks.append(k(series)) else: masks.append(series == k) for mask in masks: if select_mask is not None: select_mask &= mask else: select_mask = mask return select_mask
@classmethod def select(cls, dataset, selection_mask=None, **selection): df = dataset.data if selection_mask is not None: return df[selection_mask] selection_mask = cls.select_mask(dataset, selection) indexed = cls.indexed(dataset, selection) df = df if selection_mask is None else df[selection_mask] if indexed and len(df) == 1 and len(dataset.vdims) == 1: return df[dataset.vdims[0].name].compute().iloc[0] return df @classmethod def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs): index_dims = [dataset.get_dimension(d) for d in dimensions] element_dims = [kdim for kdim in dataset.kdims if kdim not in index_dims] group_kwargs = {} if group_type != 'raw' and issubclass(group_type, Element): group_kwargs = dict(util.get_param_values(dataset), kdims=element_dims) group_kwargs.update(kwargs) # Propagate dataset group_kwargs['dataset'] = dataset.dataset data = [] group_by = [d.name for d in index_dims] groupby = dataset.data.groupby(group_by) if len(group_by) == 1: column = dataset.data[group_by[0]] if column.dtype.name == 'category': try: indices = ((ind,) for ind in column.cat.categories) except NotImplementedError: indices = ((ind,) for ind in column.unique().compute()) else: indices = ((ind,) for ind in column.unique().compute()) else: group_tuples = dataset.data[group_by].itertuples() indices = util.unique_iterator(ind[1:] for ind in group_tuples) for coord in indices: if any(isinstance(c, float) and np.isnan(c) for c in coord): continue if len(coord) == 1: coord = coord[0] group = group_type(groupby.get_group(coord), **group_kwargs) data.append((coord, group)) if issubclass(container_type, NdMapping): with item_check(False), sorted_context(False): return container_type(data, kdims=index_dims) else: return container_type(data) @classmethod def aggregate(cls, dataset, dimensions, function, **kwargs): data = dataset.data cols = [d.name for d in dataset.kdims if d in dimensions] vdims = dataset.dimensions('value', label='name') dtypes = data.dtypes numeric = [c for c, dtype in zip(dtypes.index, dtypes.values) if dtype.kind in 'iufc' and c in vdims] reindexed = data[cols+numeric] inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean', 'std': 'std', 'sum': 'sum', 'var': 'var'} if len(dimensions): groups = reindexed.groupby(cols) if (function.__name__ in inbuilts): agg = getattr(groups, inbuilts[function.__name__])() else: agg = groups.apply(function) df = agg.reset_index() else: if (function.__name__ in inbuilts): agg = getattr(reindexed, inbuilts[function.__name__])() else: raise NotImplementedError df = pd.DataFrame(agg.compute()).T dropped = [] for vd in vdims: if vd not in df.columns: dropped.append(vd) return df, dropped
[docs] @classmethod def unpack_scalar(cls, dataset, data): """ Given a dataset object and data in the appropriate format for the interface, return a simple scalar. """ import dask.dataframe as dd if len(data.columns) > 1 or len(data) != 1: return data if isinstance(data, dd.DataFrame): data = data.compute() return data.iat[0,0]
@classmethod def sample(cls, dataset, samples=[]): data = dataset.data dims = dataset.dimensions('key', label='name') mask = None for sample in samples: if np.isscalar(sample): sample = [sample] for i, (c, v) in enumerate(zip(dims, sample)): dim_mask = data[c]==v if mask is None: mask = dim_mask else: mask |= dim_mask return data[mask] @classmethod def add_dimension(cls, dataset, dimension, dim_pos, values, vdim): data = dataset.data if dimension.name not in data.columns: if not np.isscalar(values): err = ('Dask dataframe does not support assigning ' 'non-scalar value.') raise NotImplementedError(err) data = data.assign(**{dimension.name: values}) return data @classmethod def concat_fn(cls, dataframes, **kwargs): import dask.dataframe as dd return dd.concat(dataframes, **kwargs) @classmethod def dframe(cls, dataset, dimensions): if dimensions: return dataset.data[dimensions].compute() else: return dataset.data.compute() @classmethod def nonzero(cls, dataset): return True
[docs] @classmethod def iloc(cls, dataset, index): """ Dask does not support iloc, therefore iloc will execute the call graph and lose the laziness of the operation. """ rows, cols = index scalar = False if isinstance(cols, slice): cols = [d.name for d in dataset.dimensions()][cols] elif np.isscalar(cols): scalar = np.isscalar(rows) cols = [dataset.get_dimension(cols).name] else: cols = [dataset.get_dimension(d).name for d in index[1]] if np.isscalar(rows): rows = [rows] data = OrderedDict() for c in cols: data[c] = dataset.data[c].compute().iloc[rows].values if scalar: return data[cols[0]][0] return tuple(data.values())
Interface.register(DaskInterface)