"""
pyndl.ndl
---------
*pyndl.ndl* provides functions to train NDL models.
"""
from collections import defaultdict, OrderedDict
import copy
import getpass
import os
from queue import Queue
import socket
import sys
import tempfile
import threading
import time
import warnings
import types
import cython
import pandas as pd
import numpy as np
import xarray as xr
from . import __version__ as pyndl_version
from . import count
from . import preprocess
from . import ndl_parallel
from . import io
# Conditional import: the OpenMP extension ``ndl_openmp`` is only compiled
# for linux. On other platforms the name is never bound, so the 'openmp'
# learning method cannot be used there and 'threading' has to be used.
if sys.platform.startswith('linux'):
    from . import ndl_openmp
elif sys.platform.startswith('win32'):
    pass  # no OpenMP extension available
elif sys.platform.startswith('darwin'):
    pass  # no OpenMP extension available
# Always show DeprecationWarnings (e.g. the ones emitted for renamed
# parameters of ``ndl``).
# NOTE(review): this modifies the process-wide warnings filter for every
# importer of this module, not only for pyndl's own warnings.
warnings.simplefilter('always', DeprecationWarning)
class WeightDict(defaultdict):
    # pylint: disable=missing-docstring
    """
    Subclass of defaultdict to represent outcome-cue weights.

    Weights are accessed as ``weights[outcome][cue]``; every outcome maps to
    a ``defaultdict(float)`` of cue weights.

    Notes
    -----
    Weight for each outcome-cue combination is 0 per default.

    Positional arguments and keyword arguments other than ``attrs`` are
    accepted for signature compatibility but ignored; a new WeightDict always
    starts empty.

    Shallow copies (``copy.copy`` / ``.copy()``), deep copies and pickling
    preserve both the weights and the ``attrs`` metadata.
    """
    # pylint: disable=W0613
    def __init__(self, *args, **kwargs):
        super().__init__(lambda: defaultdict(float))
        self._attrs = OrderedDict()
        if 'attrs' in kwargs:
            self.attrs = kwargs['attrs']
        else:
            self.attrs = {}

    @property
    def attrs(self):
        """Metadata of the weights as an OrderedDict."""
        return self._attrs

    @attrs.setter
    def attrs(self, attrs):
        self._attrs = OrderedDict(attrs)

    def __copy__(self):
        """Return a shallow copy that keeps the weights and the attrs."""
        # defaultdict.__copy__ calls ``type(self)(default_factory, self)``;
        # since __init__ ignores positional arguments, that would return an
        # EMPTY WeightDict. Copy the entries explicitly instead.
        new = type(self)(attrs=self._attrs)
        new.update(self)
        return new

    copy = __copy__

    def __reduce__(self):
        # defaultdict.__reduce__ passes the (unpicklable) lambda factory as a
        # constructor argument and drops instance attributes such as
        # ``_attrs``. Rebuild via a plain constructor call instead and
        # restore the instance state and the items afterwards; this is used
        # by both ``pickle`` and ``copy.deepcopy``.
        return (type(self), (), dict(self.__dict__), None, iter(self.items()))
