"""
pyndl.preprocess
----------------
*pyndl.preprocess* provides functions in order to preprocess data and create
event files from it.
"""
import collections
import gzip
import multiprocessing
import os
import random
import re
import sys
import time
import warnings
from pyndl import io
[docs]def bandsample(population, sample_size=50000, *, cutoff=5, seed=None,
verbose=False):
"""
Creates a sample of size sample_size out of the population using
band sampling.
"""
# make a copy of the population
# filter all words with freq < cutoff
population = [(word, freq) for word, freq in population.items() if freq >=
cutoff]
if seed is not None:
raise NotImplementedError("Reproducable bandsamples by seeding are not properly implemented yet.")
# shuffle words with same frequency
rand = random.Random(seed)
rand.shuffle(population)
population.sort(key=lambda x: x[1]) # lowest -> highest freq
step = sum(freq for word, freq in population) / sample_size
if verbose:
print("step %.2f" % step)
accumulator = 0
index = 0
sample = list()
while 0 <= index < len(population):
word, freq = population[index]
accumulator += freq
if verbose:
print("%s\t%i\t%.2f" % (word, freq, accumulator))
if accumulator >= step:
sample.append((word, freq))
accumulator -= step
if verbose:
print("add\t%s\t%.2f" % (word, accumulator))
del population[index]
while accumulator >= step and index >= 1:
index -= 1
sample.append(population[index])
accumulator -= step
if verbose:
word, freq = population[index]
print(" add\t%s\t%.2f" % (word, accumulator))
del population[index]
else:
# only add to index if no element was removed
# if element was removed, index points at next element already
index += 1
if verbose and index % 1000 == 0:
print(".", end="")
sys.stdout.flush()
sample = collections.Counter({key: value for key, value in sample})
return sample
[docs]def ngrams_to_word(occurrences, n_chars, outfile, remove_duplicates=True):
"""
Process the occurrences and write them to outfile.
Parameters
----------
occurrences : sequence of (cues, outcomes) tuples
cues and outcomes are both strings where underscores and # are
special symbols.
n_chars : number of characters (e.g. 2 for bigrams, 3 for trigrams, ...)
outfile : file handle
remove_duplicates : bool
if True make cues and outcomes per event unique
"""
for cues, outcomes in occurrences:
if cues and outcomes:
occurrence = cues + '_' + outcomes
else: # take either
occurrence = cues + outcomes
phrase_string = "#" + re.sub("_", "#", occurrence) + "#"
ngrams = (phrase_string[i:(i + n_chars)] for i in
range(len(phrase_string) - n_chars + 1))
if not ngrams or not occurrence:
continue
if remove_duplicates:
ngrams = set(ngrams)
occurrence = "_".join(set(occurrence.split("_")))
outfile.write("{}\t{}\n".format("_".join(ngrams), occurrence))
[docs]def process_occurrences(occurrences, outfile, *,
cue_structure="trigrams_to_word", remove_duplicates=True):
"""
Process the occurrences and write them to outfile.
Parameters
----------
occurrences : sequence of (cues, outcomes) tuples
cues and outcomes are both strings where underscores and # are
special symbols.
outfile : file handle
cue_structure : {'bigrams_to_word', 'trigrams_to_word', 'word_to_word'}
remove_duplicates : bool
if True make cues and outcomes per event unique
"""
if cue_structure == "bigrams_to_word":
ngrams_to_word(occurrences, 2, outfile, remove_duplicates=remove_duplicates)
elif cue_structure == "trigrams_to_word":
ngrams_to_word(occurrences, 3, outfile, remove_duplicates=remove_duplicates)
elif cue_structure == "word_to_word":
for cues, outcomes in occurrences:
if not cues:
continue
if remove_duplicates:
cues = "_".join(set(cues.split("_")))
outcomes = "_".join(set(outcomes.split("_")))
outfile.write("{}\t{}\n".format(cues, outcomes))
else:
raise NotImplementedError('cue_structure=%s is not implemented yet.' % cue_structure)
[docs]def create_event_file(corpus_file,
event_file,
*,
allowed_symbols="*",
context_structure="document",
event_structure="consecutive_words",
event_options=(3,), # number_of_words,
cue_structure="trigrams_to_word",
lower_case=False,
remove_duplicates=True,
verbose=False):
"""
Create an text based event file from a corpus file.
.. warning::
'_', '#', and '\t' are removed from the input of the corpus file and
replaced by a ' ', which is treated as a word boundary.
Parameters
----------
corpus_file : str
path where the corpus file is
event_file : str
path where the output file will be created
allowed_symbols : str, function
all allowed symbols to include in the events as a set of characters.
The set of characters might be explicit or contains Regex character sets.
'_', '#', and TAB are special symbols in the event file and will be removed
automatically. If the corpus file contains these special symbols a warning
will be given.
These examples define the same allowed symbols::
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
'a-zA-Z'
'*'
or a function indicating which characters to include. The function should
return `True`, if the passed character is a allowed symbol.
For example::
lambda chr: chr in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
lambda chr: ('a' <= chr <= 'z') or ('A' <= chr <= 'Z')
context_structure : {"document", "paragraph", "line"}
event_structure : {"line", "consecutive_words", "word_to_word", "sentence"}
event_options : None or (number_of_words,) or (before, after) or None
in "consecutive words" the number of words of the sliding window as
an integer; in "word_to_word" the number of words before and after the
word of interest each as an integer.
cue_structure: {"trigrams_to_word", "word_to_word", "bigrams_to_word"}
lower_case : bool
should the cues and outcomes be lower cased
remove_duplicates : bool
create unique cues and outcomes per event
verbose : bool
Notes
-----
Breaks / Separators :
What marks parts, where we do not want to continue learning?
* ``---end.of.document---`` string?
* line breaks?
* empty lines?
What do we consider one event?
* three consecutive words?
* one line of the corpus?
* everything between two empty lines?
* everything within one document?
Should the events be connected to the events before and after?
No.
Context :
A context is a whole document or a paragraph within which we will take
(three) consecutive words as occurrences or events. The last words of a
context will not form an occurrence with the first words of the next
context.
Occurrence :
An occurrence or event is will result in one event in the end. This can
be (three) consecutive words, a sentence, or a line in the corpus file.
"""
# define functions to remove special chars / symbols
special_chars = re.compile("[#_\t]")
def _remove_special_chars_without_warning(line):
new_line = special_chars.sub(' ', line)
return new_line
def _remove_special_chars_with_warning(line):
nonlocal remove_special_chars
new_line = special_chars.sub(' ', line)
if line != new_line:
warnings.warn('"_", "#", and "\\t" are special symbols and were therefore removed')
remove_special_chars = _remove_special_chars_without_warning
return new_line
remove_special_chars = _remove_special_chars_with_warning
if callable(allowed_symbols):
def filter_symbols(line, replace):
line_copy = list(line)
for ii in range(len(line)):
if not allowed_symbols(line[ii]):
line_copy[ii] = replace
return ''.join(line_copy)
else:
not_in_symbols = re.compile(f"[^{allowed_symbols:s}]")
def filter_symbols(line, replace):
return not_in_symbols.sub(replace, line)
if event_structure not in ('consecutive_words', 'line', 'word_to_word'):
raise NotImplementedError('This event structure (%s) is not implemented yet.' % event_structure)
if context_structure not in ('document', 'line'):
raise NotImplementedError('This context structure (%s) is not implemented yet.' % context_structure)
if os.path.isfile(event_file):
raise OSError('%s file exits. Remove file and start again.' % event_file)
context_pattern = re.compile("(---end.of.document---|---END.OF.DOCUMENT---)")
if event_structure == 'consecutive_words':
number_of_words, = event_options
elif event_structure == 'word_to_word':
before, after = event_options
def gen_occurrences(words):
"""
Make an occurrence out of consecutive words.
Take all number_of_words number of consecutive words and make an
occurrence out of it.
For words = (A, B, C, D); number_of_words = 3 make: (A, ), (A_B, ),
(A_B_C, ), (B_C_D, ), (C_D, ), (D, )
"""
if event_structure == 'consecutive_words':
occurrences = list()
# can't have more consecutive words than total words
length = min(number_of_words, len(words))
# slide window over list of words
for ii in range(1 - length, len(words)):
# no consecutive words before first word
start = max(ii, 0)
# no consecutive words after last word
end = min(ii + length, len(words))
# append (cues, outcomes) with empty outcomes
occurrences.append(("_".join(words[start:end]), ""))
return occurrences
# for words = (A, B, C, D); before = 2, after = 1
# make: (B, A), (A_C, B), (A_B_D, C), (B_C, D)
elif event_structure == 'word_to_word':
occurrences = list()
for ii, word in enumerate(words):
# words before the word to a maximum of before
cues = words[max(0, ii - before):ii]
# words after the word to a maximum of before
cues.extend(words[(ii + 1):min(len(words), ii + 1 + after)])
# append (cues, outcomes)
occurrences.append(("_".join(cues), word))
return occurrences
elif event_structure == 'line':
# (cues, outcomes) with empty outcomes
return [('_'.join(words), ''), ]
else:
raise ValueError('gen_occurrences should be one of {"consecutive_words", "word_to_word", "line"}')
def process_line(line):
"""processes one line of text."""
if lower_case:
line = line.lower()
# remove special chars
line = remove_special_chars(line)
# replace all weird characters with space
line = filter_symbols(line, replace=' ')
return line
def gen_words(line):
"""generates words out of a line of text."""
return [word.strip() for word in line.split(" ") if word.strip()]
def process_words(words):
"""processes one word and makes an occurrence out of it."""
occurrences = gen_occurrences(words)
process_occurrences(occurrences, outfile,
cue_structure=cue_structure,
remove_duplicates=remove_duplicates)
def process_context(line):
"""called when a context boundary is found."""
if context_structure == 'document':
# remove document marker
line = context_pattern.sub("", line)
return line
with open(corpus_file, "rt") as corpus:
with gzip.open(event_file, "wt") as outfile:
outfile.write("cues\toutcomes\n")
words = []
for ii, line in enumerate(corpus):
if verbose and ii % 100000 == 0:
print(".", end="")
sys.stdout.flush()
outfile.flush()
line = line.strip()
if context_structure == 'line':
line = process_line(line)
words = gen_words(line)
process_words(words)
else:
if context_pattern.search(line) is not None:
# process the first context
context1, *contexts = context_pattern.split(line)
context1 = process_context(context1)
if context1.strip():
context1 = process_line(context1.strip())
words.extend(gen_words(context1))
process_words(words)
# process in between contexts
while len(contexts) > 1:
words = []
context1, *contexts = contexts
context1 = process_context(context1)
if context1.strip():
context1 = process_line(context1.strip())
words.extend(gen_words(context1))
process_words(words)
# add last part to next context
context1 = contexts[0]
context1 = process_context(context1)
if context1.strip():
context1 = process_line(context1.strip())
words.extend(gen_words(context1))
else:
line = process_line(line)
words.extend(gen_words(line))
# write the last context (the rest) when context_structure is not
# 'line'
if context_structure != 'line':
process_words(words)
[docs]class JobFilter():
# pylint: disable=E0202,missing-docstring
"""
Stores the persistent information over several jobs and exposes a job
method that only takes the varying parts as one argument.
.. note::
Using a closure is not possible as it is not pickable / serializable.
"""
[docs] @staticmethod
def return_empty_string():
return ''
def __init__(self, keep_cues, keep_outcomes, remove_cues, remove_outcomes,
cue_map, outcome_map):
if ((cue_map is not None and remove_cues is not None) or
(cue_map is not None and keep_cues != 'all') or
(remove_cues is not None and keep_cues != 'all')):
raise ValueError('You can either specify keep_cues, remove_cues, or cue_map.')
if ((outcome_map is not None and remove_outcomes is not None) or
(outcome_map is not None and keep_outcomes != 'all') or
(remove_outcomes is not None and keep_outcomes != 'all')):
raise ValueError('You can either specify keep_outcomes, remove_outcomes, or outcome_map.')
if cue_map is not None:
self.cue_map = collections.defaultdict(self.return_empty_string, cue_map)
self.process_cues = self.process_cues_map
elif remove_cues is not None:
self.remove_cues = set(remove_cues)
self.process_cues = self.process_cues_remove
elif keep_cues == 'all':
self.keep_cues = 'all'
self.process_cues = self.process_cues_all
else:
self.keep_cues = keep_cues
self.process_cues = self.process_cues_keep
if outcome_map is not None:
self.outcome_map = collections.defaultdict(self.return_empty_string, outcome_map)
self.process_outcomes = self.process_outcomes_map
elif remove_outcomes is not None:
self.remove_outcomes = set(remove_outcomes)
self.process_outcomes = self.process_outcomes_remove
elif keep_outcomes == 'all':
self.keep_outcomes = 'all'
self.process_outcomes = self.process_outcomes_all
else:
self.keep_outcomes = set(keep_outcomes)
self.process_outcomes = self.process_outcomes_keep
[docs] def process_cues(self, cues):
raise NotImplementedError("Needs to be implemented or assigned by a specific method.")
[docs] def process_cues_map(self, cues):
cues = [self.cue_map[cue] for cue in cues]
return [cue for cue in cues if cue]
[docs] def process_cues_remove(self, cues):
return [cue for cue in cues if cue not in self.remove_cues]
[docs] def process_cues_keep(self, cues):
return [cue for cue in cues if cue in self.keep_cues]
[docs] def process_cues_all(self, cues):
return cues
[docs] def process_outcomes(self, outcomes):
raise NotImplementedError("Needs to be implemented or assigned by a specific method.")
[docs] def process_outcomes_map(self, outcomes):
outcomes = [self.outcome_map[outcome] for outcome in outcomes]
return [outcome for outcome in outcomes if outcome]
[docs] def process_outcomes_remove(self, outcomes):
return [outcome for outcome in outcomes if outcome not in self.remove_outcomes]
[docs] def process_outcomes_keep(self, outcomes):
return [outcome for outcome in outcomes if outcome in self.keep_outcomes]
[docs] def process_outcomes_all(self, outcomes):
return outcomes
[docs] def job(self, line):
try:
cues, outcomes = line.strip('\n').split("\t")
except ValueError:
raise ValueError("tabular event file need to have two tab separated columns")
cues = cues.split("_")
outcomes = outcomes.split("_")
cues = self.process_cues(cues)
outcomes = self.process_outcomes(outcomes)
# no cues left?
# NOTE: We want to keep events with no outcomes as this is the
# background for the cues in that events.
if not cues:
return None
processed_line = ("%s\t%s\n" % ("_".join(cues), "_".join(outcomes)))
return processed_line
[docs]def filter_event_file(input_event_file, output_event_file, *,
keep_cues="all", keep_outcomes="all",
remove_cues=None, remove_outcomes=None,
cue_map=None, outcome_map=None,
n_jobs=1, number_of_processes=None, chunksize=100000,
verbose=False):
"""
Filter an event file by a list or a map of cues and outcomes.
Parameters
----------
You can either use keep_*, remove_*, or map_*.
input_event_file : str
path where the input event file is
output_event_file : str
path where the output file will be created
keep_cues : "all" or sequence of str
list of all cues that should be kept
keep_outcomes : "all" or sequence of str
list of all outcomes that should be kept
remove_cues : None or sequence of str
list of all cues that should be removed
remove_outcomes : None or sequence of str
list of all outcomes that should be removed
cue_map : dict
maps every cue as key to the value. Removes all cues that do not have a
key. This can be used to map several different cues to the same cue or
to rename cues.
outcome_map : dict
maps every outcome as key to the value. Removes all outcome that do not have a
key. This can be used to map several different outcomes to the same
outcome or to rename outcomes.
n_jobs : int
number of threads to use
chunksize : int
number of chunks per submitted job, should be around 100000
Notes
-----
It will keep all cues that are within the event and that (for a human
reader) might clearly belong to a removed outcome. This is on purpose and
is the expected behaviour as these cues are in the context of this outcome.
If an event has no cues it gets removed, but if an event has no outcomes it
is still present in order to capture the background rate of that cues.
"""
if number_of_processes is not None:
warnings.warn("Parameter `number_of_processes` is renamed to `n_jobs`. The old name "
"will stop working with v0.9.0.",
DeprecationWarning, stacklevel=2)
n_jobs = number_of_processes
job = JobFilter(keep_cues, keep_outcomes, remove_cues, remove_outcomes,
cue_map, outcome_map)
with multiprocessing.Pool(n_jobs) as pool:
with gzip.open(input_event_file, "rt") as infile:
with gzip.open(output_event_file, "wt") as outfile:
# copy header
outfile.write(infile.readline())
for ii, processed_line, in enumerate(pool.imap(job.job, infile,
chunksize=chunksize)):
if processed_line is not None:
outfile.write(processed_line)
if verbose and ii % 100000 == 0:
print('.', end='')
sys.stdout.flush()
################
# Preprocessing
################
MAGIC_NUMBER = 14159265
CURRENT_VERSION_WITH_FREQ = 215
CURRENT_VERSION = 2048 + 215
[docs]def read_binary_file(binary_file_path):
with open(binary_file_path, "rb") as binary_file:
magic_number = to_integer(binary_file.read(4))
if not magic_number == MAGIC_NUMBER:
raise ValueError('Header does not match the magic number')
version = to_integer(binary_file.read(4))
if version == CURRENT_VERSION:
pass
else:
raise ValueError('Version is incorrectly specified')
nr_of_events = to_integer(binary_file.read(4))
for _ in range(nr_of_events):
# Cues
number_of_cues = to_integer(binary_file.read(4))
cue_ids = [to_integer(binary_file.read(4)) for ii in range(number_of_cues)]
# outcomes
number_of_outcomes = to_integer(binary_file.read(4))
outcome_ids = [to_integer(binary_file.read(4)) for ii in range(number_of_outcomes)]
yield (cue_ids, outcome_ids)
[docs]def to_bytes(int_):
return int_.to_bytes(4, 'little')
[docs]def to_integer(byte_):
return int.from_bytes(byte_, "little")
[docs]def write_events(events, filename, *, start=0, stop=4294967295, remove_duplicates=None):
"""
Write out a list of events to a disk file in binary format.
Parameters
----------
events : iterator of (cue_ids, outcome_ids) tuples called event
filename : string
start : first event to write (zero based index)
stop : last event to write (zero based index; excluded)
remove_duplicates : {None, True, False}
if None though a ValueError when the same cue is present multiple times
in the same event; True make cues and outcomes unique per event; False
keep multiple instances of the same cue or outcome (this is usually not
preferred!)
Returns
-------
number_events : int
actual number of events written to file
Notes
-----
The **binary format** as the following structure::
8 byte header
nr of events
nr of cues in first event
ids for every cue
nr of outcomes in first event
ids for every outcome
nr of cues in second event
...
Raises
------
StopIteration : events generator is exhausted before stop is reached
"""
with open(filename, "wb") as out_file:
# 8 bytes header
out_file.write(to_bytes(MAGIC_NUMBER))
out_file.write(to_bytes(CURRENT_VERSION))
# events
# estimated number of events (will be rewritten if the actual number
# differs)
n_events_estimate = stop - start
out_file.write(to_bytes(n_events_estimate))
n_events = 0
for ii, event in enumerate(events):
if ii < start:
continue
if ii >= stop:
break
n_events += 1
cue_ids, outcome_ids = event
if remove_duplicates is None:
if len(cue_ids) != len(set(cue_ids)) or len(outcome_ids) != len(set(outcome_ids)):
raise ValueError(''.join([
'event %i does not have unique cues or outcomes.'
'Use remove_duplicates=True in order to force unique cues and outcomes.'
'Use remove_duplicates=False to allow the same cue or outcome multiple'
'times in the same event (not recommended)']) % ii)
elif remove_duplicates:
cue_ids = set(cue_ids)
outcome_ids = set(outcome_ids)
else:
pass
# cues in event
out_file.write(to_bytes(len(cue_ids)))
for cue_id in cue_ids:
out_file.write(to_bytes(cue_id))
# outcomes in event
out_file.write(to_bytes(len(outcome_ids)))
for outcome_id in outcome_ids:
out_file.write(to_bytes(outcome_id))
if n_events != n_events_estimate and not n_events == 0:
# the generator was exhausted earlier
out_file.seek(8)
out_file.write(to_bytes(n_events))
raise StopIteration(("event generator was exhausted before stop", n_events))
if n_events == 0:
os.remove(filename)
return n_events
[docs]def event_generator(event_file, cue_id_map, outcome_id_map, *, sort_within_event=False):
for cues, outcomes in io.events_from_file(event_file):
# uses list and not generators; as generators can only be traversed once
event = ([cue_id_map[cue] for cue in cues],
[outcome_id_map[outcome] for outcome in outcomes])
if sort_within_event:
cue_ids, outcome_ids = event
cue_ids = list(cue_ids)
cue_ids.sort()
outcome_ids = list(outcome_ids)
outcome_ids.sort()
event = (cue_ids, outcome_ids)
yield event
def _job_binary_event_file(*,
file_name,
event_file,
cue_id_map,
outcome_id_map,
sort_within_event,
start,
stop,
remove_duplicates):
# create generator which is not pickable
events = event_generator(event_file, cue_id_map, outcome_id_map, sort_within_event=sort_within_event)
n_events = write_events(events, file_name, start=start, stop=stop, remove_duplicates=remove_duplicates)
return n_events
[docs]def create_binary_event_files(event_file,
path_name,
cue_id_map,
outcome_id_map,
*,
sort_within_event=False,
n_jobs=2,
events_per_file=10000000,
overwrite=False,
remove_duplicates=None,
verbose=False):
"""
Creates the binary event files for a tabular cue outcome corpus.
Parameters
----------
event_file : str
path to tab separated text file that contains all events in a cue
outcome table.
path_name : str
folder name where to store the binary event files
cue_id_map : dict (str -> int)
cue to id map
outcome_id_map : dict (str -> int)
outcome to id map
sort_within_event : bool
should we sort the cues and outcomes within the event
n_jobs : int
number of threads to use
events_per_file : int
Number of events in each binary file. Has to be larger than 1
overwrite : bool
overwrite files if they exist
remove_duplicates : {None, True, False}
if None though a ValueError when the same cue is present multiple times
in the same event; True make cues and outcomes unique per event; False
keep multiple instances of the same cue or outcome (this is usually not
preferred!)
verbose : bool
Returns
-------
number_events : int
sum of number of events written to binary files
"""
# pylint: disable=missing-docstring
if events_per_file < 2:
raise ValueError("events_per_file has to be larger than 1")
if not os.path.isdir(path_name):
if verbose:
print("create event file folder '%s'" % path_name)
os.mkdir(path_name, 0o773)
elif not overwrite:
raise IOError("folder %s exists and overwrite is False" % path_name)
else:
if verbose:
print("removing event files in '%s'" % path_name)
for file_name in os.listdir(path_name):
if "events_0_" in file_name:
os.remove(os.path.join(path_name, file_name))
number_events = 0
with multiprocessing.Pool(n_jobs) as pool:
def _error_callback(error):
if isinstance(error, StopIteration):
_, result = error.value
nonlocal number_events
number_events += result # pylint: disable=undefined-variable
pool.close()
else:
raise error
def _callback(result):
nonlocal number_events
number_events += result
if verbose:
print("finished job")
sys.stdout.flush()
ii = 0
while True:
kwargs = {
"file_name": os.path.join(path_name, "events_0_%i.dat" % ii),
"event_file": event_file,
"cue_id_map": cue_id_map,
"outcome_id_map": outcome_id_map,
"sort_within_event": sort_within_event,
"start": ii*events_per_file,
"stop": (ii+1)*events_per_file,
"remove_duplicates": remove_duplicates,
}
try:
result = pool.apply_async(_job_binary_event_file,
kwds=kwargs,
callback=_callback,
error_callback=_error_callback)
if verbose:
print("submitted job %i" % ii)
except ValueError as error:
# someone has closed the pool with the correct error callback
if error.args[0] == 'Pool not running':
if verbose:
print("reached end of events")
break # out of while True
else:
raise error
ii += 1
# only start jobs in chunks of 4*n_jobs
if ii % (n_jobs*4) == 0:
while True:
if result.ready():
break
time.sleep(1.0) # check every second
if verbose:
print('c')
sys.stdout.flush()
# wait until all jobs are done
pool.close()
pool.join()
if verbose:
print("finished all jobs.\n")
return number_events
# for example code see function test_preprocess in file
# ./tests/test_preprocess.py.