Source code for holoviews.core.data.multipath

import numpy as np

from .. import util
from ..element import Element
from ..ndmapping import NdMapping, item_check, sorted_context
from .dictionary import DictInterface
from .interface import Interface, DataError


class MultiInterface(Interface):
    """
    MultiInterface allows wrapping around a list of tabular datasets
    including dataframes, the columnar dictionary format or 2D tabular
    NumPy arrays. Using the split method the list of tabular data can
    be split into individual datasets.

    The interface makes a list of tabular datasets appear as a single
    dataset. The interface may be used to represent geometries so the
    behavior depends on the type of geometry being represented.

    Most methods work by building one inner ``Dataset`` template around
    the first subpath and then swapping each subpath into its ``data``
    attribute, delegating the actual work to the inner interface.
    """

    types = ()

    datatype = 'multitabular'

    # Datatypes the individual subpaths may be stored as.
    subtypes = ['dictionary', 'dataframe', 'array', 'dask']

    # Geometry types accepted under a subpath's 'geom_type' key.
    geom_types = ['Polygon', 'Ring', 'Line', 'Point']

    multi = True

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        """
        Initializes every subpath through Interface.initialize.

        Returns a ``(new_data, dims, {})`` tuple where ``new_data`` is
        the list of initialized subpaths and ``dims`` holds the
        'kdims' and 'vdims'.

        Raises ValueError if non-list data cannot be wrapped into a
        list and DataError if a geometry type is not recognized or the
        subpaths do not share the same interface, kdims and vdims.
        """
        from ...element import Polygons, Path

        new_data = []
        dims = {'kdims': eltype.kdims, 'vdims': eltype.vdims}
        if kdims is not None:
            dims['kdims'] = kdims
        if vdims is not None:
            dims['vdims'] = vdims

        if (isinstance(data, list) and len(data) and
            all(isinstance(d, tuple) and all(util.isscalar(v) for v in d)
                for d in data)):
            # A flat list of scalar tuples describes a single subpath.
            data = [data]
        elif not isinstance(data, list):
            # NOTE(review): ``interface`` is a list of booleans, so it is
            # truthy whenever any subtype is registered, not only when one
            # of them applies to the data; confirm whether any() was meant.
            interface = [Interface.interfaces.get(st).applies(data)
                         for st in cls.subtypes if st in Interface.interfaces]
            if (interface or isinstance(data, tuple)) and issubclass(eltype, Path):
                data = [data]
            else:
                raise ValueError('MultiInterface data must be a list of tabular data types.')

        prev_interface, prev_dims = None, None
        for d in data:
            datatype = cls.subtypes
            if isinstance(d, dict):
                if Polygons._hole_key in d:
                    # Only interfaces which support holes may hold this data.
                    datatype = [dt for dt in datatype
                                if hasattr(Interface.interfaces.get(dt), 'has_holes')]
                geom_type = d.get('geom_type')
                if geom_type is not None and geom_type not in cls.geom_types:
                    raise DataError("Geometry type '%s' not recognized, "
                                    "must be one of %s."
                                    % (geom_type, cls.geom_types))
                else:
                    datatype = [dt for dt in datatype
                                if hasattr(Interface.interfaces.get(dt), 'geom_type')]
            d, interface, dims, _ = Interface.initialize(eltype, d, kdims, vdims,
                                                         datatype=datatype)
            if prev_interface:
                if prev_interface != interface:
                    raise DataError('MultiInterface subpaths must all have matching datatype.', cls)
                if dims['kdims'] != prev_dims['kdims']:
                    raise DataError('MultiInterface subpaths must all have matching kdims.', cls)
                if dims['vdims'] != prev_dims['vdims']:
                    raise DataError('MultiInterface subpaths must all have matching vdims.', cls)
            new_data.append(d)
            prev_interface, prev_dims = interface, dims
        return new_data, dims, {}

    @classmethod
    def validate(cls, dataset, vdims=True):
        """
        Validates each subpath with its own interface and, for Polygons
        stored as dictionaries, checks that one list of holes is
        declared per NaN-separated geometry.
        """
        if not dataset.data:
            return

        from holoviews.element import Polygons
        ds = cls._inner_dataset_template(dataset, validate_vdims=vdims)
        for d in dataset.data:
            ds.data = d
            ds.interface.validate(ds, vdims)
            if isinstance(dataset, Polygons) and ds.interface is DictInterface:
                holes = ds.interface.holes(ds)
                if not isinstance(holes, list):
                    raise DataError('Polygons holes must be declared as a list-of-lists.', cls)
                subholes = holes[0]
                coords = ds.data[ds.kdims[0].name]
                # Each NaN in the coordinates starts another geometry.
                splits = np.isnan(coords.astype('float')).sum()
                if len(subholes) != (splits+1):
                    raise DataError('Polygons with holes containing multi-geometries '
                                    'must declare a list of holes for each geometry.', cls)

    @classmethod
    def geom_type(cls, dataset):
        """
        Returns the geometry type of the dataset (or element type),
        preferring the type reported by the inner interface for the
        first subpath and falling back to the element type.
        """
        from holoviews.element import Polygons, Path, Points
        if isinstance(dataset, type):
            eltype = dataset
        else:
            eltype = type(dataset)
            # The template wraps the first subpath, so skip it when the
            # list is empty and fall back to the element type instead.
            if isinstance(dataset.data, list) and dataset.data:
                ds = cls._inner_dataset_template(dataset)
                if hasattr(ds.interface, 'geom_type'):
                    geom_type = ds.interface.geom_type(ds)
                    if geom_type is not None:
                        return geom_type
        if issubclass(eltype, Polygons):
            return 'Polygon'
        elif issubclass(eltype, Path):
            return 'Line'
        elif issubclass(eltype, Points):
            return 'Point'

    @classmethod
    def _inner_dataset_template(cls, dataset, validate_vdims=True):
        """
        Returns a Dataset template used as a wrapper around the data
        contained within the multi-interface dataset.
        """
        from . import Dataset
        # Datasets declaring a level do not carry their vdims in the data.
        vdims = dataset.vdims if getattr(dataset, 'level', None) is None else []
        return Dataset(dataset.data[0], datatype=cls.subtypes,
                       kdims=dataset.kdims, vdims=vdims,
                       _validate_vdims=validate_vdims)

    @classmethod
    def dimension_type(cls, dataset, dim):
        """
        Returns the type of the dimension as reported by the inner
        interface for the first subpath (float for empty datasets).
        """
        if not dataset.data:
            # Note: Required to make empty datasets work at all (should fix)
            # Other interfaces declare equivalent of empty array
            # which defaults to float type
            return float
        ds = cls._inner_dataset_template(dataset)
        return ds.interface.dimension_type(ds, dim)

    @classmethod
    def range(cls, dataset, dim):
        """
        Returns the combined range of the dimension across all
        subpaths, or (None, None) for empty datasets.
        """
        if not dataset.data:
            return (None, None)
        ranges = []
        ds = cls._inner_dataset_template(dataset)

        # Backward compatibility for Contours/Polygons level
        level = getattr(dataset, 'level', None)
        dim = dataset.get_dimension(dim)
        if level is not None and dim is dataset.vdims[0]:
            return (level, level)

        for d in dataset.data:
            ds.data = d
            ranges.append(ds.interface.range(ds, dim))
        return util.max_range(ranges)

    @classmethod
    def has_holes(cls, dataset):
        """
        Returns whether any of the subpaths declares holes.
        """
        if not dataset.data:
            return False
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            if ds.interface.has_holes(ds):
                return True
        return False

    @classmethod
    def holes(cls, dataset):
        """
        Returns the holes of all subpaths concatenated into one list.
        """
        holes = []
        if not dataset.data:
            return holes
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            holes += ds.interface.holes(ds)
        return holes

    @classmethod
    def isscalar(cls, dataset, dim, per_geom=False):
        """
        Tests if dimension is scalar in each subpath.

        With per_geom enabled (and non-Point geometries) each subpath
        only has to be scalar by itself, otherwise all subpaths have
        to share the same single value.
        """
        if not dataset.data:
            return True
        geom_type = cls.geom_type(dataset)
        ds = cls._inner_dataset_template(dataset)
        combined = []
        for d in dataset.data:
            ds.data = d
            values = ds.interface.values(ds, dim, expanded=False)
            unique = list(util.unique_iterator(values))
            if len(unique) > 1:
                return False
            elif not unique or (per_geom and geom_type != 'Point'):
                # Empty subpaths contribute no value to compare.
                continue
            unique = unique[0]
            if unique not in combined:
                if combined:
                    return False
                combined.append(unique)
        return True

    @classmethod
    def select(cls, dataset, selection_mask=None, **selection):
        """
        Applies selection on all the subpaths.

        If a selection_mask is supplied it is applied per subpath,
        otherwise the selection is applied within each subpath and
        subpaths left empty by the selection are dropped.
        """
        from ...element import Polygons
        if not dataset.data:
            return dataset.data
        elif selection_mask is not None:
            return [d for b, d in zip(selection_mask, dataset.data) if b]
        ds = cls._inner_dataset_template(dataset)
        # Columns which are ignored when testing whether a dictionary
        # selection is empty.
        skipped = (Polygons._hole_key,)
        if hasattr(ds.interface, 'geo_column'):
            skipped += (ds.interface.geo_column(ds),)
        data = []
        for d in dataset.data:
            ds.data = d
            mask = ds.interface.select_mask(ds, selection)
            sel = ds.interface.select(ds, mask)
            is_dict = isinstance(sel, dict)
            if ((not len(sel) and not is_dict) or
                (is_dict and any(False if util.isscalar(v) else len(v) == 0
                                 for k, v in sel.items() if k not in skipped))):
                continue
            data.append(sel)
        return data

    @classmethod
    def select_paths(cls, dataset, index):
        """
        Allows selecting paths with usual NumPy slicing index.
        """
        # Wrapping each subpath in a dict makes NumPy build a 1D object
        # array instead of trying to stack the subpaths.
        selection = np.array([{0: p} for p in dataset.data])[index]
        if isinstance(selection, dict):
            return [selection[0]]
        return [s[0] for s in selection]

    @classmethod
    def aggregate(cls, dataset, dimensions, function, **kwargs):
        """
        Aggregation is not implemented for multi-tabular data.
        """
        raise NotImplementedError('Aggregation currently not implemented')

    @classmethod
    def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
        """
        Groups the subpaths by the unique values along the supplied
        dimensions, which have to be scalar within each subpath.

        Raises ValueError if one of the dimensions is not scalar.
        """
        # Get dimensions information
        dimensions = [dataset.get_dimension(d) for d in dimensions]
        kdims = [kdim for kdim in dataset.kdims if kdim not in dimensions]

        # Update the kwargs appropriately for Element group types
        group_kwargs = {}
        group_type = list if group_type == 'raw' else group_type
        if issubclass(group_type, Element):
            group_kwargs.update(util.get_param_values(dataset))
            group_kwargs['kdims'] = kdims
        group_kwargs.update(kwargs)

        # Find all the keys along supplied dimensions
        values = []
        for d in dimensions:
            if not cls.isscalar(dataset, d, True):
                raise ValueError('MultiInterface can only apply groupby '
                                 'on scalar dimensions, %s dimension '
                                 'is not scalar' % d)
            vals = cls.values(dataset, d, False, True)
            values.append(vals)
        values = tuple(values)

        # Iterate over the unique entries applying selection masks
        from . import Dataset
        ds = Dataset(values, dimensions)
        # One key per subpath, built without relying on the loop
        # variable leaking out of the loop above.
        keys = zip(*values)
        grouped_data = []
        for unique_key in util.unique_iterator(keys):
            mask = ds.interface.select_mask(ds, dict(zip(dimensions, unique_key)))
            selection = [data for data, m in zip(dataset.data, mask) if m]
            group_data = group_type(selection, **group_kwargs)
            grouped_data.append((unique_key, group_data))

        if issubclass(container_type, NdMapping):
            with item_check(False), sorted_context(False):
                return container_type(grouped_data, kdims=dimensions)
        else:
            return container_type(grouped_data)

    @classmethod
    def sample(cls, dataset, samples=None):
        """
        Sampling is not supported for multi-tabular data.
        """
        raise NotImplementedError('Sampling operation on subpaths not supported')

    @classmethod
    def shape(cls, dataset):
        """
        Returns the shape of the multi-tabular dataset. For Point
        geometries the rows of all subpaths are summed, for all other
        geometry types the number of subpaths is reported as the
        number of rows.
        """
        if not dataset.data:
            return (0, len(dataset.dimensions()))
        elif cls.geom_type(dataset) != 'Point':
            return (len(dataset.data), len(dataset.dimensions()))

        rows, cols = 0, 0
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            r, cols = ds.interface.shape(ds)
            rows += r
        return rows, cols

    @classmethod
    def length(cls, dataset):
        """
        Returns the length of the multi-tabular dataset. For Point
        geometries the lengths of all subpaths are summed, for all
        other geometry types the number of subpaths is returned.
        """
        if not dataset.data:
            return 0
        elif cls.geom_type(dataset) != 'Point':
            return len(dataset.data)
        length = 0
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            length += ds.interface.length(ds)
        return length

    @classmethod
    def dtype(cls, dataset, dimension):
        """
        Returns the dtype of the dimension as reported by the inner
        interface for the first subpath (float for empty datasets).
        """
        if not dataset.data:
            return np.dtype('float')
        ds = cls._inner_dataset_template(dataset)
        return ds.interface.dtype(ds, dimension)

    @classmethod
    def sort(cls, dataset, by=None, reverse=False):
        """
        Returns the subpaths ordered by the per-subpath values of the
        supplied dimensions, in descending order if reverse is set.
        """
        by = [dataset.get_dimension(d).name for d in (by or [])]
        if len(by) == 1:
            sorting = cls.values(dataset, by[0], False).argsort()
        else:
            arrays = [dataset.dimension_values(d, False) for d in by]
            sorting = util.arglexsort(arrays)
        if reverse:
            # Honor the reverse flag, which was previously ignored.
            sorting = sorting[::-1]
        return [dataset.data[s] for s in sorting]

    @classmethod
    def nonzero(cls, dataset):
        """
        Returns whether the dataset contains any subpaths.
        """
        return bool(dataset.data)

    @classmethod
    def reindex(cls, dataset, kdims=None, vdims=None):
        """
        Reindexes each subpath using the inner interface and returns
        the list of reindexed data.
        """
        if not dataset.data:
            return dataset.data
        new_data = []
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            # Delegate to the interface (as redim does) so that raw data
            # rather than a Dataset wrapper ends up in the list.
            new_data.append(ds.interface.reindex(ds, kdims, vdims))
        return new_data

    @classmethod
    def redim(cls, dataset, dimensions):
        """
        Applies the dimension renaming to each subpath using the inner
        interface and returns the list of updated data.
        """
        if not dataset.data:
            return dataset.data
        new_data = []
        ds = cls._inner_dataset_template(dataset)
        for d in dataset.data:
            ds.data = d
            new_data.append(ds.interface.redim(ds, dimensions))
        return new_data

    @classmethod
    def values(cls, dataset, dimension, expanded=True, flat=True,
               compute=True, keep_index=False):
        """
        Returns a single concatenated array of all subpaths separated
        by NaN values. If expanded keyword is False an array of arrays
        is returned.
        """
        if not dataset.data:
            return np.array([])
        values, scalars = [], []
        all_scalar = True
        ds = cls._inner_dataset_template(dataset)
        geom_type = cls.geom_type(dataset)
        is_points = geom_type == 'Point'
        is_geom = dimension in dataset.kdims[:2]
        for d in dataset.data:
            ds.data = d
            dvals = ds.interface.values(
                ds, dimension, True, flat, compute, keep_index
            )
            scalar = len(util.unique_array(dvals)) == 1 and not is_geom
            gt = ds.interface.geom_type(ds) if hasattr(ds.interface, 'geom_type') else None
            if gt is None:
                gt = geom_type

            # Polygons and rings are closed so the values line up with
            # the closed geometry; point data is never closed.
            if (gt in ('Polygon', 'Ring') and (not scalar or expanded)
                and not is_points):
                gvals = ds.array([0, 1])
                dvals = ensure_ring(gvals, dvals)
            if scalar and not expanded:
                dvals = dvals[:1]
            all_scalar &= scalar

            if not len(dvals):
                continue
            # Recorded only for retained subpaths so that scalars and
            # values stay aligned.
            scalars.append(scalar)
            values.append(dvals)
            if not is_points and expanded:
                values.append([np.nan])

        if not values:
            return np.array([])
        elif expanded or (all_scalar and not is_geom):
            if not is_points and expanded:
                # Drop the trailing NaN separator
                values = values[:-1]
            return np.concatenate(values) if values else np.array([])
        else:
            array = np.empty(len(values), dtype=object)
            # Assign element-wise, a slice assignment would try to
            # broadcast subpaths which happen to have equal lengths.
            for i, (s, a) in enumerate(zip(scalars, values)):
                array[i] = a[0] if s else a
            return array

    @classmethod
    def split(cls, dataset, start, end, datatype, **kwargs):
        """
        Splits a multi-interface Dataset into regular Datasets using
        regular tabular interfaces.

        Raises ValueError if the requested datatype is not supported.
        """
        objs = []
        if datatype is None:
            for d in dataset.data[start: end]:
                objs.append(dataset.clone([d]))
            return objs
        elif not dataset.data:
            return objs

        geom_type = cls.geom_type(dataset)
        ds = dataset.clone([])
        for d in dataset.data[start:end]:
            ds.data = [d]
            if datatype == 'array':
                obj = ds.array(**kwargs)
            elif datatype == 'dataframe':
                obj = ds.dframe(**kwargs)
            elif datatype in ('columns', 'dictionary'):
                # Reset per subpath so the name is always bound and never
                # carries over from a previous iteration.
                gt = None
                if hasattr(ds.interface, 'geom_type'):
                    gt = ds.interface.geom_type(ds)
                if gt is None:
                    gt = geom_type
                if isinstance(ds.data[0], dict):
                    obj = dict(ds.data[0])
                    xd, yd = ds.kdims
                    if (geom_type in ('Polygon', 'Ring') or
                        xd not in obj or yd not in obj):
                        obj[xd.name] = ds.interface.values(ds, xd)
                        obj[yd.name] = ds.interface.values(ds, yd)
                else:
                    obj = ds.columns()
                if gt is not None:
                    obj['geom_type'] = gt
            else:
                raise ValueError("%s datatype not support" % datatype)
            objs.append(obj)
        return objs

    @classmethod
    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
        """
        Adds a dimension to every subpath. The values may be None, a
        scalar or a sequence with one entry per subpath.

        Raises ValueError if the number of values does not match the
        number of subpaths.
        """
        if not len(dataset.data):
            return dataset.data
        elif values is None or util.isscalar(values):
            values = [values]*len(dataset.data)
        elif not len(values) == len(dataset.data):
            raise ValueError('Added dimension values must be scalar or '
                             'match the length of the data.')

        new_data = []
        template = cls._inner_dataset_template(dataset)
        array_type = template.interface.datatype == 'array'
        for d, v in zip(dataset.data, values):
            template.data = d
            if array_type:
                # Array data is converted to columns before the
                # dimension is added.
                ds = template.clone(template.columns())
            else:
                ds = template
            new_data.append(ds.interface.add_dimension(ds, dimension, dim_pos, v, vdim))
        return new_data

    @classmethod
    def iloc(cls, dataset, index):
        """
        Indexes the dataset by integer position. For non-Point
        geometries the row index selects whole subpaths, for Point
        geometries it indexes into the concatenated rows of all
        subpaths.

        Note that negative row indices are not normalized and step
        slicing is not supported for Point geometries.
        """
        rows, cols = index
        scalar = np.isscalar(cols) and np.isscalar(rows)

        template = cls._inner_dataset_template(dataset)
        if cls.geom_type(dataset) != 'Point':
            geoms = cls.select_paths(dataset, rows)
            new_data = []
            for d in geoms:
                template.data = d
                new_data.append(template.iloc[:, cols].data)
            return new_data

        # Offset of the current subpath within the concatenated rows
        count = 0
        new_data = []
        for d in dataset.data:
            template.data = d
            length = len(template)
            if np.isscalar(rows):
                if count <= rows < (count+length):
                    data = template.iloc[rows-count, cols]
                    return data if scalar else [data.data]
            elif isinstance(rows, slice):
                if rows.stop is not None and rows.stop <= count:
                    # All remaining subpaths lie beyond the slice
                    break
                # Subpaths ending before the slice starts are skipped
                # but still have to advance the offset below.
                if rows.start is None or rows.start < (count+length):
                    start = None if rows.start is None else max(rows.start - count, 0)
                    stop = None if rows.stop is None else min(rows.stop - count, length)
                    if rows.step is not None:
                        dataset.param.warning(".iloc step slicing currently not supported for "
                                              "the multi-tabular data format.")
                    slc = slice(start, stop)
                    new_data.append(template.iloc[slc, cols].data)
            else:
                # Keep only the indices which fall inside this subpath
                sub_rows = [r-count for r in rows if 0 <= (r-count) < length]
                new = template.iloc[sub_rows, cols]
                if len(new):
                    new_data.append(new.data)
            count += length
        return new_data