def ndl(events, alpha, betas, lambda_=1.0, *,
        method='openmp', weights=None,
        number_of_threads=None, n_jobs=8, len_sublists=None, n_outcomes_per_job=10,
        remove_duplicates=None,
        verbose=False, temporary_directory=None,
        events_per_temporary_file=10000000):
    """
    Calculate the weights for all_outcomes over all events in event_file
    given by the files path.

    This is a parallel python implementation using numpy, multithreading and
    the binary format defined in preprocess.py.

    Parameters
    ----------
    events : generator or str or os.PathLike
        generates cues, outcomes pairs or the path to the event file. A
        generator is first written to a temporary event file, which is
        removed again before the function returns.
    alpha : float
        saliency of all cues
    betas : (float, float)
        one value for successful prediction (reward) one for punishment
    lambda\\_ : float
    method : {'openmp', 'threading'}
        'openmp' (linux only) uses the compiled OpenMP learner; 'threading'
        distributes batches of outcomes over Python threads
    weights : None or xarray.DataArray
        initial weights; the xarray.DataArray needs to have the named
        dimensions 'cues' and 'outcomes' (in either order). Cues and outcomes
        that only occur in the events are appended with weight 0.
    number_of_threads : int or None
        deprecated alias of ``n_jobs``
    n_jobs : int
        a integer giving the number of threads in which the job should
        executed
    len_sublists : int or None
        deprecated alias of ``n_outcomes_per_job``
    n_outcomes_per_job : int
        a integer giving the length of sublists generated from all outcomes
    remove_duplicates : {None, True, False}
        if None throw a ValueError when the same cue is present multiple times
        in the same event; True make cues and outcomes unique per event; False
        keep multiple instances of the same cue or outcome (this is usually not
        preferred!)
    verbose : bool
        print some output if True.
    temporary_directory : str
        path to directory to use for storing temporary files created
        (binary event files and, for generator input, the event file);
        if none is provided, the operating system's default will
        be used (/tmp on unix)
    events_per_temporary_file: int
        Number of events in each temporary binary file. Has to be larger than 1

    Returns
    -------
    weights : xarray.DataArray
        with dimensions 'outcomes' and 'cues'. You can lookup the weights
        between a cue and an outcome with ``weights.loc[{'outcomes': outcome,
        'cues': cue}]`` or ``weights.loc[outcome].loc[cue]``.

    Raises
    ------
    ValueError
        if ``remove_duplicates``, ``events``, ``method`` or ``weights`` have an
        invalid value, or if the number of cues or outcomes exceeds 4294967295
    NotImplementedError
        if ``method='openmp'`` is requested on a non-linux platform

    Exceptions raised inside the worker threads of ``method='threading'`` are
    re-raised in the calling thread.
    """
    if number_of_threads is not None:
        warnings.warn("Parameter `number_of_threads` is renamed to `n_jobs`. The old name "
                      "will stop working with v0.9.0.",
                      DeprecationWarning, stacklevel=2)
        n_jobs = number_of_threads
    if len_sublists is not None:
        warnings.warn("Parameter `len_sublists` is renamed to `n_outcomes_per_job`. The old name "
                      "will stop working with v0.9.0.",
                      DeprecationWarning, stacklevel=2)
        n_outcomes_per_job = len_sublists

    # Validate every argument before any expensive work is done and before a
    # generator passed as ``events`` is consumed.
    if not (remove_duplicates is None or isinstance(remove_duplicates, bool)):
        raise ValueError("remove_duplicates must be None, True or False")
    if not isinstance(events, (types.GeneratorType, str, os.PathLike)):
        raise ValueError("'events' need to be the path to a gzipped event file not {}".format(type(events)))
    if method not in ('openmp', 'threading'):
        raise ValueError('method needs to be either "threading" or "openmp"')
    if method == 'openmp' and not sys.platform.startswith('linux'):
        raise NotImplementedError("OpenMP is linux only at the moment. "
                                  "Use method='threading' instead.")
    if not (weights is None or isinstance(weights, xr.DataArray)):
        raise ValueError('weights need to be None or xarray.DataArray with method=%s' % method)
    if isinstance(events, os.PathLike):
        events = os.fspath(events)

    weights_ini = weights
    beta1, beta2 = betas

    # Everything temporary lives in one directory that is removed on exit,
    # also when an exception is raised.
    with tempfile.TemporaryDirectory(prefix="pyndl", dir=temporary_directory) as work_dir:
        if isinstance(events, types.GeneratorType):
            event_file = os.path.join(work_dir, 'events')
            io.events_to_file(events, event_file)
            events = event_file

        wall_time_start = time.perf_counter()
        cpu_time_start = time.process_time()

        # preprocessing
        n_events, cues, outcomes = count.cues_outcomes(events,
                                                       n_jobs=n_jobs,
                                                       verbose=verbose)
        cues = list(cues.keys())
        outcomes = list(outcomes.keys())

        # initialize weights
        if weights is None:
            weights = np.zeros((len(outcomes), len(cues)), dtype=np.float64, order='C')
        else:
            weights, cues, outcomes = _extend_weights(weights, cues, outcomes)

        if any(length > 4294967295 for length in weights.shape):
            raise ValueError("Neither number of cues nor outcomes shall exceed 4294967295 "
                             "for now. See https://github.com/quantling/pyndl/issues/169")
        if not weights.flags.c_contiguous:
            raise ValueError('weights has to be c_contiguous')

        cue_map = OrderedDict((cue, ii) for ii, cue in enumerate(cues))
        outcome_map = OrderedDict((outcome, ii) for ii, outcome in enumerate(outcomes))
        all_outcome_indices = [outcome_map[outcome] for outcome in outcomes]

        binary_path = os.path.join(work_dir, 'binary')
        os.mkdir(binary_path)
        number_events = preprocess.create_binary_event_files(events, binary_path, cue_map,
                                                             outcome_map, overwrite=True,
                                                             n_jobs=n_jobs,
                                                             events_per_file=events_per_temporary_file,
                                                             remove_duplicates=remove_duplicates,
                                                             verbose=verbose)
        assert n_events == number_events, (str(n_events) + ' ' + str(number_events))
        binary_files = [os.path.join(binary_path, binary_file)
                        for binary_file in os.listdir(binary_path)
                        if os.path.isfile(os.path.join(binary_path, binary_file))]
        # sort binary files as they were created; assumes the creation index
        # sits at [9:-4] of the file name (naming defined in preprocess)
        binary_files.sort(key=lambda filename: int(os.path.basename(filename)[9:-4]))

        if verbose:
            print('start learning...')

        # learning (updates ``weights`` in place)
        if method == 'openmp':
            ndl_openmp.learn_inplace_binary_to_binary(binary_files,
                                                      alpha,
                                                      beta1,
                                                      beta2,
                                                      lambda_,
                                                      weights,
                                                      np.array(all_outcome_indices,
                                                               dtype=np.uint32),
                                                      n_outcomes_per_job,
                                                      n_jobs)
        else:
            _learn_threading(binary_files, alpha, beta1, beta2, lambda_, weights,
                             all_outcome_indices, n_outcomes_per_job, n_jobs)

    cpu_time = time.process_time() - cpu_time_start
    wall_time = time.perf_counter() - wall_time_start

    if weights_ini is not None:
        attrs_to_be_updated = weights_ini.attrs
    else:
        attrs_to_be_updated = None
    attrs = _attributes(events, number_events, alpha, betas, lambda_, cpu_time, wall_time,
                        __name__ + "." + ndl.__name__, method=method, attrs=attrs_to_be_updated)

    # post-processing
    weights = xr.DataArray(weights, [('outcomes', outcomes), ('cues', cues)],
                           attrs=attrs)
    return weights


