Source code for metator.contact_map
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generates a HiC contact map of a MetaTOR object.
General utility functions and class for handling one MetaTOR object and for
generating its HiC contact map using hicstuff pipeline.
Core class to handle bin object easily:
- MetatorObject
Core function to build the network are:
- extract_pairs
- generate_contact_map
"""
import hicstuff.pipeline as hcp
import metator.io as mio
from metator.log import logger
from os.path import join
import pandas as pd
import pypairix
import subprocess as sp
[docs]class MetatorObject:
"""
Class to handle MetaTOR object informations and to extract easily aligned
pairs from the object given.
The class is initiate by the MetaTOR object type, its name, the assembly
file, the contig data file from MetaTOR and the pairs files.
It's possible to add a threshold on the contigs size.
"""
def __init__(
self,
metator_object,
name,
assembly,
contig_data_file,
pairs,
min_size=5000,
):
"""Initiates the MetaTOR objects and set useful informations.
Parameters:
-----------
metator_object : str
Object to extract contigs to build the matrix. Either "contig",
"core_bin", "overlapping_bin", "recursive_bin", "final_bin" or
"other".
name : str
Name of the object. Could be the name of a contig, an id of a bin or
the name of the bin. Example: "NODE_1" or "MetaTOR_1_0".
assembly : str
Path to the fasta file containing the contigs of interest. Could be
the whole or the extracted contigs of one bin.
contig_data_file : str
Path to the contig_data_final.txt file form MetaTOR output.
pairs : List of str
List of path of the ".pairs" file.
min_size : int
Size threshold used to filter contigs. Default: 5000.
"""
self.assembly = assembly
self.contigs_data = contig_data_file
self.pairs_files = pairs
self.min_size = min_size
self.set_metator_object(metator_object, name)
self.contigs = None
self.large_contigs = None
self.contigs_size = None
self.fasta = None
self.pairs = None
[docs] def get_contigs_data(self):
"""Method to get contigs data Dataframe
Returns:
--------
pandas.core.frame.DataFrame
Table with informations from metaTOR binning for each contigs.
"""
contigs_data = pd.read_csv(self.contigs_data, sep="\t")
return contigs_data
[docs] def set_contigs(self):
"""Method to extract the list of contigs of the class from the contig
data file and their size.
"""
contigs_data = self.get_contigs_data()
try:
contigs_data[self.object]
except KeyError:
logger.error("The object that you gave is not in the table.")
raise ValueError
self.contigs = list(
contigs_data["Name"][contigs_data[self.object] == self.name]
)
self.contigs_size = list(
contigs_data["Size"][contigs_data[self.object] == self.name]
)
[docs] def set_large_contigs(self):
"""Method to keep only contigs bigger than the threshold given to remove
small bins which will be smaller than the binning value and which will
prevent a good scaffolding with Instagraal.
"""
self.large_contigs = []
for index, value in enumerate(self.contigs_size):
if value >= self.min_size:
self.large_contigs.append(self.contigs[index])
self.contigs = self.large_contigs
self.contigs_size = [
size for size in self.contigs_size if size >= self.min_size
]
[docs] def set_metator_object(self, metator_object, name):
"""Method to get the metator object and name of the object usable for
the algorithm.
Parameters:
-----------
metator_object : str Object to extract contigs to build the matrix.
Either "contig", "core_bin", "overlapping_bin", "recursive_bin",
"final_bin" or "other".
name : str
Name of the object. Could be the name of a contig, an id of a bin or
the name of the bin. Example: "NODE_1" or "MetaTOR_1_0".
"""
if metator_object == "contig":
self.object = "Name"
self.name = name
elif metator_object == "core_bin":
self.object = "Core_bin_ID"
try:
int(name)
self.name = str(name)
except ValueError as object_no_exist:
logger.error(
"With core bin object, the name should be the numeric ID of\
the core bin."
)
raise ValueError from object_no_exist
elif metator_object == "overlapping_bin":
self.object = "Overlapping_bin_ID"
try:
int(name)
self.name = str(name)
except ValueError:
self.name = str(name.split("_")[1])
try:
int(self.name)
except ValueError as object_no_exist:
logger.error(
"Overlapping bin name should be either numerical ID or \
a name like 'MetaTOR_1' or 'MetaTOR_1_0'."
)
raise ValueError from object_no_exist
elif metator_object == "recursive_bin":
self.object = "Recursive_bin_ID"
try:
int(name)
self.name = str(name)
except ValueError:
self.name = str(name.split("_")[2])
try:
int(self.name)
except ValueError as object_no_exist:
logger.error(
"Overlapping bin name should be either numerical ID or \
a name like 'MetaTOR_1_1'."
)
raise ValueError from object_no_exist
if int(self.name) <= 0:
logger.error(
"A recursive bin should have an id bigger than 0."
)
elif metator_object == "final_bin":
self.object = "Final_bin"
self.name = name
else:
self.object = str(metator_object)
try:
self.name = int(name)
except ValueError:
self.name = str(name)
[docs] def write_fasta(self, tmp_dir, out_dir):
"""Method to write the new fasta with only the contigs of interest in
the outdir directory.
Parameters:
-----------
tmp_dir : str
Path to the temporary directory to write the list of contigs of the
object.
out_dir : str
Path to the output directory where to write the new fasta file.
"""
# Create a fasta ouput file.
self.fasta = join(out_dir, f"{self.name}.fa")
# Write a contigs used by pyfastx extract.
contigs_list = join(tmp_dir, f"{self.name}.txt")
with open(contigs_list, "w") as file:
for contig_name in self.contigs:
file.write("%s\n" % contig_name)
# Extract contigs from the fastq.
cmd = "pyfastx extract {0} -l {1} > {2}".format(
self.assembly, contigs_list, self.fasta
)
process = sp.Popen(cmd, shell=True)
process.communicate()
[docs]def extract_pairs(metator_data):
"""Function to extract the pairs of a MetaTOR object from pairs files.
Parameters:
-----------
metator_data : object from class MetatorObject
Object of the class MetatorObject. It should have the parameters contigs
and the output pairs file of the metaTOR object set.
Returns:
--------
int:
Number of pairs from the MetaTOR object.
"""
# Initiation
n_pairs = 0
output_file = metator_data.pairs
# Write one pair file for all the ones given.
with open(output_file, "w") as output_pairs:
# Write the header of the output pairs
output_pairs.write("## pairs format v1.0\n")
output_pairs.write(
"#columns: readID chr1 pos1 chr2 pos2 strand1 strand2\n"
)
for contig_id, contig in enumerate(metator_data.contigs):
output_pairs.write(
"#chromsize: {0} {1}\n".format(
contig,
metator_data.contigs_size[contig_id],
)
)
for pairs_file in metator_data.pairs_files:
# Check if the pairix index exist.
pairs_data = mio.get_pairs_data(pairs_file)
# Need a sorted (chr1 chr2 pos1 pos2) pair file indexed with pairix.
for contig_id1, contig in enumerate(metator_data.contigs):
# Only need to retrieve the upper triangle.
for contig_id2 in range(contig_id1, len(metator_data.contigs)):
pairs_lines = pairs_data.query2D(
contig,
0,
metator_data.contigs_size[contig_id1],
metator_data.contigs[contig_id2],
0,
metator_data.contigs_size[contig_id2],
1,
)
for pairs_line in pairs_lines:
pairs_line = pairs_line[:7]
n_pairs += 1
output_pairs.write("\t".join(pairs_line) + "\n")
return n_pairs
[docs]def generate_contact_map(
assembly,
contig_data_file,
enzyme,
name,
pairs,
out_dir,
tmp_dir,
filter_events=False,
force=False,
mat_fmt="graal",
metator_object="final_bin",
min_size=5000,
pcr_duplicates=False,
threads=1,
):
"""General function to extract pairs of the MetaTOR object and generate its
the contact map.
Parameters:
-----------
assembly : str
Path to the fasta file containing the contigs of interest. Could be the
whole or the extracted contigs of one bin.
contig_data_file : str
Path to the contig_data_final.txt file form MetaTOR output.
enzyme : str
Enzyme used to digest the genome in the HiC experiment. Example:
HpaII,MluCI.
name : str
Name of the object. Could be the name of a contig, an id of a bin or the
name of the bin. Example: "NODE_1" or "MetaTOR_1_0".
pairs : str
Path of the ".pairs" file or bgzip indexed pair file. If more than one
is given, files should be separated by a comma.
out_dir : str
Path where output files should be written. Current directory by default.
tmp_dir : str
Path where temporary files will be written.
filter_events : bool
Filter spurious or uninformative 3C events. Requires a restriction
enzyme. Default: False.
force : bool
If True, overwrite existing files with the same name as output.
Default: False.
mat_fmt : str
Select the output matrix format. Can be either "bg2" for the bedgraph2
format, "cool" for Mirnylab's cool format, or graal for a plain text COO
format compatible with Koszullab's instagraal software.
Default: "graal".
metator_object : str
Object to extract contigs to build the matrix. Either "contig",
"core_bin", "overlapping_bin", "recursive_bin", "final_bin" or "other".
min_size : int
Minimum contig size required to keep it.
pcr_duplicates : bool
If True, PCR duplicates will be filtered based on genomic positions.
Pairs where both reads have exactly the same coordinates are considered
duplicates and only one of those will be conserved. Default: False.
threads : int
Numbers of threads to use. Default: 1.
"""
# Extract bin information from metaTOR outdir.
logger.info("Generate HiC contact map for %s", name)
metator_data = MetatorObject(
metator_object, name, assembly, contig_data_file, pairs, min_size
)
metator_data.set_contigs()
if min_size > 0:
metator_data.set_large_contigs()
metator_data.write_fasta(tmp_dir, out_dir)
metator_data.pairs = join(tmp_dir, name + ".pairs")
# Extract pairs of the bin.
n_pairs = extract_pairs(metator_data)
if n_pairs == 0:
logger.info("No pairs have been extracted")
else:
logger.info("%d pairs have been extracted.", n_pairs)
# Launch hicstuff pipeline.
hcp.full_pipeline(
genome=metator_data.fasta,
input1=metator_data.pairs,
distance_law=False,
enzyme=enzyme,
filter_events=filter_events,
force=force,
mat_fmt=mat_fmt,
out_dir=out_dir,
pcr_duplicates=pcr_duplicates,
prefix=name,
plot=False,
start_stage="pairs",
threads=threads,
tmp_dir=tmp_dir,
)
return n_pairs