"""
This module contains classes designed to work with irregularly structured datafiles.
It provides ``Parsers`` that are able to extract the replicate identifiers and Ct values as pandas DataFrames
from irregular ``csv`` and ``excel`` files.
In fact, ``qpcr.Parsers`` are already implemented in the :ref:`qpcr.Readers <Readers>` so you will often
be able to read irregular datafiles directly with one of the :ref:`qpcr.Readers <Readers>` and will not have to manually work with the ``qpcr.Parsers``.
Working with "irregular files"
==============================
Any datafile that does not consist solely of a replicate identifier column and a Ct column is called "irregular".
In fact, most excel sheets or csv exports from qPCR machines are irregular, as they often contain
information about the run, melting curve data, and so forth. Such data is not of interest to ``qpcr``, however,
so we have to extract the data we are interested in. This is the job of the ``qpcr.Parsers``. They read in an irregular datafile and use
a guided-parsing approach to find the relevant sections within the datafile. If your datafiles contain multiple datasets / assays, the
``qpcr.Parsers`` will be able to extract *all of them* and store their extracted data.
There are essentially two ways in which they can do this, which are explained below.
"Finding" relevant datasets through ``assay_patterns``
------------------------------------------------------
The Parsers are equipped with a method called ``find_assays`` which locates assays (or more formally "datasets") within the datafile
using ``regex``. Of course, in order to do that they need to know the patterns they are supposed to look for. Some patterns are already pre-specified
in the ``qpcr.Parsers.assay_patterns`` dictionary and can simply be specified using their key. If your own pattern is not yet pre-defined,
`post an issue on github <https://github.com/NoahHenrikKleinschmidt/qpcr/issues>`_ and supply some samples of how your assays usually appear in your datafiles
along with the name of the machine that produces your datafiles.
Of course, you can also manually specify your own regex pattern. The only constraint is that it *must* have *one* capturing group for the assay name.
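As a rough sketch of what such a custom pattern does (using only Python's built-in ``re`` module, not the Parsers themselves): the pattern is searched for in a header cell and its single capturing group yields the assay name. The header text and the ``extract_assay_name`` helper are made up for this illustration.

```python
import re

# Hypothetical custom pattern with exactly ONE capturing group for the assay name
mypattern = re.compile("qPCR run: ([A-Za-z0-9. -]+)")

def extract_assay_name(cell):
    # Return the captured assay name, or None if the cell is not an assay header
    match = mypattern.search(cell)
    return match.group(1) if match is not None else None

print(extract_assay_name("qPCR run: ActinB"))  # ActinB
print(extract_assay_name("Name"))              # None
```

A pattern without a capturing group would still match the cell, but there would be no ``group(1)`` to take the assay name from.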
Note
-----
All assay headers must be located either in the same column or the same row to be identified by a ``Parser``!
Once the assays in your datafile are identified, the data columns belonging to them are searched for. The constraint here is that they must start either
in the row exactly below the assay headers or have at most one single row in between them. Anything else is no good! The data columns *must* be labelled
(i.e. have a header). By default ``Name`` and ``Ct`` are assumed as data column labels / headers, but this can be changed.
Working with *multi-assay* files
--------------------------------
When working with datafiles that contain multiple assays you will likely want to use *all* the assays from the datafile. Here is how the ``qpcr.Parsers`` help you do this.
Making individual assay files from a multi-assay file
This is the core business of the ``qpcr.Parsers``. You can simply set up a Parser, set a saving location using the
Parser's ``save_to`` method and then ``pipe`` your file through. All done at this point. Of course, you can also work with the dataframes directly
using the Parser's ``get`` method. This way you can easily separate the assays, which you can then pass to your main analysis as assays and normalisers.
Using a multi-assay file directly for my analysis
So, you want to just feed in your one datafile and expect to get a table with your Delta-Delta-Ct values for all assays against all normalisers?!
Sure, no problem! Parsers will be able to do that, but you can more easily read a multi-assay file using the ``MultiReader`` which allows you (after setup) to simply ``pipe`` through your datafile
and immediately returns a list of your assays-of-interest and normaliser-assays. How does it know which is which though?
That is where the *decorators* come into play, which you can learn more about below.
Working with ``qpcr.Parsers``
=============================
Because they are already implemented in the Readers, it is likely that you will never actually use the Parsers directly. However, working with the Parsers is virtually the same as working with the Readers.
.. code-block:: python
from qpcr import Parsers
myfile = "my_big_irregular_file.csv"
# setting up the parser
parser = Parsers.CsvParser()
# we know that the assays / datasets are all named by a scheme
# we can define a regex pattern to match this
mypattern = "qPCR run: ([A-Za-z0-9-. ]+)"
# now we can pipe our file through the parser to get the dataframes of our assays
data = parser.pipe( myfile, assay_pattern = mypattern )
# at this point we have a dict of dataframes, keyed by the extracted assay names
Decorators
==========
A ``decorator`` technically is a function that wraps another function when coding. Well, that's not quite the case for the ``qpcr`` decorators but the idea is similar.
Instead of wrapping functions the ``qpcr`` decorators wrap assays in a multi-assay file. What does "wrap" mean? It means they provide meta-data about the assay
in question. What does that mean? There are multiple implemented ``qpcr`` decorators. For irregular multi-assay files the two important decorators are:
``@qpcr:assay`` and ``@qpcr:normaliser`` – is it now clear what they do? They are placed in the
cell **exactly above** the cell where the assay header is located (seriously, anything else won't do!) and tell the ``qpcr.Parsers`` (because all the ``MultiReader`` is doing is setting up some Parsers...)
if a specific assay is an "assay-of-interest" or a "normaliser-assay".
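The idea can be sketched in plain Python as follows. This only illustrates the "decorator in the cell above the assay header" rule and is not the Parsers' actual implementation; the column contents, the simplified patterns and the ``decorated_headers`` helper are assumptions made for this example.

```python
import re

# Simplified stand-ins for the decorator patterns
# (the optional leading tick covers decorators typed into excel)
assay_decorator = re.compile("'?@qpcr:assay")
normaliser_decorator = re.compile("'?@qpcr:normaliser")

# One column of a made-up datafile, read top to bottom
column = [
    "@qpcr:assay", "qPCR run: HNRNPL", "Name",
    "",
    "'@qpcr:normaliser", "qPCR run: ActinB", "Name",
]

def decorated_headers(cells, decorator):
    # The assay header is the cell exactly below a matching decorator cell
    return [
        cells[i + 1]
        for i, cell in enumerate(cells[:-1])
        if decorator.match(cell) is not None
    ]

print(decorated_headers(column, assay_decorator))       # ['qPCR run: HNRNPL']
print(decorated_headers(column, normaliser_decorator))  # ['qPCR run: ActinB']
```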
So, let us recap this quickly: ``qpcr.Parsers`` can identify assays either through *de novo* finding using ``regex`` patterns *or* through *decorators*. To tell a Parser to use a specific decorator
for finding assays you can specify the ``decorator`` argument in ``pipe`` or ``parse`` (``pipe`` wraps read+parse). To specify a decorator like this you only write ``qpcr:assay`` or ``qpcr:normaliser``
(the *key* is basically the decorator but without the ``@``).
If you work with ``qpcr.Parsers`` directly you can choose if you only wish to extract "assays-of-interest" (decorated as ``@qpcr:assay``) *or* "normalisers", or whatever other decorators are available.
However, this flexibility is not available when calling Parsers indirectly through one of the ``qpcr.Readers``.
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
| Decorator | Code-reference | Filetype | Available for / Used by `qpcr.Readers` |
+===========================+=================+==========================================+====================================================+
| any except `qpcr:column` | qpcr:all | Irregular single- or multi-assay files. | `SingleReader`, `MultiReader`, `MultiSheetReader` |
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
| @qpcr:assay | qpcr:assay | Irregular single- or multi-assay files. | `SingleReader`, `MultiReader`, `MultiSheetReader` |
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
| @qpcr:normaliser | qpcr:normaliser | Irregular single- or multi-assay files. | `SingleReader`, `MultiReader`, `MultiSheetReader` |
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
| @qpcr:group | qpcr:group | Horizontal Big Table files | `BigTableReader` |
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
| @qpcr | qpcr:column | Horizontal or vertical Big Table files | `BigTableReader` |
+---------------------------+-----------------+------------------------------------------+----------------------------------------------------+
Note
-----
- Just like assay headers, all decorators must be located either in the same column or the same row to be identified by a Parser!
- If you are using ``excel`` you may have to add a single tick ``'`` in front of your decorators.
- When specifying a decorator any non-decorated assay will be *ignored*!
"Custom decorators"
You might be tempted to think that you could also specify your own "decorator pattern" together with your assay patterns.
While this is not strictly a built-in feature, there is a way of making this work. The decorators are stored in a simple dictionary within the ``qpcr.Parsers`` submodule.
Hence, you can add your own entries to this dictionary and then let them be accessed regularly through their keys by the Parsers.
.. code-block:: python
from qpcr import Parsers
# the new decorator should be specific for a certain experiment
mydecorator = "(@qpcr:experiment1-assay|'@qpcr:experiment1-assay)"
# now add the decorator
Parsers.decorators[ "mydecorator" ] = mydecorator
# now the decorator will be available for the Parsers to work with...
parser = Parsers.ExcelParser()
data = parser.pipe( myfile, decorator = "mydecorator", assay_pattern = mypattern )
"""
import logging
import qpcr
import qpcr.defaults as defaults
import qpcr._auxiliary as aux
import qpcr._auxiliary.warnings as aw
import pandas as pd
import numpy as np
import re
from io import StringIO
from copy import deepcopy
import os
logger = aux.default_logger()
__pdoc__ = {"_CORE_Parser": True}
# this is the dictionary where we store pre-defined
# patterns of headers above assays within the datafiles
# important here is that they must specify a capturing group for the assay name.
assay_patterns = {
"all": r"([A-Za-z0-9.:, ()_\-/]+)",
"Rotor-Gene": r"Quantitative analysis of .+(?<=\()([A-Za-z0-9.:, _\-/]+)",
}
# also store default data-column
# headers associated with a Pattern
assay_pattern_col_names = {"Rotor-Gene": ["Name", "Ct"]}
decorators = {
"qpcr:all": "(@qpcr:|'@qpcr:)",
"qpcr:assay": r"(@qpcr:assay\s{0,}|'@qpcr:assay\s{0,})",
"qpcr:normaliser": r"(@qpcr:normaliser\s{0,}|'@qpcr:normaliser\s{0,})",
"qpcr:group": r"(@qpcr:group\s{0,}|'@qpcr:group\s{0,})",
"qpcr:column": "(@qpcr|'@qpcr)",
}
plain_decorators = {
"qpcr:all": "@qpcr:",
"qpcr:assay": "@qpcr:assay",
"qpcr:normaliser": "@qpcr:normaliser",
"qpcr:group": "@qpcr:group",
"qpcr:column": "@qpcr",
}
# get the standard column headers to use for the
# replicate id and Ct column of the finished dataframes
standard_id_header = defaults.raw_col_names[0]
standard_ct_header = defaults.raw_col_names[1]
default_group_name = defaults.group_name
default_dataset_header = defaults.dataset_header
default_id_header = defaults.id_header
default_ct_header = defaults.ct_header
# set a dummy default value for any np.nan values
# in the column storing the assay headers
# we do this in case the "all" assay_pattern is used without decorators
# in this case any cell would be selected as "nan" also matches the pattern
dummy_blank = "$"
# set up a regex pattern for floats. We require this to vet the Ct columns
# during make_dataframes() calling, because there may be entries within the
# Ct column where np.genfromtxt crashes (like when it has spaces in it).
# Somehow, more elegant tweaks directly at genfromtxt would not work so we
# brute-force match with regex and replace faulty entries manually with "nan"
# before calling genfromtxt.
float_pattern = re.compile(r"\d+\.?\d*")
class _CORE_Parser:
"""
This is the functional core for the irregular multi-assay file-reader classes.
It handles the regex searching and numpy indexing of relevant column subsets of the datafiles.
"""
__slots__ = [
"_src",
"_pattern",
"_data",
"_assay_indices",
"_assay_names",
"_assay_names_start_indices",
"_assay_names_end_indices",
"_assay_ct_start_indices",
"_assay_ct_end_indices",
"_dfs",
"_ids_were_set",
"_max_assay_name_length",
"_save_loc",
"_transpose",
"_bigtable_range",
"_id_label",
"_ct_label",
"__dict__",
]
def __init__(self):
self._src = None
self._pattern = None
self._data = None
# the found assays, these will be arrays/lists that store the indices,
self._assay_indices = None # indices of the assay identifiers
self._assay_names = None # names of the assays
self._assay_names_start_indices = None # indices of the rep. id headers
self._assay_names_end_indices = None # indices of the last entry of the rep. id columns
self._assay_ct_start_indices = None # indices of the ct headers
self._assay_ct_end_indices = None # indices of the last entry of the ct columns
# a dictionary to store all assay dataframes
self._dfs = {}
# setup the labels for replicate ids and ct value column headers
self.labels()
# and reset the ids_were_set variable to default False
self._ids_were_set = False
# we must specify a maximum allowed length for the assay names before hand
# (since we're using numpy arrays for storing the names, which require enough open slots to store the characters)
self._max_assay_name_length = 20
# a folder into which the new assay-split datafiles should be stored
self._save_loc = None
# set transpose option in case datasets are stored not on separate row ranges but separate column ranges
self._transpose = False
# set up a BigTable data range
self._bigtable_range = None
def prune(self):
"""
Completely resets the Parser, clearing all data and preset-specifics such as the assay_pattern.
"""
self.__init__()
def clear(self):
"""
Clears all datasets that were extracted.
"""
self._dfs = {}
self._assay_indices = None # indices of the assay identifiers
self._assay_names = None # names of the assays
self._assay_names_start_indices = None # indices of the rep. id headers
self._assay_names_end_indices = None # indices of the last entry of the rep. id columns
self._assay_ct_start_indices = None # indices of the ct headers
self._assay_ct_end_indices = None # indices of the last entry of the ct columns
def transpose(self):
"""
Inverts the `col` index used by `qpcr.Parsers._CORE_Parser.find_assays` and `qpcr.Parsers._CORE_Parser.find_by_decorator`.
By default the `col` refers to a column. After using `transpose` it will be interpreted as a `row`.
Note
----
This method is a toggle, so calling `transpose` repeatedly will keep switching the interpretation from column to row and back again.
"""
self._transpose = not self._transpose
def save_to(self, location: str = None):
"""
Sets the location into which the individual assay datafiles should be saved.
Parameters
----------
location : str
The path to a directory where the newly generated assay datafiles shall be saved.
If this directory does not yet exist, it will be automatically made.
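
A minimal sketch of the "create the directory if it is missing" behaviour, independent of the Parser itself. It uses `os.makedirs`, which also creates missing parent directories; that choice is an assumption of this example rather than a statement about what `save_to` does internally, and `ensure_directory` is a hypothetical helper.

```python
import os
import tempfile

def ensure_directory(location):
    # Create the directory (and any missing parents) if it does not exist yet
    os.makedirs(location, exist_ok=True)
    return location

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "assays", "run1")
    ensure_directory(target)
    created = os.path.isdir(target)

print(created)  # True
```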
"""
if location is not None:
self._save_loc = location
if not os.path.exists(self._save_loc):
os.mkdir(self._save_loc)
return self._save_loc
def get(self, assay: str = None):
"""
Parameters
----------
assay : str
The name of an assay found in the datafile. Available assays can be accessed using the `self.assays` method.
Returns
-------
data : pd.DataFrame or dict
Either a specific pandas dataframe of one of the assays (if an `assay` name was specified)
or the entire dictionary of all found dataframes from all assays.
"""
if assay is not None:
data = self._dfs[assay]
else:
data = self._dfs
return data
def save(self):
"""
Saves the individual assays as separate csv files.
This requires that a saving directory has been set using `self.save_to`.
The files will simply be named according to the assay name (i.e. `ActinB.csv` for instance).
"""
if self._save_loc is None:
e = aw.ParserError("no_save_loc")
logger.error(e)
else:
for assay, df in self._dfs.items():
assay_path = os.path.join(self.save_to(), f"{assay}.csv")
df.to_csv(assay_path, index=False)
def labels(self, id_label: str = default_id_header, ct_label: str = default_ct_header):
"""
Sets the headers for the relevant data columns for each assay within the datafile.
Parameters
----------
id_label : str
The header above the column containing replicate identifiers.
ct_label : str
The header above the column containing the replicates' Ct values.
"""
self._id_label = id_label
self._ct_label = ct_label
self._ids_were_set = True
def assays(self):
"""
Returns
-------
names : list
The names of the found assays of the datafile
"""
return list(self._dfs.keys())
def assay_pattern(self, pattern: str = None, *flags):
"""
Sets up a regex pattern defining the assay declarations within the datafile.
Parameters
----------
pattern : str
A string containing either the key to a predefined pattern from the `assay_patterns` dictionary,
or a regex pattern directly.
If a regex pattern is provided directly, that pattern must contain a capturing group
from which the assay name can be extracted.
*flags
Any additional flags to pass to `re.compile()` for the regex pattern
Returns
-------
pattern : re.Pattern
The currently used regex pattern to identify assays within the datafile.
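
The "key or regex" lookup described above can be sketched roughly like this: the string is first looked up among the predefined patterns and, if it is not a known key, treated as a regex itself. The `predefined` dictionary and the `resolve_pattern` helper are made up for this illustration.

```python
import re

# Made-up registry of predefined patterns (each with one capturing group)
predefined = {"run-header": "qPCR run: ([A-Za-z0-9. -]+)"}

def resolve_pattern(pattern, *flags):
    # Use the predefined pattern if `pattern` is a known key,
    # otherwise treat `pattern` itself as the regex
    regex = predefined.get(pattern, pattern)
    return re.compile(regex, *flags)

by_key = resolve_pattern("run-header")
direct = resolve_pattern("Assay=([A-Za-z0-9]+)")

print(by_key.search("qPCR run: ActinB").group(1))  # ActinB
print(direct.search("Assay=GAPDH").group(1))       # GAPDH
```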
"""
if pattern is not None:
# try to get the pattern from the predefined patterns via key
_pattern = aux.from_kwargs(pattern, None, assay_patterns)
# check if we got a hit, and if so,
# also import default data-column headers if possible
# (provided the Parser hasn't got any yet)
if _pattern is not None and not self._ids_were_set:
self._id_label, self._ct_label = aux.from_kwargs(pattern, (None, None), assay_pattern_col_names)
elif _pattern is None:
_pattern = pattern
# _pattern = pattern if _pattern is None else _pattern
self._pattern = re.compile(_pattern, *flags)
return self._pattern
def max_assay_name_length(self, length=20):
"""
Sets the maximum allowed length (number of characters) for assay names.
Parameters
----------
length : int
The maximum number of characters to store for the assay name.
Default is `length = 20` characters.
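
The limit matters because NumPy string arrays have a fixed width: an array pre-filled with placeholders of a given length silently truncates longer strings on assignment. A small sketch of that NumPy behaviour (the assay names are made up):

```python
import numpy as np

max_length = 5
# Pre-fill with placeholders; this fixes the array's string width to 5 characters
names = np.array(["-" * max_length for _ in range(2)])

names[0] = "ActB"         # fits into 5 characters
names[1] = "HNRNPL-long"  # longer than 5 characters, gets cut off

print(names[0])  # ActB
print(names[1])  # HNRNP
```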
"""
self._max_assay_name_length = length
def parse(self, **kwargs):
"""
A wrapper for `find_assays` (or `find_by_decorator`) + `find_columns` + `make_dataframes`.
This is the functional core of the Parser's `pipe` method.
Parameters
----------
**kwargs
Any additional keyword argument that will be passed to any of the wrapped methods.
"""
decorator = aux.from_kwargs("decorator", None, kwargs, rm=True)
if decorator is not None:
self.find_by_decorator(decorator=decorator, **kwargs)
else:
self.find_assays(**kwargs)
# ignore if no assays were found (default is false, unless we use multi-assay multi-sheet files)
ignore_empty = aux.from_kwargs("ignore_empty", False, kwargs)
if ignore_empty:
try:
self.find_columns()
self.make_dataframes(**kwargs)
except Exception as e:
logger.info(e)
else:
self.find_columns()
self.make_dataframes(**kwargs)
def find_by_decorator(self, decorator: str, col=0, **kwargs):
"""
Parses through a column of the datafile and finds all assays that are decorated with a specific decorator.
Note that this requires that the decorator is in the cell above the assay header. Also, make sure to specify
an `assay_pattern` to extract the assay name. If no `assay_pattern` is provided, it will simply take the entire cell content.
Parameters
-----------
decorator : str
One of the available `qpcr` decorators for irregular multi-assay files.
Available decorators can be accessed via the `qpcr.Parsers.decorators` dictionary keys.
col : int
The column in which to look for assay identifiers.
By default the first column `col = 0`.
Returns
-------
assay_indices : np.ndarray
The indices (row, col) of all assays found.
names : np.ndarray
The extracted names of all assays found.
"""
# ignore if no assays were found (default is false, unless we use multi-assay multi-sheet files)
ignore_empty = aux.from_kwargs("ignore_empty", False, kwargs)
# get the pattern required (or raise error if invalid decorators are provided)
if decorator not in decorators.keys():
e = aw.ParserError("invalid_decorator", d=decorator, all_d=list(decorators.keys()))
logger.error(e)
raise e
decorator_pattern = re.compile(decorators[decorator])
decorator_indices, decorator_names = self.find_assays(col=col, pattern=decorator_pattern, **kwargs)
# check if decorators were identified
found_indices = decorator_indices.size > 0
if not found_indices:
if not ignore_empty: # if none were found either raise error or ignore
raise aw.ParserError("no_decorators_found")
else:
return
# if no assay_pattern was specified then default to generic "all" to get full cell contents
if self.assay_pattern() is None:
logger.info(aw.ParserError("decorators_but_no_pattern"))
self.assay_pattern("all")
assay_indices = decorator_indices
# get assay indices as the cells IMMEDIATELY BELOW the decorators
# we adjust either col or the row indices depending on the transposition
if self._transpose:
col = col + 1
else:
assay_indices = assay_indices + 1
# get all assay header cells into an array to extract their names
array = self._prep_header_array(col, assay_indices)
# adjust available length of stored assay names
max_length = max(list(map(len, array)))
self.max_assay_name_length(max_length)
names = np.array(["-" * self._max_assay_name_length for _ in range(len(array))]) # we need to pre-specify the max allowed length for the assay names by filling an array with some dummy placeholders ('-')
idx = 0
for entry in array:
match = self.assay_pattern().search(entry)
if match is not None:
name = match.group(1)
names[idx] = name
idx += 1
self._assay_indices = assay_indices
self._assay_names = names
return assay_indices, names
def find_assays(self, col=0, **kwargs):
"""
Parses through a column of the datafile and identifies all indices of cells that match the provided `assay_pattern`.
It stores these values internally and also returns the results as numpy arrays.
Parameters
-----------
col : int
The column in which to look for assay identifiers.
By default the first column `col = 0`.
Returns
-------
indices : np.ndarray
The indices (row, col) of all assays found.
names : np.ndarray
The extracted names of all assays found.
"""
custom_pattern = aux.from_kwargs("pattern", None, kwargs)
if self._pattern is None and custom_pattern is None:
raise aw.ParserError("no_pattern_yet")
pattern_to_use = self._pattern if custom_pattern is None else custom_pattern
array = self._prep_header_array(col=col)
indices = np.zeros(len(array))
names = np.array(["-" * self._max_assay_name_length for _ in range(len(array))]) # we need to pre-specify the max allowed length for the assay names by filling an array with some dummy placeholders ('-')
idx = 0
for entry in array:
match = pattern_to_use.search(entry)
if match is not None:
name = match.group(1)
names[idx] = name
indices[idx] = 1
idx += 1
indices = np.argwhere(indices == 1)
if indices.size == 0:
# ignore if no assays were found (default is false,
# unless we use multi-assay multi-sheet files)
ignore_empty = aux.from_kwargs("ignore_empty", False, kwargs)
if not ignore_empty:
e = aw.ParserError("no_assays_found")
SystemExit(e)
names = names[indices]
names = names.reshape(len(names))
self._assay_indices = indices
self._assay_names = names
return indices, names
def find_columns(self):
"""
Identifies the relevant data column belonging to each assay within the datafile.
"""
# search indices of the starts of id and ct columns
# these are now the row, col coordinates of each name_column header
name_col_starts = self._find_column_starts(label=self._id_label, ref_indices=self._assay_indices)
# these are now the row, col coordinates of each ct_column header
ct_col_starts = self._find_column_starts(label=self._ct_label, ref_indices=self._assay_indices)
# now we also need to know the end indices of the data columns
name_col_ends = self._find_column_ends(name_col_starts)
# now that we know the end indices for the replicate id column we will adopt the end row indices
# onto the ct column as well (we don't parse through the Ct column because it might have missing
# Ct values interspersed which would prematurely terminate the parsing...)
# (1) we transpose to have all row indices easily accessible in the first line
# (2) we adopt row indices from the transposed name col
# (3) and transpose back to get our final ct end indices
ct_col_ends = deepcopy(np.transpose(ct_col_starts))
name_col_ends_t = np.transpose(name_col_ends)
ct_col_ends[0] = name_col_ends_t[0]
ct_col_ends = np.transpose(ct_col_ends)
# now store the data
self._assay_names_start_indices = name_col_starts
self._assay_names_end_indices = name_col_ends
self._assay_ct_start_indices = ct_col_starts
self._assay_ct_end_indices = ct_col_ends
def make_dataframes(self, allow_nan_ct: bool = True, default_to: float = None, **kwargs):
"""
Generates a set of `pandas DataFrame`s each containing two columns
(one for the replicate identifiers, one for the Ct values)
for subsequent use with the main `qpcr` module.
Parameters
----------
allow_nan_ct : bool
Allows Ct values to be NaN within the final dataframe (if `True`, default).
If no NaN Ct values should be maintained a default value for NaN Ct values must be specified
using `default_to`.
default_to : float
The default value to replace NaN Ct values with.
This is ignored if `allow_nan_ct = True`.
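
The NaN replacement relies on the fact that NaN is the only float that does not equal itself. A minimal sketch of that check in plain Python (the `fill_nan` helper and the values are made up for illustration):

```python
def fill_nan(values, default_to):
    # NaN is the only float that is not equal to itself,
    # so `x == x` is False exactly for NaN entries
    return [x if x == x else default_to for x in values]

cts = [21.3, float("nan"), 24.8]
filled = fill_nan(cts, 40.0)

print(filled)  # [21.3, 40.0, 24.8]
```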
"""
adx = 0
# print(self._assay_names, self._assay_indices, self._assay_names_start_indices, self._assay_names_end_indices)
for assay in self._assay_names:
# get the assay's indices of both replicate id and ct columns
names_start = self._assay_names_start_indices[adx]
names_end = self._assay_names_end_indices[adx]
ct_start = self._assay_ct_start_indices[adx]
ct_end = self._assay_ct_end_indices[adx]
# generate the final index slices from the total array
# of both replicate id (names) and ct columns
names_range, names_col = self._make_index_range(names_start, names_end, crop_first=True)
ct_range, ct_col = self._make_index_range(ct_start, ct_end, crop_first=True)
# get the assay data
assay_names = self._data[names_range, names_col]
assay_cts = self._data[ct_range, ct_col]
# and convert to numeric data
# in case a simple astype(float) fails we resort to matching faulty entries
# individually with regex and then convert these to a readable "nan" format
# and then convert to float using np.genfromtxt
assay_cts = self._convert_to_numeric(assay, assay_cts)
# assemble the assay dataframe
assay_df = pd.DataFrame(
{
standard_id_header: assay_names,
standard_ct_header: assay_cts,
}
)
if not allow_nan_ct:
if not isinstance(default_to, (int, float)):
e = aw.ParserError("no_ct_nan_default", d=default_to)
logger.error(e)
raise e
# apply defaulting lambda function
assay_df[standard_ct_header] = assay_df[standard_ct_header].apply(lambda x: x if x == x else default_to)
# and store dataframe
self._dfs.update({assay: assay_df})
adx += 1
if adx == len(self._assay_names):
break
def _convert_to_numeric(self, id, array):
"""
Converts a numpy array to floats.
Either directly using `astype(float)` or `np.genfromtxt`, or if this fails,
by first prepping using regex and then `np.genfromtxt`.
Parameters
-----------
array : np.ndarray
The array to convert.
id : str
The associated identifier of the array to include in the error message
that is produced denoting which faulty entries were set to NaN...
(or just the first thereof, actually).
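
The regex fallback can be sketched in plain Python as below. Note the assumptions: this example treats an entry as valid only if the *whole* string is a plain non-negative decimal number (via `fullmatch`), which is stricter than a prefix match, and it uses `float` instead of `np.genfromtxt`; the `to_floats` helper and the entries are made up.

```python
import re

# Assumption for this sketch: an entry is valid only if the WHOLE string
# is a plain non-negative decimal number
number = re.compile("[0-9]+[.]?[0-9]*")

def to_floats(entries):
    # Replace every entry that is not a clean number by "nan", then convert
    cleaned = [e if number.fullmatch(e.strip()) else "nan" for e in entries]
    return [float(e) for e in cleaned]

values = to_floats(["21.5", "Undetermined", "23", "2 4.1"])
print(values)  # [21.5, nan, 23.0, nan]
```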
"""
try:
array = array.astype(float)
except ValueError as e:
# convert to string first, for regex matching
array = np.array(array, dtype=str)
try:
# we first try to just use genfromtxt directly
# since it takes a lot of time to do the regex matching
# so we avoid it if possible...
array = np.genfromtxt(array)
except Exception as e:
logger.debug(e)
logger.info("Failed to convert to numeric data. Attempting to use regex to match faulty entries...")
# first get the indices of all entries that are not floats
# and convert these manually to "nan"
faulties = np.argwhere([float_pattern.match(i) is None for i in array])
array[faulties] = "nan"
# now read the ct values again as floats
array = np.genfromtxt(array)
# print some info about the faulty entries
bad_value = e.__str__().split(": ")[1]
e = aw.ParserError("found_non_readable_cts", assay=id, bad_value=bad_value)
logger.error(e)
return array
def _make_BigTable_range(self, **kwargs):
"""
Generates a pandas DataFrame of a subsection of an irregular datafile
containing a "big data table" with multiple assays specified in it.
It makes use of the `id_label` specified using `_CORE_Parser.labels` as the
anchor. The resulting dataframe will contain all rows from the cell where `id_label`
is located until the data is empty.
If additionally `replicates` are specified in the `kwargs`
the starting positions of assay replicates are inferred based on `decorators`.
Note, this only works for `horizontal` Big Tables!
"""
is_horizontal = aux.from_kwargs("is_horizontal", False, kwargs)
# get the main data
data = self._data.astype("str")
ref_col_header = self._id_label
# find big table starting row
idx = np.argwhere(data == ref_col_header)
# vet that we actually found the big table
if idx.size == 0:
e = aw.ParserError("no_bigtable_header", header=ref_col_header)
logger.critical(e)
raise SystemError(e)
idx = idx.reshape(idx.size)
start, col = idx
idx = 1
while True:
try:
entry = data[start + idx, col]
except Exception as e:
logger.debug(e)
break
if entry == "nan":
break
idx += 1
end = start + idx
# put a -1 offset on the rows if horizontal, as the decorators
# are in the row above the actual column headers.
if is_horizontal:
start -= 1
# generate bigtable data range and store
relevant_data = data[start:end, :]
self._bigtable_range = relevant_data
def _infer_BigTable_groups(self, **kwargs):
"""
Gets the group ranges from the bigtable datarange.
Note, this is only used in case of horizontal big tables.
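
The reshaping idea can be sketched with a tiny made-up table in which each row is a dataset (assay) and each column a replicate belonging to a group. This is only an illustration of stacking with `np.tile` and `np.repeat` under these assumptions, not the method's exact code.

```python
import numpy as np

ids = np.array(["ActinB", "HNRNPL"])       # one dataset id per row
groups = np.array(["ctrl", "ctrl", "KO"])  # one group label per column
cts = np.array([[20.1, 20.3, 25.0],
                [22.4, 22.6, 27.9]])

stacked = cts.reshape(-1)                   # row after row in one long column
groups_tiled = np.tile(groups, len(ids))    # ctrl ctrl KO ctrl ctrl KO
ids_repeated = np.repeat(ids, groups.size)  # ActinB x3, then HNRNPL x3

# each stacked Ct value now has its dataset id and group label next to it
for row in zip(ids_repeated, groups_tiled, stacked):
    print(*row)
```

Group labels are tiled because they repeat once per row, whereas dataset ids are repeated element-wise because each id covers one whole row of the original table.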
"""
# get the relevant data
array = self._bigtable_range
maxrows, allcols = array.shape
ignore_empty = aux.from_kwargs("ignore_empty", False, kwargs)
# get and vet replicates
replicates = aux.from_kwargs("replicates", None, kwargs, rm=True)
if replicates is None:
e = aw.ParserError("bigtable_no_replicates")
logger.error(e)
raise SystemExit(e)
replicates, names = self._vet_replicates(ignore_empty, replicates, array, **kwargs)
rdx = 0 # counter for the replicate groups
data_array = None # this array will store the entire transposed data
decorator = plain_decorators["qpcr:group"]
# now get the assays in question
# we already vetted if the file is properly
# decorated during _vet_replicates
# find decorated starting columns
# get only column indices
indices = np.argwhere(array == decorator)
indices = indices[:, 1]
if indices.size == 1:
indices = [indices]
# get total slice of bigtable rows
rows = slice(1, maxrows)
# iterate over each group
for col in indices:
rep = replicates[rdx]
# get data columns
cols = slice(col, col + rep)
# get data
data = array[rows, cols]
# rename data cols if names are provided
if names is not None:
name = names[rdx]
data[0, :] = name
# concatenate data into a single array
if data_array is None:
data_array = data
else:
data_array = np.concatenate(
(data_array, data),
axis=1,
)
rdx += 1
# remove groups from the data array
groups = data_array[0, :]
data_array = data_array[1:, :]
# Note: instead of inferring our own replicate names (which would just be
# group0 group0 group0 group1 ...), we use the same repeat / tile approach
# that we used to make the group1 etc. replicate names, but based on the
# ACTUAL groups that we have just split off from the data. Their first row
# already holds the replicate identifiers, so we use those directly.
# reshape data into a single column
data_array = np.concatenate(data_array, axis=0)
# now repeat the groups to match the stacked new data column
groups_tiled = np.tile(groups, data_array.size // groups.size)
# now get the dataset id column
id_col = self._id_label
id_start = np.where(array == id_col)
id_row, id_col = id_start
id_rows = slice(int(id_row + 1), maxrows)
id_col = array[id_rows, id_col]
# repeat dataset ids to match stacked new data column
ids_tiled = np.repeat(id_col, groups.size)
# assemble all data
total_data = [ids_tiled, groups_tiled, data_array]
headers = [default_dataset_header, standard_id_header, standard_ct_header]
# check for qpcr column and if present, get and adjust shape
self._BigTable_horizontal_qpcr_col(array, maxrows, groups, total_data, headers)
# combine the three columns (dataset id, groups, and Ct (actual data_array))
data_array = np.stack(total_data, axis=1)
# add default names into the first row
data_array = np.concatenate(([headers], data_array), axis=0)
# actually return the finished array
return data_array
def _BigTable_horizontal_qpcr_col(self, array, maxrows, groups, total_data, headers):
"""
Checks if a "@qpcr" column is present in the data and if so, adjusts its shape and
adds it to the data to be assembled for the assays.
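Examples
--------
A minimal sketch of the shape adjustment, assuming a horizontal big table in which
each row is one dataset and each column one replicate. The arrays and values are
hypothetical; only the use of ``np.repeat`` and ``np.tile`` mirrors the surrounding code.

```python
import numpy as np

# hypothetical inputs: one entry per table row, one label per data column
per_row_values = np.array(["s1", "s2"])           # e.g. dataset ids or "@qpcr" entries
groups = np.array(["ctrl", "ctrl", "treat"])      # first row of the data block
cts = np.array([[20.1, 20.3, 25.0],
                [21.0, 21.2, 26.1]])

# stack the rows one after another into a single column
flat = np.concatenate(cts, axis=0)
# cycle the group labels once per stacked row
groups_tiled = np.tile(groups, flat.size // groups.size)
# repeat each per-row value once per group column
values_repeated = np.repeat(per_row_values, groups.size)

rows = list(zip(values_repeated.tolist(), groups_tiled.tolist(), flat.tolist()))
```

Each stacked Ct value then lines up with the row entry and group label it came from.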
"""
column_decorator = plain_decorators["qpcr:column"]
qpcr_col = np.where(array == column_decorator)
qpcr_row, qpcr_col = qpcr_col
if len(qpcr_col) != 0:
qpcr_rows = slice(int(qpcr_row + 1), maxrows)
qpcr_col = array[qpcr_rows, qpcr_col]
qpcr_tiled = np.repeat(qpcr_col, groups.size)
total_data.append(qpcr_tiled)
headers.append("@qpcr")
def _vet_replicates(self, ignore_empty, replicates, array, **kwargs):
"""
Checks if the provided replicates cover all data groups (annotated columns).
It also retrieves the names to be used for the columns.
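Examples
--------
A simplified sketch of the coverage check, assuming replicates are given either as a
single integer or as a tuple. ``normalize_replicates`` is a hypothetical stand-in and
not part of the module; formula strings are left out, and a plain tuple is returned
where the method itself uses ``np.tile``.

```python
def normalize_replicates(replicates, n_groups):
    # hypothetical helper: one replicate count per decorated group
    if isinstance(replicates, int):
        # a single integer applies to every group
        return (replicates,) * n_groups
    replicates = tuple(replicates)
    if len(replicates) != n_groups:
        raise ValueError(
            f"{len(replicates)} replicate entries do not cover {n_groups} groups"
        )
    return replicates


uniform = normalize_replicates(3, 4)
explicit = normalize_replicates((3, 4), 2)
```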
"""
# get assays for each decorator
groups = 0
decorator = plain_decorators["qpcr:group"]
# find decorated starting columns
indices = np.argwhere(array == decorator)
if indices.size == 0 and not ignore_empty:
e = aw.ParserError("no_decorators_found")
logger.error(e)
raise SystemExit(e)
# get only column indices
indices = indices[:, 1]
groups += indices.size
# check if replicates are an integer, if so transform to
# tuple that cover all found assays
if aux.same_type(replicates, 1):
replicates = np.tile([replicates], groups)
# else, check if it's a formula that needs to be read out to a tuple.
elif aux.same_type(replicates, ""):
replicates = qpcr.Assay()._reps_from_formula(replicates)
# if replicates are already a tuple, make sure they cover all rows
elif aux.same_type(replicates, ()):
all_covered = groups == len(replicates)
if not all_covered:
e = aw.AssayError("reps_dont_cover", n_samples=groups, reps=replicates)
logger.error(e)
raise SystemExit(e)
# get names for assays
group_names = aux.from_kwargs("names", None, kwargs)
# vet that names cover
if group_names is not None and len(group_names) != len(replicates):
e = aw.AssayError("groupnames_dont_colver", current_groups=f"None, but needs to be {len(replicates)} names.", new_received=group_names)
logger.error(e)
raise SystemExit(e)
# return transformed replicates
return replicates, group_names
def _prep_header_array(self, col=None, row=None):
"""
Generates the array in which header entries should be searched for
"""
if row is None and col is not None:
array = self._data[:, col] if not self._transpose else self._data[col, :]
elif row is not None and col is None:
array = self._data[row, :] if not self._transpose else self._data[:, row]
elif row is not None and col is not None:
array = self._data[row, col] if not self._transpose else self._data[col, row]
else:
e = aw.ParserError("invalid_range")
logger.critical(e)
raise e
# re-format to str and reset "nan" to dummy_blank
array = array.astype(str)
array[np.argwhere(array == "nan")] = dummy_blank
array = array.reshape(array.size)
return array
def _make_index_range(self, start_indices, end_indices, crop_first=True):
"""
Generates an index range for a data column based on start and stop indices.
This assumes that the column entry (i.e. entry[1]) is always the same and only the rows are different.
Parameters
----------
start_indices : np.ndarray
Row, col indices of the header of the data column
end_indices : np.ndarray
Row, col indices of the last entry of the data column
crop_first : bool
If set to True it will offset the start row indices by +1 to exclude the header.
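Examples
--------
A minimal sketch of the same index arithmetic on a nested list, assuming the end
index points at the first row after the data. The function and table below are
hypothetical illustrations, not the method itself.

```python
def make_index_range(start, end, crop_first=True):
    # start / end are (row, col) pairs; the column is taken from start
    first_row = start[0] + 1 if crop_first else start[0]
    return slice(first_row, end[0]), start[1]


table = [
    ["Name", "Ct"],
    ["a", 20.1],
    ["b", 21.3],
    [None, None],
]

rows, col = make_index_range((0, 1), (3, 1))
ct_values = [row[col] for row in table[rows]]

rows_with_header, _ = make_index_range((0, 1), (3, 1), crop_first=False)
with_header = [row[col] for row in table[rows_with_header]]
```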
"""
start = start_indices[0] + 1 if crop_first else start_indices[0]
end = end_indices[0]
row_range = slice(start, end)
col = start_indices[1]
return row_range, col
def _find_column_starts(self, label, ref_indices):
"""
Uses the reference row indices found for the assays to search for the
coordinates of the labeled cells, i.e. where each data column starts.
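Examples
--------
A minimal sketch of the matching step, assuming the column headers sit in the row
directly below each assay declaration. The array and the reference rows are
hypothetical.

```python
import numpy as np

data = np.array([
    ["Assay A", ""],
    ["Name", "Ct"],
    ["a", "20.1"],
    ["Assay B", ""],
    ["Name", "Ct"],
    ["Ct", "21.3"],   # stray label that is not a column header
], dtype=object)

ref_rows = np.array([0, 3])                 # rows of the assay declarations
all_found = np.argwhere(data == "Ct")       # every (row, col) holding the label
row_indices = all_found[:, 0]
# keep only labels located one row below an assay declaration
column_starts = all_found[np.isin(row_indices, ref_rows + 1)]
```

The stray label in the last row is dropped because its row is not adjacent to an assay declaration.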
"""
# get index to match to ref_indices
idx_to_match = 0 if not self._transpose else 1
data = self._data
all_found = np.argwhere(data == label)
row_indices = np.transpose(all_found)[idx_to_match]
# adjust coordinates +1 as the headers would be in the row below the assay declaration
# we only have to do this if we use the default setting of assays in the same column
if not self._transpose:
ref_indices = ref_indices + 1
# ref_indices = ref_indices + 1
matching_rows = np.where(np.isin(row_indices, ref_indices))
# if no matches were found, try incrementing the index offset once more
# (we'll allow for a single row between the header and the start of the data)
no_matches = len(matching_rows) == 1 and matching_rows[0].size == 0
if no_matches:
ref_indices = ref_indices + 1
matching_rows = np.where(np.isin(row_indices, ref_indices))
# check again, and raise Error if still no matches are found
no_matches = len(matching_rows) == 1 and matching_rows[0].size == 0
if no_matches:
e = aw.ParserError("no_data_found", label=label)
logger.error(e)
raise SystemExit(e)
matching_rows = all_found[matching_rows]
return matching_rows
def _find_column_ends(self, indices):
"""
Determines the end index of a column within the datafile based on the starting indices of
its header label
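Examples
--------
A minimal sketch of the scan on a plain list, assuming empty cells are stored as NaN.
It relies on NaN being the only value that is not equal to itself. The helper is
hypothetical and uses a bounds check where the method relies on the indexing error.

```python
nan = float("nan")
column = ["Ct", 20.1, 21.3, nan, 99.0]


def find_column_end(values, start):
    # walk down from the header until the first NaN or the end of the column
    idx = start
    while idx < len(values):
        value = values[idx]
        if value != value:  # True only for NaN
            break
        idx += 1
    return idx


end = find_column_end(column, 0)
data_cells = column[1:end]
```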
"""
data = self._data
end_indices = np.zeros(indices.shape, dtype=int)
adx = 0
for i in indices:
row, col = i
idx = 0
value = 0
while True:
try:
value = data[row + idx, col]
except Exception as e:
logger.debug(e)
break
if value != value:
break
idx += 1
finals = np.array([row + idx, col], dtype=int)
end_indices[adx] += finals
adx += 1
return end_indices
class ArrayParser(_CORE_Parser):
"""
Handles only parsing of irregular files that contain multiple assays.
However, it does not read any specific filetype but requires a `numpy.ndarray`
as input for its `read` method.
"""
def __init__(self):
super().__init__()
def read(self, data):
"""
Accepts a numpy array for its data source.
Parameters
----------
data : np.ndarray
A numpy array of some data to parse.
"""
self._data = data
def pipe(self, data, **kwargs):
"""
Accepts a numpy array for its data
source, and parses for assay datasets.
Parameters
----------
data : np.ndarray
A numpy array of some data to parse.
**kwargs
Any additional keyword argument that will be passed to any of the wrapped methods.
Returns
-------
assays : dict
A dictionary of all the extracted assays from the datafile storing the data as pandas DataFrames.
Individual assays can also be accessed using the `get` method.
"""
self.read(data)
self.parse(**kwargs)
assays = self.get()
if self._save_loc is not None:
self.save()
return assays
class CsvParser(_CORE_Parser):
"""
Handles reading and parsing irregular `csv` files that contain multiple assays.
It extracts datasets through regex pattern matching and/or through
decorators provided within the datafile.
"""
def __init__(self):
super().__init__()
def pipe(self, filename: str, **kwargs):
"""
A wrapper for read+parse
Note
----
This is the suggested use of `CsvParser`.
If a directory has been specified into which the datafiles shall be saved,
then saving will automatically be done.
Parameters
----------
filename : str
A filepath to an input csv file.
**kwargs
Any additional keyword argument that will be passed to any of the wrapped methods.
Returns
-------
assays : dict
A dictionary of all the extracted assays from the datafile storing the data as pandas DataFrames.
Individual assays can also be accessed using the `get` method.
"""
try:
self.read(filename, **kwargs)
except Exception as e:
logger.info(e)
self.read(filename)
e = aw.ParserError("incompatible_read_kwargs", func="pandas.read_csv")
logger.info(e)
self.parse(**kwargs)
assays = self.get()
if self._save_loc is not None:
self.save()
return assays
def read(self, filename: str, **kwargs):
"""
Reads an input csv file.
Parameters
----------
filename : str
A filepath to an input csv file.
**kwargs
Any additional keyword arguments to be passed to pandas' `read_csv` function.
"""
self._src = filename
contents = self._prepare_commas()
contents = StringIO(contents) # convert to StringIO for pandas to be able to read
delimiter = ";" if self._is_csv2() else ","
delimiter = aux.from_kwargs("sep", delimiter, kwargs, rm=True)
# extract our own option first so it is not forwarded to pandas
drop_nan = aux.from_kwargs("drop_nan", True, kwargs, rm=True)
# now read the data and convert to numpy array
try:
df = pd.read_csv(contents, header=None, sep=delimiter, **kwargs)
except Exception as e:
logger.debug(e)
e = aw.ParserError("incompatible_read_kwargs", func="pandas.read_csv()")
logger.info(e)
contents.seek(0)  # rewind the buffer before retrying without kwargs
df = pd.read_csv(contents, header=None, sep=delimiter)
if drop_nan:
df = df.dropna(axis=0, how="all").reset_index(drop=True)
data = df.to_numpy()
self._data = data
def _is_csv2(self):
"""
Tests whether the csv file is semicolon-delimited (True) or comma-delimited (False).
"""
with open(self._src, "r") as openfile:
content = openfile.read()
if ";" in content:
return True
return False
def _prepare_commas(self):
"""
This function reads the datafile and adjusts the number of commas
within each line to ensure equal commas in the entire file.
Note
-------
Although the method uses the term "commas" it also works with semicolons for csv2
Returns
-------
new_content : str
A string containing the entire file contents with adjusted commas.
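Examples
--------
A minimal sketch of the padding step, assuming unquoted cells and a file that has
already been split into lines. ``pad_delimiters`` is a hypothetical helper; the method
itself additionally handles quoted cells.

```python
def pad_delimiters(lines, delimiter=","):
    # count delimiters per line and pad every line up to the maximum
    counts = [line.count(delimiter) for line in lines]
    most = max(counts)
    return [line + delimiter * (most - n) for line, n in zip(lines, counts)]


ragged = ["Run info", "Name,Ct", "a,20.1", "b,21.3,flag"]
padded = pad_delimiters(ragged)
semicolon_padded = pad_delimiters(["x;y;z", "a"], delimiter=";")
```

After padding, every line has the same number of cells, so it can be read as a rectangular table.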
"""
delimiter = ";" if self._is_csv2() else ","
# check if quotes are in datafile and adjust comma-patterns to use
empty_comma_filler = f'{delimiter}""' if self._has_quotes() else f"{delimiter}"
comma_sep = f'"{delimiter}"' if self._has_quotes() else f"{delimiter}"
comma_sep = re.compile(comma_sep)
with open(self._src, "r") as f:
content = f.read()
lines = content.split("\n")
comma_counts = [len(comma_sep.findall(i)) for i in lines]
max_commas = max(comma_counts)
lines = [i + (max_commas - j) * empty_comma_filler for i, j in zip(lines, comma_counts)]
new_content = "\n".join(lines)
return new_content
def _has_quotes(self):
"""
Checks if cells from the csv input file have quotes around them.
Essentially it checks if there are any "," patterns in the file.
"""
delimiter = ";" if self._is_csv2() else ","
with open(self._src, "r") as f:
content = f.read()
has_quotes = f'"{delimiter}"' in content
return has_quotes
class ExcelParser(_CORE_Parser):
"""
Handles reading and parsing `excel` files that may contain multiple assays.
It extracts datasets through regex pattern matching and/or through
decorators provided within the datafile.
"""
def __init__(self):
super().__init__()
def read(self, filename: str, sheet_name: (str or int) = 0, **kwargs):
"""
Reads an input excel file.
Parameters
----------
filename : str
A filepath to an input excel file.
sheet_name : int or str
The name of a specific spreadsheet of the file to read.
If none is provided, the first sheet is read by default.
Only a single sheet can be read at a time.
If an `integer` is provided, the sheet is accessed by its position; if a `string` is provided, by its name.
**kwargs
Any additional keyword arguments to be passed to pandas `read_excel` function.
"""
self._src = filename
# extract our own option first so it is not forwarded to pandas
drop_nan = aux.from_kwargs("drop_nan", True, kwargs, rm=True)
# read data and convert to numpy array
try:
data = pd.read_excel(self._src, sheet_name=sheet_name, header=None, **kwargs)
except Exception as e:
logger.debug(e)
data = pd.read_excel(self._src, sheet_name=sheet_name, header=None)
e = aw.ParserError("incompatible_read_kwargs", func="pandas.read_excel()")
logger.info(e)
if drop_nan:
data = data.dropna(axis=0, how="all").reset_index(drop=True)
data = data.to_numpy()
self._data = data
def pipe(self, filename: str, **kwargs):
"""
A wrapper for read+parse
Note
----
This is the suggested use of `ExcelParser`.
If a directory has been specified into which the datafiles shall be saved,
then saving will automatically be done.
Parameters
----------
filename : str
A filepath to an input excel file.
**kwargs
Any additional keyword argument that will be passed to any of the wrapped methods.
Returns
-------
assays : dict
A dictionary of all the extracted assays from the datafile storing the data as pandas DataFrames.
Individual assays can also be accessed using the `get` method.
"""
try:
self.read(filename, **kwargs)
except Exception as e:
logger.debug(e)
self.read(filename)
e = aw.ParserError("incompatible_read_kwargs", func="pandas.read_excel")
logger.info(e)
self.parse(**kwargs)
assays = self.get()
if self._save_loc is not None:
self.save()
return assays
if __name__ == "__main__":
# parser = CsvParser()
# parser.assay_pattern("Rotor-Gene")
# parser.save_to("__csvparser")
# mycsv = "./__parser_data/Brilliant III Ultra Fast SYBR Green 2019-01-07 (1).csv"
# parser.pipe(mycsv)
# print("""\n\n\n ========================= \n All good with CsvParser \n ========================= \n\n\n""")
# parser2 = ExcelParser()
# parser2.assay_pattern("Rotor-Gene")
# parser2.save_to("./__excelparser")
# myexcel = "./__parser_data/excel 3.9.19.xlsx"
# parser2.pipe(myexcel, sheet_name = 1)
# print("""\n\n\n ========================= \n All good with ExcelParser \n ========================= \n\n\n""")
# parser3 = ExcelParser()
# decorated_excel = "./__parser_data/excel 3.9.19_decorated.xlsx"
# parser3.save_to("./__decorated_excelparser")
# parser3.read(decorated_excel)
# parser3.assay_pattern("Rotor-Gene")
# parser3.find_by_decorator(decorator = "qpcr:all")
# parser3.find_columns()
# parser3.make_dataframes()
# parser3.save()
# # print(parser3.get())
# print("""\n\n\n ========================= \n All good with decorated ExcelParser \n ========================= \n\n\n""")
# parser4 = CsvParser()
# decorated_csv = "./__parser_data/Brilliant III Ultra Fast SYBR Green 2019-01-07 (1)_decorated.csv"
# parser4.save_to("./__decorated_csvparser")
# parser4.read(decorated_csv)
# parser4.assay_pattern("Rotor-Gene")
# parser4.find_by_decorator(decorator = "qpcr:all")
# parser4.find_columns()
# parser4.make_dataframes()
# parser4.save()
# # print(parser4.get())
# print("""\n\n\n ========================= \n All good with decorated CsvParser \n ========================= \n\n\n""")
# parser4 = CsvParser()
# decorated_csv = "./__parser_data/Brilliant III Ultra Fast SYBR Green 2019-01-07 (1)_decorated.csv"
# parser4.save_to("./__decorated_csvparser_pipe")
# parser4.assay_pattern("Rotor-Gene")
# # parser4.pipe(decorated_csv, decorator = "qpcr:assay")
# # print(parser4.get())
# parser4.pipe("./__parser_data/manual_decorated.csv")
# print("""\n\n\n ========================= \n All good with decorated CsvParser using pipe \n ========================= \n\n\n""")
parser3 = ExcelParser()
decorated_excel = "./__parser_data/excel 3.9.19_decorated.xlsx"
parser3.save_to("./__decorated_excelparser_pipe_nodec")
# parser3.labels( "Type", "No.")
parser3.assay_pattern("Rotor-Gene")
parser3.pipe(decorated_excel)
print(parser3.get())
exit()
# print("""\n\n\n ========================= \n All good with decorated ExcelParser using pipe without dec\n ========================= \n\n\n""")
# same_row_assays = "/Users/NoahHK/Downloads/qPCR cytokines upon treatment_decorated.xlsx"
# parser5 = ExcelParser()
# parser5.transpose()
# parser5.read(same_row_assays, sheet_name = 1)
# parser5.save_to("./__transposed_parser")
# parser5.assay_pattern("all")
# parser5.labels( id_label = "Sample Name", ct_label = "CT" )
# print("""\n\n\n ========================= \n Transposed excel (FIND)\n ========================= \n\n\n""")
# parser5.find_assays(col = 1)
# parser5.find_columns()
# parser5.make_dataframes()
# r = parser5.get()
# print(r)
# # assert parser5._assay_indices is not None, "(find_assays) No assay_indices could be found!!!"
# parser5.clear()
# print("""\n\n\n ========================= \n Transposed excel (DECO)\n ========================= \n\n\n""")
# parser5.find_by_decorator("qpcr:all")
# # assert parser5._assay_indices is not None, "(find_by_decorator) No assay_indices could be found!!!"
# parser5.find_columns()
# parser5.make_dataframes()
# r = parser5.get()
# parser5.clear()
# parser5.read(same_row_assays, sheet_name = 1)
# parser5.parse(decorator = "qpcr:all")
# r = parser5.get()
# print(r)
# parser5.save()
bigtable_horiztonal = "/Users/NoahHK/Downloads/Local_cohort_Adenoma_qPCR_rawdata_decorated.xlsx"
parser_bigtable = ExcelParser()
parser_bigtable.read(bigtable_horiztonal)
parser_bigtable.labels(id_label="tissue_number")
parser_bigtable._make_BigTable_range(is_horizontal=True)
r = parser_bigtable._infer_BigTable_groups(replicates="3,4", names=["GAPDH", "SORD1"])
print(r)