def _extend_weights(weights, cues, outcomes):
    """
    Embed initial weights into a zero matrix that covers all cues/outcomes.

    The previously known cues and outcomes keep their order and come first;
    cues and outcomes that are new in ``cues`` / ``outcomes`` are appended in
    the order given there, so the result is deterministic.

    Returns
    -------
    (numpy.ndarray, list, list)
        C-contiguous float64 matrix of shape (n_outcomes, n_cues), the list of
        all cues and the list of all outcomes.
    """
    old_cues = weights.coords["cues"].values.tolist()
    old_outcomes = weights.coords["outcomes"].values.tolist()
    known_cues = set(old_cues)
    known_outcomes = set(old_outcomes)
    all_cues = old_cues + [cue for cue in cues if cue not in known_cues]
    all_outcomes = old_outcomes + [outcome for outcome in outcomes
                                   if outcome not in known_outcomes]
    extended = np.zeros((len(all_outcomes), len(all_cues)), dtype=np.float64, order='C')
    # transpose so that an array stored as ('cues', 'outcomes') works as well
    extended[:len(old_outcomes), :len(old_cues)] = weights.transpose('outcomes', 'cues').values
    return extended, all_cues, all_outcomes


def _learn_threading(binary_files, alpha, beta1, beta2, lambda_, weights,
                     outcome_indices, n_outcomes_per_job, n_jobs):
    """
    Learn ``weights`` in place with ``n_jobs`` Python threads.

    The outcome indices are split into batches of ``n_outcomes_per_job``;
    each thread repeatedly takes a batch from a shared queue. The first
    exception raised by any worker is re-raised once all threads finished.
    """
    part_lists = slice_list(outcome_indices, n_outcomes_per_job)
    working_queue = Queue(len(part_lists))
    queue_lock = threading.Lock()
    errors = []

    def worker():
        try:
            while True:
                # check-and-get under one lock so that get() never blocks
                with queue_lock:
                    if working_queue.empty():
                        break
                    data = working_queue.get()
                ndl_parallel.learn_inplace_binary_to_binary(binary_files,
                                                            alpha,
                                                            beta1,
                                                            beta2,
                                                            lambda_,
                                                            weights,
                                                            data)
        except Exception as error:  # pylint: disable=broad-except
            # an exception in a thread would otherwise only be printed and
            # partially learned weights returned silently
            errors.append(error)

    for part_list in part_lists:
        working_queue.put(np.array(part_list, dtype=np.uint32))
    threads = [threading.Thread(target=worker) for _ in range(n_jobs)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]
