Source code for metator.align
#!/usr/bin/env python3
# coding: utf-8
"""Alignement of reads
Align read files onto the assembly and return a tsv file with 9 columns: ReadID,
ContigA, Position_startA, Position_endA, StrandA, ContigB, Position_startB,
Position_endB, StrandB. Reads are mapped separately, sorted by names, then
interleaved (rather than mapped in paired-end mode) to capture the pairs mapping
on two different contigs.
This module contains all these alignment functions:
- align
- get_contact_pairs
- merge_alignement
- process_bamfile
- process_bwa_bamfile
"""
import csv
import pysam
import shutil as st
import subprocess as sp
import hicstuff.cutsite as hcc
import hicstuff.io as hio
import hicstuff.iteralign as hci
import metator.network as mtn
from metator.log import logger
from os.path import join
from looseversion import LooseVersion
[docs]def align(
fq_in,
index,
aligner,
bam_out,
n_cpu,
tmp_dir,
iterative=False,
fq_in_2=None,
):
"""Alignment
Aligns reads of fq_in with bowtie2. Parameters of bowtie2 are set as
--very-sensitive-local and only the aligned reads are
kept in the bam file.
Multiple files could be given, but only one file will be written as output.
Parameters:
-----------
fq_in : str
Path to input fastq file to align.
index : str
Path to the bowtie2 index genome.
aligner : str
Name of the aligner algorithm to use. Either bowtie2 or bwa.
bam_out : str
Path where the alignment should be written in BAM format.
n_cpu : int
The number of CPUs to use for the alignment.
tmp_dir:
Path to directory to store temporary files.
iterative:
Wether to use the iterative mapping procedure (truncating reads and
extending iteratively)
fq_in_2 : str
Path to the reverse fastq file for bwa (map both reads at the same time
but do not take into account their poistion as pairs.)
Returns:
--------
int :
0
"""
if iterative:
iter_tmp_dir = hio.generate_temp_dir(tmp_dir)
tmp_bam = join(tmp_dir, "tmp.bam")
hci.iterative_align(
fq_in,
tmp_dir=iter_tmp_dir,
ref=index,
n_cpu=n_cpu,
bam_out=tmp_bam,
min_qual=40,
aligner=aligner,
read_len=20,
)
st.rmtree(iter_tmp_dir)
sp.call(
"samtools sort -n -@ {n_cpu} -o {bam} {tmp}".format(
n_cpu=n_cpu, tmp=tmp_bam, bam=bam_out
),
shell=True,
)
else:
# Align the reads on the reference genome with the chosen aligner.
map_args = {
"cpus": n_cpu,
"fq": fq_in,
"idx": index,
"bam": bam_out,
"fq_rev": fq_in_2,
}
if aligner == "bwa":
cmd = "bwa mem -5SP -t {cpus} {idx} {fq} {fq_rev}".format(
**map_args
)
elif aligner == "bowtie2":
cmd = (
"bowtie2 -x {idx} -p {cpus} --very-sensitive-local {fq} --no-unal"
).format(**map_args)
# Write the outputfile in a temporary bam file.
map_process = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE)
sort_process = sp.Popen(
"samtools sort -n -@ {cpus} -o {bam}".format(**map_args),
shell=True,
stdin=map_process.stdout,
)
_out, _err = sort_process.communicate()
mapping_values = map_process.stderr.read()
for line in mapping_values.split(b"\n"):
logger.info(f"{line.decode('utf-8')}")
return 0
[docs]def get_contact_pairs(
for_in,
rev_in,
index,
assembly,
aligner,
aligner_mode,
min_qual,
start,
depth_file,
enzyme,
out_dir,
tmp_dir,
n_cpu,
):
"""General function to do the whole alignment of both fastq.
The Function write at the output directory location given as an argument and
return a tsv file of the aligned reads with 9 columns: ReadID, ContigA,
Position_startA, Position_endA, StrandA, ContigB, Position_startB,
Position_endB, StrandB. The name of the file will be alignment.txt.
Two start stages are possible, from fastq or bam files.
Parameters
----------
for_in : str
Path to input forward fastq or bam file to align. If multiple files are
given, list of path separated by a comma.
rev_in : str
Path to input reverse fastq or bam file to align. If multiple files are
given, list of path separated by a comma.
index : str
Path to the bowtie2 index of the assembly.
assembly : str
The initial assembly path acting as the alignment file's reference
assembly.
aligner : str
Either 'bowtie2' or 'bwa' aligner used or to be use to map the reads.
aligner_mode : str
Either 'normal', 'iterative' or 'cutsiste'. Mode to align the HiC reads.
min_qual : int
Minimum mapping quality required to keep Hi-C pairs.
start : str
Either fastq or bam. Starting point for the pipeline.
depth_file : str or None
Path to the depth.txt file from jgi_summarize_bam_contig_depths from
Metabat2 Software.
enzyme : str or None
String that contains the names of the enzyme separated by a comma.
out_dir : str
Path to directory where to write the output file.
tmp_dir : str
Path where temporary files should be written.
n_cpu : int
The number of CPUs to use for the alignment.
Returns
-------
list of str:
List of path of the Files with the table containing the alignement data
of the pairs: ReadID, ContigA, Position_startA, Position_endA, StrandA,
ContigB, Position_startB, Position_endB, StrandB.
dict:
Dictionnary of the all the contigs from the assembly, the contigs names
are the keys to the data of the contig available with the following
keys: "id", "length", "GC", "hit", "coverage". Coverage still at 0 and
need to be updated later.
dict:
Dictionnary for hit information on each contigs.
"""
# Iterates on all the input files:
for_list = for_in.split(",")
rev_list = rev_in.split(",")
out_file_list = []
total_aligned_pairs = 0
# Create the contig data dictionnary and hit from each alignments
nb_alignment = len(for_list)
contig_data, hit_data = mtn.create_contig_data(
assembly, nb_alignment, depth_file, enzyme
)
for i in range(len(for_list)):
for_in = for_list[i]
try:
rev_in = rev_list[i]
except IndexError:
rev_in = None
name = "alignment_" + str(i)
out_file = join(out_dir, "alignment_" + str(i) + ".pairs")
out_file_list.append(out_file)
# Align if necessary
if start == "fastq":
iterative = False
# Digest reads if necessary.
if aligner_mode == "cutsite":
digest_for = join(tmp_dir, f"digest_for_{i}.fq.gz")
digest_rev = join(tmp_dir, f"digest_rev_{i}.fq.gz")
hcc.cut_ligation_sites(
fq_for=for_in,
fq_rev=rev_in,
digest_for=digest_for,
digest_rev=digest_rev,
enzyme=enzyme,
mode="for_vs_rev",
seed_size=20,
n_cpu=int(n_cpu),
)
for_in, rev_in = digest_for, digest_rev
elif aligner_mode == "iterative":
iterative = True
if iterative or (aligner == "bowtie2"):
# Create files to save the alignment.
alignment_for = join(out_dir, name + "_for.bam")
alignment_rev = join(out_dir, name + "_rev.bam")
# Align the forward reads
logger.info(f"Alignment of {for_in}:")
align(for_in, index, aligner, alignment_for, n_cpu, iterative)
# Align the reverse reads
logger.info(f"Alignment of {rev_in}:")
align(
rev_in,
index,
aligner,
alignment_rev,
n_cpu,
tmp_dir,
iterative,
)
elif aligner == "bwa":
# Create file to save the alignement.
alignment = join(out_dir, name + ".bam")
logger.info(f"Alignment of {for_in} and {rev_in}:")
align(
for_in,
index,
aligner,
alignment,
n_cpu,
tmp_dir,
fq_in_2=rev_in,
)
elif start == "bam":
if aligner == "bowtie2":
logger.info(f"Processing {for_in} and {rev_in}:")
alignment_for = for_in
alignment_rev = rev_in
elif aligner == "bwa":
alignment = for_in
else:
logger.error("Start argument should be either 'fastq' or 'bam'.")
raise ValueError
if aligner == "bowtie2":
# Create files to save the alignment.
alignment_temp_for = join(tmp_dir, name + "_for_temp.txt")
alignment_temp_rev = join(tmp_dir, name + "_rev_temp.txt")
# Filters the aligned and non aligned reads from the forward and
# reverse bam files.
aligned_reads_for = process_bamfile(
alignment_for, min_qual, alignment_temp_for
)
aligned_reads_rev = process_bamfile(
alignment_rev, min_qual, alignment_temp_rev
)
logger.info(
f"{aligned_reads_for} forward reads aligned and {aligned_reads_rev} reverse reads aligned."
)
# Merge alignement to create a pairs file
logger.info("Merging the pairs:")
n_pairs = merge_alignment(
alignment_temp_for, alignment_temp_rev, contig_data, out_file
)
logger.info(f"{n_pairs} pairs aligned.\n")
total_aligned_pairs += n_pairs
# Case where a bam file from bwa is given as input.
if aligner == "bwa":
n_pairs = process_bwa_bamfile(
alignment, min_qual, contig_data, out_file
)
logger.info(f"{n_pairs} pairs aligned.\n")
total_aligned_pairs += n_pairs
if len(out_file_list) > 1:
logger.info(f"TOTAL PAIRS MAPPED: {total_aligned_pairs}\n")
return out_file_list, contig_data, hit_data
[docs]def merge_alignment(forward_aligned, reverse_aligned, contig_data, out_file):
"""Merge forward and reverse alignment into one file with only pairs which
have both reads are aligned on the genome with 9 columns: ReadID, ContigA,
Position_startA, Position_endA, StrandA, ContigB, Position_startB,
Position_endB, StrandB.
Parameters
----------
forward_alignement : str
File with the table containing the data of the forward reads kept after
the alignment. With five columns: ReadID, Contig, Position_start,
Position_end, strand.
reverse_alignement : str
File with the table containing the data of the forward reads kept after
the alignment. With five columns: ReadID, Contig, Position_start,
Position_end, strand.
contig_data : dict
Dictionnary of the all the contigs from the assembly, the contigs names
are the keys to the data of the contig available with the following
keys: "id", "length", "GC", "hit", "coverage". Coverage still at 0 and
need to be updated later.
out_file : str
Path to write the output pairs file.
Returns
-------
str:
File with the table containing the alignement data of the pairs: ReadID,
ContigA, Position_startA, Position_endA, StrandA, ContigB,
Position_startB, Position_endB, StrandB
"""
# Open files for reading and writing.
with open(forward_aligned, "r") as for_file, open(
reverse_aligned, "r"
) as rev_file, open(out_file, "w") as merged:
for_bam = csv.reader(for_file, delimiter="\t")
rev_bam = csv.reader(rev_file, delimiter="\t")
# Initialization.
n_pairs = 0
for_read = next(for_bam)
rev_read = next(rev_bam)
# Write header of the pairs file.
merged.write("## pairs format v1.0\n")
merged.write("#columns: readID chr1 pos1 chr2 pos2 strand1 strand2\n")
merged.write("#sorted: readID\n")
merged.write("#shape: upper triangle\n")
for contig in contig_data:
merged.write(
"#chromsize: {0} {1}\n".format(
contig, contig_data[contig]["length"]
)
)
# Loop while at least one end of one end is reached. It's possible to
# advance like that as the two tsv files are sorted on the id of the
# reads.
while n_pairs >= 0:
# Case of both reads of the pair map.
if for_read[0] == rev_read[0]:
# Write read ID
merged.write(for_read[0] + "\t")
# Pairs are 1-based so we have to add 1 to 0 based bam position
for_position = (
for_read[1] + "\t" + str(int(for_read[2]) + 1) + "\t"
)
rev_position = (
rev_read[1] + "\t" + str(int(rev_read[2]) + 1) + "\t"
)
# Have upper triangle shape
if (
for_read[1] == rev_read[1]
and int(for_read[2]) <= int(rev_read[2])
) or contig_data[for_read[1]]["id"] < contig_data[rev_read[1]][
"id"
]:
merged.write(
for_position
+ rev_position
+ for_read[3]
+ "\t"
+ rev_read[3]
+ "\n"
)
else:
merged.write(
rev_position
+ for_position
+ rev_read[3]
+ "\t"
+ for_read[3]
+ "\n"
)
n_pairs += 1
try:
for_read = next(for_bam)
except StopIteration:
break
try:
rev_read = next(rev_bam)
except StopIteration:
break
# As the file is version sorted we have to do compare the two names
# according to the version order.
else:
names = [for_read[0], rev_read[0]]
names_sorted = sorted(names, key=LooseVersion)
# Case of the forward read mapped but not the reverse. Indeed,
# no line would have been added previously if the read didn't
# map.
if names == names_sorted:
try:
for_read = next(for_bam)
except StopIteration:
break
# Same but with no mapped forward reads.
else:
try:
rev_read = next(rev_bam)
except StopIteration:
break
return n_pairs
[docs]def process_bamfile(alignment, min_qual, filtered_out):
"""Filter alignment BAM files
Reads all the reads in the input BAM alignment file. Keep reads in the
output if they are aligned with a good quality (greater than min quality
threshold given) saving their only some columns: ReadID, Contig,
Position_start, Position_end, strand to save memory.
Parameters
----------
alignment : str
Path to the input temporary alignment.
min_qual : int
Minimum mapping quality required to keep a Hi-C pair.
filtered_out : str
Path to the output temporary tsv alignement.
Returns
-------
int:
Number of reads aligned.
"""
# Check the quality and status of each aligned fragment.
aligned_reads = 0
save = pysam.set_verbosity(0)
temp_bam = pysam.AlignmentFile(alignment, "rb", check_sq=False)
pysam.set_verbosity(save)
with open(filtered_out, "w") as f:
for r in temp_bam:
# Check mapping quality
if r.mapping_quality >= min_qual:
# Check Mapping (0 or 16 flags are kept only)
if r.flag == 0:
aligned_reads += 1
read = str(
r.query_name
+ "\t"
+ r.reference_name
+ "\t"
+ str(r.reference_start)
+ "\t"
+ "+"
+ "\n"
)
f.write(read)
elif r.flag == 16:
aligned_reads += 1
read = str(
r.query_name
+ "\t"
+ r.reference_name
+ "\t"
+ str(r.reference_start)
+ "\t"
+ "-"
+ "\n"
)
f.write(read)
temp_bam.close()
return aligned_reads
[docs]def process_bwa_bamfile(alignment, min_qual, contig_data, out_file):
"""Filter alignment BAM files
Reads all the reads in the input BAM alignment file. Keep reads in the
output if they are aligned with a good quality (greater than min quality
threshold given) saving their only some columns: ReadID, Contig,
Position_start, Position_end, strand to save memory.
Parameters
----------
alignment : str
Path to the input temporary alignment.
min_qual : int
Minimum mapping quality required to keep a Hi-C pair.
contig_data : dict
Dictionnary of the all the contigs from the assembly, the contigs names
are the keys to the data of the contig available with the following
keys: "id", "length", "GC", "hit", "coverage". Coverage still at 0 and
need to be updated later.
out_file : str
Path to the output pairs file.
Returns
-------
int:
Number of pairs aligned.
"""
# Read the bam file.
n_pairs = 0
save = pysam.set_verbosity(0)
temp_bam = pysam.AlignmentFile(alignment, "rb", check_sq=False)
pysam.set_verbosity(save)
with open(out_file, "w") as merged:
# Write header of the pairs file.
merged.write("## pairs format v1.0\n")
merged.write("#columns: readID chr1 pos1 chr2 pos2 strand1 strand2\n")
merged.write("#sorted: readID\n")
merged.write("#shape: upper triangle\n")
for contig in contig_data:
merged.write(
"#chromsize: {0} {1}\n".format(
contig, contig_data[contig]["length"]
)
)
# Loop until the end of the file. Read the reads by two as the forward
# and reverse reads should be interleaved.
while n_pairs >= 0:
try:
for_read = next(temp_bam)
while for_read.is_supplementary:
for_read = next(temp_bam)
rev_read = next(temp_bam)
while rev_read.is_supplementary:
rev_read = next(temp_bam)
# Check mapping quality
if (
for_read.mapping_quality >= min_qual
and rev_read.mapping_quality >= min_qual
):
# Check flag
if not (for_read.is_unmapped or rev_read.is_unmapped):
n_pairs += 1
# Safety check (forward and reverse are the same reads)
if for_read.query_name != rev_read.query_name:
logger.error(
f"Reads should be paired - {for_read.query_name}\t{rev_read.query_name}"
)
raise ValueError
# Define pairs value.
name = for_read.query_name
contig1 = for_read.reference_name
contig2 = rev_read.reference_name
pos1 = for_read.pos + 1
pos2 = for_read.pos + 1
strand1 = "+"
strand2 = "+"
if for_read.is_reverse:
strand1 = "-"
if rev_read.is_reverse:
strand2 = "-"
# Modify order to have an upper triangle and write
# the pair.
if (contig1 == contig2 and pos1 <= pos2) or contig_data[
contig1
]["id"] < contig_data[contig2]["id"]:
merged.write(
"\t".join(
[
name,
contig1,
str(pos1),
contig2,
str(pos2),
strand1,
strand2,
]
)
+ "\n"
)
else:
merged.write(
"\t".join(
[
name,
contig2,
str(pos2),
contig1,
str(pos1),
strand2,
strand1,
]
)
+ "\n"
)
# Exit the loop if no more reads.
except StopIteration:
break
# Close the bam file and return number of pairs
temp_bam.close()
return n_pairs