Source code for pyndl.io

"""
pyndl.io
--------

*pyndl.io* provides functions to create event generators from different
sources in order to use them with *pyndl.ndl* to train NDL models or to save
existing events from a DataFrame or a list to a file.

"""

import gzip
from collections.abc import Iterator, Iterable
from pathlib import Path
import itertools
import warnings

import pandas as pd


[docs]def events_from_file(event_path, compression="gzip", start=0, step=1): """ Yields events for all events in a gzipped event file. Parameters ---------- event_path : str path to gzipped event file compression : str indicates whether the events should be read from gunzip file or not can be {"gzip" or None} start: int first event to read step: int slice every step-th event (useful for parallel computations) Yields ------ cues, outcomes : list, list a tuple of two lists containing cues and outcomes """ if compression == "gzip": event_file = gzip.open(event_path, 'rt') elif compression is None: event_file = open(event_path, 'rt') else: raise ValueError("compression needs to be 'gzip' or None") try: # skip header event_file.readline() for line in itertools.islice(event_file, start, None, step): entries = line.strip('\n').split('\t') if len(entries) == 2: cues, outcomes = entries frequency = 1 else: cues, outcomes, frequency = entries cues = cues.split('_') outcomes = outcomes.split('_') for i in range(int(frequency)): yield (cues, outcomes) finally: event_file.close()
[docs]def events_to_file(events, file_path, delimiter="\t", compression="gzip", columns=("cues", "outcomes"), compatible=False): """ Writes events to a file Parameters ---------- events : pandas.DataFrame or Iterator or Iterable a pandas DataFrame with one event per row and one colum with the cues and one column with the outcomes or a list of cues and outcomes as strings or a list of a list of cues and a list of outcomes which should be written to a file file_path: str path to where the file should be saved delimiter: str Seperator which should be used. Default ist a tab compression : str indicates whether the events should be read from gunzip file or not can be {"gzip" or None} columns: tuple a tuple of column names compatible: bool if true add a third frequency column (all ones) for compatibility with ndl2 """ if isinstance(events, pd.DataFrame): events = events_from_dataframe(events) elif isinstance(events, (Iterator, Iterable)): events = events_from_list(events) else: raise ValueError("events should either be a pd.DataFrame or an Iterator or an Iterable.") if compression == "gzip": out_file = gzip.open(file_path, 'wt') elif compression is None: out_file = open(file_path, 'wt') else: raise ValueError("compression needs to be 'gzip' or None") try: legacy_columns = ('Cues', 'Outcomes', 'Frequency') if compatible and columns != legacy_columns: warnings.warn(f"events_to_file sets the columns to the legacy names '{legacy_columns}' for ndl2 compatibility.\n" "Remove the warning by setting the columns parameter explicitly to this value.") columns = legacy_columns out_file.write("{}\n".format(delimiter.join(columns))) for cues, outcomes in events: if isinstance(cues, list) and isinstance(outcomes, list): cues = "_".join(cues) outcomes = "_".join(outcomes) elif not (isinstance(cues, str) and isinstance(outcomes, str)): raise ValueError("cues and outcomes should either be a list or a string.") if compatible: line = "{}{}{}{}1\n".format(cues, delimiter, outcomes, delimiter) else: line = "{}{}{}\n".format(cues, delimiter, outcomes) out_file.write(line) finally: out_file.close()
[docs]def events_from_dataframe(df, columns=("cues", "outcomes")): """ Yields events for all events in a pandas dataframe. Parameters ---------- df : pandas.DataFrame a pandas DataFrame with one event per row and one colum with the cues and one column with the outcomes. columns : tuple a tuple of column names Yields ------ cues, outcomes : list, list a tuple of two lists containing cues and outcomes """ for _, row in df.iterrows(): cues, outcomes = row[list(columns)] cues = cues.split('_') outcomes = outcomes.split('_') yield (cues, outcomes)
[docs]def events_from_list(lst): """ Yields events for all events in a list. Parameters ---------- lst : list of list of str or list of str a list either containing a list of cues as strings and a list of outcomes as strings or a list containing a cue and an outcome string, where cues respectively outcomes are seperated by an undescore Yields ------ cues, outcomes : list, list a tuple of two lists containing cues and outcomes """ for cues, outcomes in lst: if isinstance(cues, str): cues = cues.split('_') if isinstance(outcomes, str): outcomes = outcomes.split('_') yield (cues, outcomes)
[docs]def safe_write_path(path, template='{path.stem}-{counter}{path.suffix}'): """ Create a file path to avoid overwriting existing files. Returns the original path if it does not exist or an incremented version according to the template. This function with the default template creates filenames like pathname/example.png, pathname/example-1.png, pathname/example-2.png, ... Parameters ---------- path: file path template: format string syntax of incremented file name. available variables are counter (int) and path (pathlib.Path). Returns ------- path: the input path or (if file exists) the path with incremented filename. """ if template.format(path=path, counter=1) == template.format(path=path, counter=2): raise ValueError(f"Expects template to change by '{{counter}}', got {template}") new_path = path = Path(path) base_dir = path.parent.resolve() counter = 0 while new_path.exists(): counter = counter + 1 new_path = Path(template.format(path=path, counter=counter)) # is new_path already relative to path's directory? if not str(new_path.resolve()).startswith(str(base_dir)): new_path = path.parent / new_path return new_path