def _attributes(event_path, number_events, alpha, betas, lambda_, cpu_time,
                wall_time, function, method=None, attrs=None):
    """
    Collect metadata describing a learning run.

    Every value is a string, left aligned and padded to a common width (at
    least 19 characters). A non-scalar ``alpha`` (e.g. a dict of per-cue
    alphas) is recorded as ``'varying'``.

    If ``attrs`` (metadata of a previous run) is given, every value becomes
    ``'<old> | <new>'``; a key missing on one side contributes an empty
    string. The keys of the new run come first, followed by keys that only
    exist in ``attrs``.
    """
    # event_path may be an os.PathLike, which has no len()
    event_path = str(event_path)
    if isinstance(alpha, (int, float, np.integer, np.floating)):
        alpha_str = str(alpha)
    else:
        alpha_str = 'varying'
    hostname = socket.gethostname()
    username = getpass.getuser()
    # use alpha_str, not str(alpha): the repr of a per-cue alpha dict would
    # otherwise blow up the padding of every single attribute
    width = max(19, *(len(text) for text in (event_path,
                                             str(number_events),
                                             alpha_str,
                                             str(betas),
                                             str(lambda_),
                                             function,
                                             str(method),
                                             hostname,
                                             username)))

    def _format(value):
        return '{0: <{width}}'.format(value, width=width)

    new_attrs = {'date': _format(time.strftime("%Y-%m-%d %H:%M:%S")),
                 'event_path': _format(event_path),
                 'number_events': _format(number_events),
                 'alpha': _format(alpha_str),
                 'betas': _format(str(betas)),
                 'lambda': _format(str(lambda_)),
                 'function': _format(function),
                 'method': _format(str(method)),
                 'cpu_time': _format(str(cpu_time)),
                 'wall_time': _format(str(wall_time)),
                 'hostname': _format(hostname),
                 'username': _format(username),
                 'pyndl': _format(pyndl_version),
                 'numpy': _format(np.__version__),
                 'pandas': _format(pd.__version__),
                 'xarray': _format(xr.__version__),
                 'cython': _format(cython.__version__)}

    if attrs is not None:
        # deterministic key order (instead of iterating over a set)
        merged_keys = list(new_attrs) + [key for key in attrs if key not in new_attrs]
        # str() so that non-string metadata (e.g. numbers) can be merged too
        new_attrs = {key: str(attrs.get(key, '')) + ' | ' + str(new_attrs.get(key, ''))
                     for key in merged_keys}
    return new_attrs
