Source code for pyndl.activation

"""
pyndl.activation
----------------

*pyndl.activation* provides the functionality to estimate the activations of
a trained ndl model for given events. The trained ndl model is represented
by its outcome-cue weights.
"""
import multiprocessing as mp
import ctypes
from collections import defaultdict, OrderedDict
import warnings

import numpy as np
import xarray as xr

from . import io


# pylint: disable=W0621
def activation(events, weights, *, n_jobs=1, number_of_threads=None,
               remove_duplicates=None, ignore_missing_cues=False):
    """
    Estimate activations for given events in event file and outcome-cue weights.

    Memory overhead for multiprocessing is one copy of weights plus a copy of
    cues for each thread.

    Parameters
    ----------
    events : generator or str
        generates cues, outcomes pairs or the path to the event file
    weights : xarray.DataArray or dict[dict[float]]
        the xarray.DataArray needs to have the dimensions 'outcomes' and
        'cues'; the dictionaries hold weight[outcome][cue].
    n_jobs : int
        an integer giving the number of processes in which the job should be
        executed
    remove_duplicates : {None, True, False}
        if None, raise a ValueError when the same cue is present multiple
        times in the same event; if True, make cues unique per event; if
        False, keep multiple instances of the same cue (this is usually not
        preferred!)
    ignore_missing_cues : {True, False}
        if True, the function ignores cues which are in the test dataset but
        not in the weight matrix; if False, it raises a KeyError for cues
        which are not in the weight matrix

    Returns
    -------
    activations : xarray.DataArray
        with dimensions 'outcomes' and 'events'; contains coords for the
        outcomes; returned if weights is an instance of xarray.DataArray
    or
    activations : dict of numpy.arrays
        the dict has outcomes as keys and numpy arrays as values; each array
        holds one activation value per event; returned if weights is an
        instance of dict
    """
    if number_of_threads is not None:
        warnings.warn("Parameter `number_of_threads` is renamed to `n_jobs`. The old name "
                      "will stop working with v0.9.0.",
                      DeprecationWarning, stacklevel=2)
        n_jobs = number_of_threads
    if isinstance(events, str):
        events = io.events_from_file(events)
    # Only the cues of each event are needed; the outcomes are discarded.
    events = (cues for cues, outcomes in events)

    if remove_duplicates is None:
        def check_no_duplicates(cues):
            if len(cues) != len(set(cues)):
                raise ValueError('cues need to be unique: "{}"; use '
                                 'remove_duplicates=True'.format(' '.join(cues)))
            else:
                return set(cues)
        events = (check_no_duplicates(cues) for cues in events)
    elif remove_duplicates is True:
        events = (set(cues) for cues in events)

    if isinstance(weights, xr.DataArray):
        cues = weights.coords["cues"].values.tolist()
        outcomes = weights.coords["outcomes"].values.tolist()
        if not weights.values.shape == (len(outcomes), len(cues)):
            raise ValueError('dimensions of weights are wrong. Probably you need to transpose the matrix')
        cue_map = OrderedDict(((cue, ii) for ii, cue in enumerate(cues)))
        # Translate each event into a tuple of cue indices into the weight matrix.
        if ignore_missing_cues:
            event_cue_indices_list = (tuple(cue_map[cue] for cue in event_cues if cue in cue_map)
                                      for event_cues in events)
        else:
            event_cue_indices_list = (tuple(cue_map[cue] for cue in event_cues)
                                      for event_cues in events)
        # pylint: disable=W0621
        activations = _activation_matrix(list(event_cue_indices_list),
                                         weights.values, n_jobs)
        return xr.DataArray(activations,
                            coords={'outcomes': outcomes},
                            dims=('outcomes', 'events'))
    elif isinstance(weights, dict):
        assert n_jobs == 1, "Estimating activations with multiprocessing is not implemented for dicts."
        # Materialize the generator first, so that len(events) below is well defined.
        events = list(events)
        activations = defaultdict(lambda: np.zeros(len(events)))
        for outcome, cue_dict in weights.items():
            _activations = activations[outcome]
            for row, cues in enumerate(events):
                for cue in cues:
                    _activations[row] += cue_dict[cue]
        return activations
    else:
        raise ValueError("Weights other than xarray.DataArray or dicts are not supported.")
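For illustration, here is a minimal usage sketch of ``activation`` with both supported weight formats. The events, cue and outcome names, and all weight values are made-up toy data, not the output of a trained ndl model:

    import numpy as np
    import xarray as xr

    from pyndl.activation import activation

    # Two toy events, given as (cues, outcomes) pairs; the outcomes are
    # ignored when computing activations.
    events = [(('h', 'a'), ('plural',)),
              (('a', 'nd'), ('singular',))]

    # Dict weights: weight[outcome][cue] with arbitrary values.
    dict_weights = {'plural': {'h': 0.5, 'a': 0.2, 'nd': 0.1},
                    'singular': {'h': 0.1, 'a': 0.3, 'nd': 0.4}}
    acts = activation(events, dict_weights, remove_duplicates=True)
    # acts['plural'] == array([0.7, 0.3]) -- one value per event.

    # The same weights as an xarray.DataArray with dims (outcomes, cues).
    da_weights = xr.DataArray([[0.5, 0.2, 0.1], [0.1, 0.3, 0.4]],
                              coords={'outcomes': ['plural', 'singular'],
                                      'cues': ['h', 'a', 'nd']},
                              dims=('outcomes', 'cues'))
    acts = activation(events, da_weights, remove_duplicates=True)
    # acts is now a DataArray with dims ('outcomes', 'events').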
def _init_mp_activation_matrix(weights_, weights_shape_, activations_, activations_shape_):
    """
    Private helper function for multiprocessing in _activation_matrix.

    Initializes the shared variables weights and activations in each worker.

    """
    # pylint: disable=C0103, W0621, W0601
    global weights, activations
    # Wrap the shared ctypes buffers as numpy arrays without copying.
    weights = np.ctypeslib.as_array(weights_)
    weights.shape = weights_shape_
    activations = np.ctypeslib.as_array(activations_)
    activations.shape = activations_shape_


def _run_mp_activation_matrix(event_index, cue_indices):
    """
    Private helper function for multiprocessing in _activation_matrix.

    Calculates the activations of all outcomes for one event.

    """
    activations[:, event_index] = weights[:, cue_indices].sum(axis=1)


def _activation_matrix(indices_list, weights, n_jobs):
    """
    Estimate activations for the given cue indices in weights.

    Memory overhead for multiprocessing is one copy of weights
    plus a copy of cues for each thread.

    Parameters
    ----------
    indices_list : list[tuple[int]]
        events as tuples of cue indices into weights
    weights : numpy.array
        weight matrix with shape (outcomes, cues)
    n_jobs : int
        number of processes to use

    Returns
    -------
    activation_matrix : numpy.array
        estimated activations as matrix with shape (outcomes, events)
    """
    assert n_jobs >= 1, "Can't run with less than 1 process"

    activations_dim = (weights.shape[0], len(indices_list))
    if n_jobs == 1:
        activations = np.empty(activations_dim, dtype=np.float64)
        for row, event_cues in enumerate(indices_list):
            activations[:, row] = weights[:, event_cues].sum(axis=1)
        return activations
    else:
        # Allocate the result matrix in shared memory, so that all worker
        # processes can write their columns into it.
        shared_activations = mp.RawArray(ctypes.c_double, int(np.prod(activations_dim)))
        weights = np.ascontiguousarray(weights)
        # Copy the weight matrix into shared memory as well (one extra copy
        # of weights in total).
        shared_weights = mp.sharedctypes.copy(np.ctypeslib.as_ctypes(np.float64(weights)))
        initargs = (shared_weights, weights.shape, shared_activations, activations_dim)
        with mp.Pool(n_jobs, initializer=_init_mp_activation_matrix, initargs=initargs) as pool:
            pool.starmap(_run_mp_activation_matrix, enumerate(indices_list))
        activations = np.ctypeslib.as_array(shared_activations)
        activations.shape = activations_dim
        return activations
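As a sanity check of the helper above, a small sketch (toy numbers, calling the private helper directly only to illustrate its contract) that compares the serial path against a direct numpy computation:

    import numpy as np

    from pyndl.activation import _activation_matrix

    rng = np.random.default_rng(42)
    weights = rng.random((3, 5))          # (outcomes, cues), arbitrary values
    indices_list = [(0, 2), (1, 3, 4)]    # two events as tuples of cue indices

    acts = _activation_matrix(indices_list, weights, n_jobs=1)
    # Each column sums the selected weight columns of one event.
    expected = np.stack([weights[:, list(idx)].sum(axis=1) for idx in indices_list],
                        axis=1)
    assert np.allclose(acts, expected)

With ``n_jobs > 1`` the same matrix is filled column by column by worker processes through the shared arrays; on platforms that spawn new processes such a call should sit under an ``if __name__ == '__main__':`` guard.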