Let's write a simple FASTQ trimmer, so you get a bit more experience processing things with loops. This will be a bit more involved of an exercise, so, beware! ;)
Input: A path to a fastq file. You'll need to read this file in fastq file has been read in for you in the first cell (since you haven't learned to read files yet).
Output: A trimmed fastq file written out at the specified step, returning the statistics of how many reads were removed at each step.
You'll need to write a couple filters, each of these will then be put together at the end to build a full trimmer. This is a multistep problem!
def mean(numbers):
"""
Compute the average value of a provided list of numbers. Hint: copy and paste from QC ;)
returns: float, the mean value
"""
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
return sum(numbers) / len(numbers)
### END SOLUTION
from nose.tools import assert_equal
assert_equal(mean([1, 1, 1]), 1)
assert_equal(mean([1, 2, 3]), 2)
assert_equal(mean([-10, 10]), 0)
### BEGIN HIDDEN TESTS
# students will NOT see these extra tests
# Meh.
assert_equal(mean([1000, 2000, 3000]), 2000)
### END HIDDEN TESTS
print("Succes!")
Succes!
def remove_too_short(read_id, sequence, quality, length=30):
"""
Remove a read that is shorter than the specified length parameter
read_id: string with the read ID
sequence: DNA sequence
quality: list of ints, with the quality score of each base.
length: integer definining the minimum acceptable length, nothing shorter than this should be allowed.
returns: true if the read should be kept, false if it should be dropped.
"""
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
return len(sequence) >= length
### END SOLUTION
from nose.tools import assert_equal
assert_equal(remove_too_short("x", "aaccttgg", [1,2,3,4,5,6,7,8], length=30), False)
assert_equal(remove_too_short("x", "aaccttgg", [1,2,3,4,5,6,7,8], length=6), True)
print("Succes!")
Succes!
def remove_bad_quality(read_id, sequence, quality, score=20):
"""
Check for sequences with low mean quality, if the mean quality is too low, return False. Otherwise True.
read_id: string with the read ID
sequence: DNA sequence
quality: list of ints, with the quality score of each base.
score: Minimum acceptable quality score across the entire read.
returns: Boolean, true if the read should be kept, false if it should be dropped.
"""
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
return mean(quality) >= score
### END SOLUTION
from nose.tools import assert_equal
assert_equal(remove_bad_quality("x", "aaccttgg", [8, 7, 7, 6, 6, 5, 5, 4], score=20), False)
assert_equal(remove_bad_quality("x", "aaccttgg", [8, 7, 7, 6, 6, 5, 5, 4], score=5), True)
def trim_bad_quality_ends(read_id, sequence, quality, score=20):
"""
Trim a read from the 3' end (the tail end, not the start). If the quality of the end most base is below `score`, then remove it. Continue removing bases until the quality of that base is ABOVE score.
read_id: string with the read ID
sequence: DNA sequence
quality: list of ints, with the quality score of each base.
score: bases averaging below this score, should be removed
returns: [read_id, trimmed_sequence, trimmed_quality]
"""
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
rev_q = quality[::-1]
rev_s = sequence[::-1]
for idx, q in enumerate(rev_q):
if q <= score:
continue
else:
break
new_q = rev_q[idx:][::-1]
new_s = rev_s[idx:][::-1]
return [read_id, new_s, new_q]
### END SOLUTION
from nose.tools import assert_equal
assert_equal(trim_bad_quality_ends("x", "aaccttgg", [8, 7, 7, 6, 6, 5, 5, 4], score=5), ['x', 'aacct', [8, 7, 7, 6, 6]])
assert_equal(trim_bad_quality_ends("x", "aaccttgg", [8, 7, 7, 6, 6, 5, 5, 4], score=4), ['x', 'aaccttg', [8, 7, 7, 6, 6, 5, 5]])
assert_equal(trim_bad_quality_ends("x", "aaccttgg", [8, 7, 4, 6, 6, 5, 5, 4], score=4), ['x', 'aaccttg', [8, 7, 4, 6, 6, 5, 5]])
def parseFastqFile(path_to_file):
"""
Parse a FASTQ file into an appropriate data structure.
The specific data structure used is up to you (it will matter when you implement the "trim" step)
but we can recommend parsing it into something like:
[
["@read_id1", "ACTG", "+", [40, 40, 39, 27]],
["@read_id2", "ACTG", "+", [40, 38, 31, 29]]
]
i.e. a list of lists, where every child list is the four parts of a read, and
the phred quality score is decoded properly (Tip: copy and paste from the last assignment)
path_to_file: A string pointing to a path on disk that you can open
"""
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
reads = []
with open(path_to_file, 'r') as handle:
all_lines = handle.readlines()
texts = handle.read()
print(texts)
for i in range(0, len(all_lines), 4):
read = [
all_lines[i].strip(),
all_lines[i+1].strip(),
all_lines[i+2].strip(),
[ord(x) - 33 for x in all_lines[i+3].strip()]
]
reads.append(read)
return reads
### END SOLUTION
here is your sequence: GTGCCAGCCGCCGCGGTAGTCCGACGTGGCTGTCTCTTATACACATCTCCGAGCCCACGAGACCGAAGAACATCTCGTATGCCGTCTTCTGCTTGAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAGAAGCAAATGACGATTCAAGAAAGAAAAAAACACAGAATACTAACAATAAGTCATAAACATCATCAACATAAAAAAGGAAATACACTTACAACACATATCAATATCTAAAATAAATGATCAGCACACAACATGACGATTACCACACATGTGTACTACAAGTCAACTA and here is your quality GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGFGGGFGGGGGGAFFGGFGGGGGGGGFGGGGGGGGGGGGGGFGGG+38+35*311*6,,31=******441+++0+0++0+*1*2++2++0*+*2*02*/***1*+++0+0++38++00++++++++++0+0+2++*+*+*+*+*****+0**+0**+***+)*.***1**//*)***)/)*)))*)))*),)0(((-((((-.(4(,,))).,(())))))).)))))))-))-(, and oeps I don't know if this is your identifier but you can eyeballscan: M00970:337:000000000-BR5KF:1:1102:17745:1557 1:N:0:CGCAGAAC+ACAGAGTT here is your sequence: GTGCCAGCAGCCGCGGTAATACGTAGGTGGCAAGCGTTGTCCGGAATTATTGGGCGTAAAGCGCGCGCAGGCGGATAGGTCAGTCTGTCTTAAAAGTTCGGGGCTTAACCCCGTGATGGGATGGAAACTGCCAATCTAGAGTATCGGAGAGGAAAGTGGAATTCCTAGTGTAGCGGTGAAATGCGTAGATATTAGGAAGAACACCAGTGGCGAAGGCGACTTTCTGGACGAAAACTGACGCTGAGGCGCGAAAGCCAGGGGAGCGAACGGGATTAGATACCCGGTAGTCCGCCGTG and here is your quality GGGGGGGGGGGGGGGGGGGGGFFGGGGGGGGGGGGGGGGGGGFGGGGGGGGGGGGGGGGGGFFGEBFGGGGFFCFGGGGGGFGGGGGGGGGGGGGGGGGGGGGGGGGGGFGGGGGG=FGGG8FFGGFFGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG9<EFGGGGGGGGGGGGGGFGGGGGGEEGGFCFD;EGGFDFGGGGEGGCF?DGDGDGG>D3CFFFGFDFG37D:?FFFFBBBBF:;>>>B;:(4<BFF>,7767990:F29(3?A)--)44-79B<5<?(4;;F, and oeps I don't know if this is your identifier but you can eyeballscan: M00970:337:000000000-BR5KF:1:1102:23974:2015 1:N:0:CGAAGAAC+ACAGAGGT
# Download the file
# You should run this cell :)
import os
import urllib.request
if not os.path.exists("example.fastq"):
urllib.request.urlretrieve("https://gist.githubusercontent.com/hexylena/7d249607f8f763301f06c78a48c3bf6f/raw/a100e278cee1c94035a3a644b16863deee0ba2c0/example.fastq", "example.fastq")
def trim(input_path, output_path, too_short=50, min_overall_quality=25, trim_bad_ends=20):
"""
Trim a set of reads.
This should occur in three phases:
- (read in file)
- trim bad ends
- then drop ones that have overall too low quality
- lastly drop the ones that are too short.
- (write out final file)
Record how many reads you have after each step in an array. Be sure to also include the starting count.
input_path: Path to a fastq file that should be read in via parseFastqFile
output_path: Path where you should write out the final fastq file. (It will be checked.)
reads: provided list of lists, where every item in the list has 4 elements [read_id, sequence, '+', list-of-quality-scores]
trim_bad_ends: integer score to use with your trim_bad_quality_ends function.
min_overall_quality: integer score to use with your remove_bad_quality function
too_short: drop any reads shorter than this length. Do this LAST.
returns: [the final set of reads, the number of reads dropped at each step]
"""
reads = parseFastqFile("example.fastq")
# Now the rest is up to you!
### BEGIN SOLUTION
# Put correct code here. This code is removed for the student version, but is used
# to confirm that your tests are valid.
counts = [len(reads)]
# Remove the +
reads = [[read[0], read[1], read[3]] for read in reads]
# Trim from the end.
reads = [trim_bad_quality_ends(*read, score=trim_bad_ends) for read in reads]
counts.append(len(reads))
reads = [read for read in reads if remove_bad_quality(*read, score=min_overall_quality)]
counts.append(len(reads))
reads = [read for read in reads if remove_too_short(*read, length=too_short)]
counts.append(len(reads))
with open(output_path, 'w') as handle:
for read in reads:
handle.write(read[0] + "\n")
handle.write(read[1] + "\n")
handle.write("+\n")
handle.write("".join([chr(x + 33) for x in read[2]]) + "\n")
return counts
### END SOLUTION
import subprocess
def assertFastqReadCount(path):
try:
return int(subprocess.check_output(['grep', '-c', '^@', 'example-trimmed.fastq']).decode('utf-8'))
except:
return 0
from nose.tools import assert_equal
assert_equal(trim("example.fastq", "example-trimmed.fastq"), [250, 250, 148, 148])
assert_equal(assertFastqReadCount("example-trimmed.fastq"), 148)
assert_equal(trim("example.fastq", "example-trimmed.fastq", trim_bad_ends=30, min_overall_quality=35), [250, 250, 155, 155])
assert_equal(assertFastqReadCount("example-trimmed.fastq"), 155)
assert_equal(trim("example.fastq", "example-trimmed.fastq", trim_bad_ends=20, min_overall_quality=26, too_short=180), [250, 250, 136, 118])
assert_equal(assertFastqReadCount("example-trimmed.fastq"), 118)
### BEGIN HIDDEN TESTS
# students will NOT see these extra tests
# Just re-run the same so they can't modify.
assert_equal(trim("example.fastq", "example-trimmed.fastq", trim_bad_ends=20, min_overall_quality=26, too_short=180), [250, 250, 136, 118])
assert_equal(assertFastqReadCount("example-trimmed.fastq"), 118)
### END HIDDEN TESTS
--------------------------------------------------------------------------- AssertionError Traceback (most recent call last) <ipython-input-12-32b4b390078f> in <module> 7 from nose.tools import assert_equal 8 assert_equal(trim("example.fastq", "example-trimmed.fastq"), [250, 250, 148, 148]) ----> 9 assert_equal(assertFastqReadCount("example-trimmed.fastq"), 148) 10 11 assert_equal(trim("example.fastq", "example-trimmed.fastq", trim_bad_ends=30, min_overall_quality=35), [250, 250, 155, 155]) /usr/lib/python3.8/unittest/case.py in assertEqual(self, first, second, msg) 910 """ 911 assertion_func = self._getAssertEqualityFunc(first, second) --> 912 assertion_func(first, second, msg=msg) 913 914 def assertNotEqual(self, first, second, msg=None): /usr/lib/python3.8/unittest/case.py in _baseAssertEqual(self, first, second, msg) 903 standardMsg = '%s != %s' % _common_shorten_repr(first, second) 904 msg = self._formatMessage(msg, standardMsg) --> 905 raise self.failureException(msg) 906 907 def assertEqual(self, first, second, msg=None): AssertionError: 0 != 148