def dict_ndl(events, alphas, betas, lambda_=1.0, *,
             weights=None, inplace=False, remove_duplicates=None,
             make_data_array=False, verbose=False):
    """
    Calculate the weights for all_outcomes over all events in event_file.

    This is a pure python implementation using dicts.

    Notes
    -----
    The metadata will only be stored when `make_data_array` is True and then
    `dict_ndl` cannot be used to continue learning. At the moment there is no
    proper way to automatically store the meta data into the default dict.
    When the result is a WeightDict the metadata is stored in its ``attrs``;
    a plain defaultdict passed as ``weights`` is returned without metadata.

    Parameters
    ----------
    events : generator or str or os.PathLike
        generates cues, outcomes pairs or the path to the event file
    alphas : dict or float
        a (default)dict having cues as keys and a value below 1 as value, or a
        single number used as alpha for every cue
    betas : (float, float)
        one value for successful prediction (reward) one for punishment
    lambda\\_ : float
    weights : dict of dicts or xarray.DataArray or None
        initial weights; a defaultdict (e.g. a WeightDict) or an
        xarray.DataArray with the named dimensions 'outcomes' and 'cues'
    inplace: {True, False}
        if True calculates the weightmatrix inplace
        if False creates a new weightmatrix to learn on
    remove_duplicates : {None, True, False}
        if None throw a ValueError when the same cue is present multiple times
        in the same event; True make cues and outcomes unique per event; False
        keep multiple instances of the same cue or outcome (this is usually not
        preferred!)
    make_data_array : {False, True}
        if True makes a xarray.DataArray out of the dict of dicts.
    verbose : bool
        print a dot for every 1000 processed events if True.

    Returns
    -------
    weights : dict of dicts of floats
        the first dict has outcomes as keys and dicts as values
        the second dict has cues as keys and weights as values
        weights[outcome][cue] gives the weight between outcome and cue.

    or

    weights : xarray.DataArray
        with dimensions 'outcomes' and 'cues'. You can lookup the weights
        between a cue and an outcome with ``weights.loc[{'outcomes': outcome,
        'cues': cue}]`` or ``weights.loc[outcome].loc[cue]``.

    Raises
    ------
    ValueError
        if ``make_data_array`` or ``remove_duplicates`` have invalid values,
        if ``weights`` has an unsupported type, or if an event contains
        duplicated cues/outcomes while ``remove_duplicates`` is None
    """
    if not isinstance(make_data_array, bool):
        raise ValueError("make_data_array must be True or False")
    if not (remove_duplicates is None or isinstance(remove_duplicates, bool)):
        raise ValueError("remove_duplicates must be None, True or False")

    wall_time_start = time.perf_counter()
    cpu_time_start = time.process_time()

    if isinstance(events, os.PathLike):
        events = os.fspath(events)
    event_path = events if isinstance(events, str) else ""

    attrs_to_update = None
    created_here = False  # a freshly built container needs no defensive copy
    # weights can be seen as an infinite outcome by cue matrix
    # weights[outcome][cue]
    if weights is None:
        weights = WeightDict()
        created_here = True
    elif isinstance(weights, WeightDict):
        attrs_to_update = weights.attrs
    elif isinstance(weights, xr.DataArray):
        attrs_to_update = weights.attrs
        # transpose so that an array stored as ('cues', 'outcomes') works too
        weights_ini = weights.transpose('outcomes', 'cues')
        values = weights_ini.values
        cue_coords = weights_ini.coords['cues'].values
        weights = WeightDict()
        for outcome_index, outcome in enumerate(weights_ini.coords['outcomes'].values):
            outcome_weights = weights[outcome]
            for cue_index, cue in enumerate(cue_coords):
                outcome_weights[cue] = values.item(outcome_index, cue_index)
        created_here = True
    elif not isinstance(weights, defaultdict):
        raise ValueError('weights needs to be either defaultdict or None')

    if not inplace and not created_here:
        weights = copy.deepcopy(weights)

    beta1, beta2 = betas
    # a dict used as an insertion-ordered set keeps the outcome order (and so
    # the order of the result) reproducible between runs
    all_outcomes = dict.fromkeys(weights)

    if isinstance(events, str):
        events = io.events_from_file(events)

    if isinstance(alphas, (int, float, np.integer, np.floating)):
        alpha = alphas
        cue_alphas = defaultdict(lambda: alpha)
    else:
        cue_alphas = alphas

    number_events = 0
    for cues, outcomes in events:
        number_events += 1
        if verbose and number_events % 1000 == 0:
            print('.', end='')
            sys.stdout.flush()
        if remove_duplicates is None:
            if (len(cues) != len(set(cues)) or
                    len(outcomes) != len(set(outcomes))):
                raise ValueError('cues or outcomes needs to be unique: cues '
                                 '"%s"; outcomes "%s"; use '
                                 'remove_duplicates=True' %
                                 (' '.join(cues), ' '.join(outcomes)))
        elif remove_duplicates:
            # drop repetitions but keep the first-seen order
            cues = list(dict.fromkeys(cues))
            outcomes = list(dict.fromkeys(outcomes))

        present_outcomes = set(outcomes)
        all_outcomes.update(dict.fromkeys(outcomes))
        for outcome in all_outcomes:
            outcome_weights = weights[outcome]
            association_strength = sum(outcome_weights[cue] for cue in cues)
            if outcome in present_outcomes:
                update = beta1 * (lambda_ - association_strength)
            else:
                update = beta2 * (0 - association_strength)
            for cue in cues:
                outcome_weights[cue] += cue_alphas[cue] * update

    cpu_time = time.process_time() - cpu_time_start
    wall_time = time.perf_counter() - wall_time_start
    # pass the alphas as given, so that a scalar alpha is recorded as such
    attrs = _attributes(event_path, number_events, alphas, betas, lambda_, cpu_time, wall_time,
                        __name__ + "." + dict_ndl.__name__, attrs=attrs_to_update)

    if make_data_array:
        return data_array(weights, attrs=attrs)
    if isinstance(weights, WeightDict):
        weights.attrs = attrs
    # a plain defaultdict has no place for the metadata; return it as is
    return weights