def ensure_ring(geom, values=None):
    """Ensure the (multi-)geometry forms a ring.

    Checks the start- and end-point of each geometry to ensure they
    form a ring, if not the start point is inserted at the end point.
    If a values array is provided (which must match the geometry in
    length) then the insertion will occur on the values instead,
    ensuring that they will match the ring geometry.

    Args:
        geom: 2-D array of geometry coordinates, individual
            geometries are separated by rows containing NaN
        values: Optional array of values

    Returns:
        Array where the values needed to close each ring have been
        inserted; the input is returned unchanged if the geometry is
        empty or every ring is already closed.
    """
    if values is None:
        values = geom

    # An empty geometry has no start point to index, nothing to close.
    if not len(geom):
        return values

    # Rows containing a NaN separate the individual geometries.
    breaks = np.where(np.isnan(geom.astype('float')).sum(axis=1))[0]
    starts = [0] + list(breaks+1)
    ends = list(breaks-1) + [len(geom)-1]
    zipped = zip(geom[starts], geom[ends], ends, values[starts])
    # For every open geometry collect the start value together with
    # the position right after its end point.
    unpacked = tuple(zip(*[(v, i+1) for s, e, i, v in zipped
                           if (s != e).any()]))
    if not unpacked:
        return values
    inserts, inds = unpacked
    return np.insert(values, list(inds), list(inserts), axis=0)
# Register MultiInterface with the base Interface class (presumably adding
# it to the Interface.interfaces lookup consulted during initialization).
Interface.register(MultiInterface)