"""
toolbox for handling fast5 (hdf5) files.
Fast5 files are generated by OxfordNanopore
Sequencing devices. This class is designed to
read formats up to SQK-MAP006. Compatibility
with more recent formats is not guaranteed.
"""
import h5py
import re
class Fast5Exception(Exception):
pass
class Fast5PathException(Fast5Exception):
pass
def identify_events_path(fast5):
"""
Newer versions of the a fast5 file have a different path to the event table.
Identify which type of fast5 file we have and return the path accordingly.
Args:
fast5: h5py File object
Returns:
corresponding index of the fast5 file
"""
try:
_ = fast5["/Analyses/Basecall_1D_000/BaseCalled_template"]
return "/Analyses/Basecall_1D_000/BaseCalled_{0}"
except KeyError:
try:
_ = fast5["/Analyses/Basecall_2D_000/BaseCalled_template"]
return "/Analyses/Basecall_2D_000/BaseCalled_{0}"
except KeyError:
raise Fast5PathException("Unknown version of fast5 format")
[docs]class Fast5File(object):
def __init__(self, path):
result = re.search(r'ch(\d+)_file(\d+)_', path)
self.file_id = int(result.group(2))
self.channel_id = int(result.group(1))
self.seqs = {
"template": None,
"complement": None
}
try:
self.f5 = h5py.File(path, 'r')
self.event_path = identify_events_path(self.f5)
except OSError:
raise Fast5Exception("Unable to open fast5-file.")
[docs] def get_id(self):
"""
Returns:
unique identifier for f5 file.
"""
return "ch_{0}_file_{1}".format(self.channel_id, self.file_id)
[docs] def get_attrs(self, strand):
"""
Get Attributes for template/complement strand
Args:
strand: either "template" or "complement"
Returns:
dict: {shift: foo, drift: foo, scale: foo} or None if strand not in File
"""
assert strand in ["template", "complement"]
attrs = ["shift", "scale", "drift"]
try:
all_attrs = self.f5[self.event_path.format(strand) + "/Model"].attrs
return {k: all_attrs[k] for k in attrs}
except (KeyError, ValueError):
return None
[docs] def get_events(self, strand):
"""
Get events for template/complement strand
Args:
strand: either "template" or "complement"
Returns:
generator yielding one event-dict at a time or None if strand not in File
"""
assert strand in ["template", "complement"]
try:
events = self.f5[self.event_path.format(strand) + "/Events"]
for raw_ev in events:
ev = {}
ev["mean"] = raw_ev[0]
ev["start"] = float(raw_ev[1])
ev["stdv"] = raw_ev[2]
ev["length"] = float(raw_ev[3])
ev["kmer"] = bytes(raw_ev[4]).decode('utf-8')
ev["move"] = int(raw_ev[6])
yield ev
except KeyError:
pass
[docs] def get_corrected_events(self, strand):
"""
Get events for template/complement strand and apply the shift/scale/drift corrections.
Unfortunately, these corrections are not documented exactly anywhere. Some information is on
https://wiki.nanoporetech.com/display/BP/1D+Basecalling+overview.
Args:
strand: either "template" or "complement"
Returns:
generator yielding one event-dict at a time or None if strand not in File
"""
assert strand in ["template", "complement"]
attrs = self.get_attrs(strand)
if attrs is None:
raise Fast5Exception("events cannot be shift/scale/drift corrected.")
# apparently, drift is applied relatively to the start of the read.
read_start = next(self.get_events(strand))["start"]
shift = lambda mean: mean - attrs["shift"]
scale = lambda mean: mean / attrs["scale"]
drift = lambda mean, start: mean - (start - read_start) * attrs["drift"]
for ev in self.get_events(strand):
ev["mean"] = drift(scale(shift(ev["mean"])), ev["start"])
yield ev
[docs] def get_seq(self, strand):
"""
get the nt-sequence, based on the kmers and 'move'.
Evaluation is done in a lazy fashion. If the function
was called once, the sequence can be accessed in constant time.
Args:
strand (str): either template or complement
Returns:
(str) nucleotide sequence
"""
assert strand in ["template", "complement"]
if self.seqs[strand] is None:
self.seqs[strand] = self.events2seq(self.get_events(strand))
return self.seqs[strand]
@staticmethod
[docs] def events2seq(events):
"""turn events into a nt sequence based on the 'move' column
Args:
events: list of events (dictionaries)
"""
seq = list()
seq.append(next(events)["kmer"])
for ev in events:
move = ev["move"]
kmer = ev["kmer"]
seq.append(kmer[len(kmer)-move:])
return "".join(seq)