def data_array(weights, *, attrs=None):
    """
    Convert a dict of dicts of weights into an xarray.DataArray.

    Parameters
    ----------
    weights : dict of dicts of floats or WeightDict or xarray.DataArray
        the first dict has outcomes as keys and dicts as values
        the second dict has cues as keys and weights as values
        weights[outcome][cue] gives the weight between outcome and cue.
        If a dict of dicts is given, attrs is required. If a WeightDict is
        given, attrs is optional. An xarray.DataArray with the dimensions
        'outcomes' and 'cues' is returned with the dimensions in this order.
    attrs : dict
        A dictionary of attributes

    Returns
    -------
    weights : xarray.DataArray
        with dimensions 'outcomes' and 'cues'. You can lookup the weights
        between a cue and an outcome with ``weights.loc[{'outcomes': outcome,
        'cues': cue}]`` or ``weights.loc[outcome].loc[cue]``.
        Outcomes keep the order of ``weights``; cues are ordered by first
        appearance. Outcome-cue pairs missing in ``weights`` get weight 0.

    Raises
    ------
    AttributeError
        if ``weights`` has no ``attrs`` and no ``attrs`` argument is given
    """
    if isinstance(weights, xr.DataArray):
        if weights.dims == ('outcomes', 'cues'):
            return weights
        if set(weights.dims) == {'outcomes', 'cues'}:
            return weights.transpose('outcomes', 'cues')
    if attrs is None:
        try:
            attrs = weights.attrs
        except AttributeError:
            raise AttributeError("weights does not have attributes and no attrs "
                                 "argument is given.") from None

    outcomes = list(weights.keys())
    # column index of every cue, in first-seen order (reproducible layout)
    cue_index = {}
    for outcome in outcomes:
        for cue in weights[outcome]:
            cue_index.setdefault(cue, len(cue_index))
    cues = list(cue_index)

    # Fill a plain numpy array from the stored entries only: this never
    # indexes a missing cue, so plain dicts work and defaultdicts are not
    # mutated; absent pairs stay 0.
    values = np.zeros((len(outcomes), len(cues)))
    for row, outcome in enumerate(outcomes):
        for cue, weight in weights[outcome].items():
            values[row, cue_index[cue]] = weight

    return xr.DataArray(values, attrs=attrs,
                        coords={'outcomes': outcomes, 'cues': cues},
                        dims=('outcomes', 'cues'))
def slice_list(list_, len_sublists):
    r"""
    Slice a list into consecutive sublists of length ``len_sublists``.

    Parameters
    ----------
    list\_ : list
        list which should be sliced in sublists; its elements have to be
        unique (and therefore hashable)
    len_sublists : int
        integer which determines the length of the sublists; has to be at
        least one

    Returns
    -------
    seq_list : list of lists
        a list of sublists with the length len_sublists; the last sublist is
        shorter if ``len(list_)`` is not a multiple of ``len_sublists``

    Raises
    ------
    ValueError
        if ``len_sublists`` is smaller than one
    """
    if len_sublists < 1:
        raise ValueError("'len_sublists' must be at least one")
    assert len(list_) == len(set(list_)), "'list_' must not contain duplicates"
    return [list_[start:start + len_sublists]
            for start in range(0, len(list_), len_sublists)]