#!/usr/bin/env python3
# coding: utf-8
"""Abstract command classes for metaTOR
This module contains all classes related to metaTOR commands:
- network
- partition
- pipeline
- validation
- qualitycheck
- contactmap
Note
----
Structure based on Rémy Greinhofer (rgreinho) tutorial on subcommands in docopt
: https://github.com/rgreinho/docopt-subcommands-example
abignaud, 20201118
Raises
------
NotImplementedError
Will be raised if AbstractCommand is called for some reason instead of one
of its children.
"""
import logging
import multiprocessing as mp
import os
import shutil
import time
import metator.align as mta
import metator.contact_map as mtc
import metator.io as mio
import metator.log as mtl
import metator.quality_check as mtq
import metator.network as mtn
import metator.partition as mtp
import metator.scaffold as mts
import metator.validation as mtv
from docopt import docopt
from functools import partial
from metator.log import logger
from metator.version import __version__
from os.path import exists, dirname, join
from scipy.sparse import save_npz, load_npz
[docs]class AbstractCommand:
"""Abstract base command class
Base class for the commands from which other metaTOR commands derive.
"""
def __init__(self, command_args, global_args):
"""Initialize the commands"""
self.args = docopt(self.__doc__, argv=command_args)
self.global_args = global_args
[docs] def execute(self):
"""Execute the commands"""
raise NotImplementedError
[docs] def check_output_path(self, path, force=False):
"""Throws error if the output file exists. Create required file tree
otherwise.
"""
# Get complete output filename and prevent overwriting unless force is
# enabled
if not force and os.path.exists(path):
raise IOError(
"Output file already exists. Use --force to overwrite"
)
if dirname(path):
os.makedirs(dirname(path), exist_ok=True)
[docs]class Network(AbstractCommand):
"""Generation of network command
Generate metaHiC contigs network from fastq reads or bam files.Generates a
network file (in edgelist form) from either fastq files or bam files. For
both starting input forward and reverse reads or alignment need to be in
seprated files with the same read names. In the output network contigs are
the network nodes and the edges are the contact counts.
If multiple fastq are given will align them seprately and return a hit file
with a number of hit for each sample.
Note: the network is in a strict barebone form so that it can be reused and
imported quickly into other applications etc. Verbose information about
every single node in the network is written on a 'contig data' file.
usage:
network --forward=STR --assembly=FILE [--reverse=STR]
[--aligner=bowtie2] [--aligner-mode=normal] [--depth=FILE] [--edge=0]
[--enzyme=STR] [--normalization=empirical_hit] [--no-clean-up]
[--outdir=DIR] [--min-quality=30] [--self-contacts] [--start=fastq]
[--threads=1] [--tmpdir=DIR]
options:
-1, --forward=STR Fastq file or list of Fastq separated by a comma
containing the forward reads to be aligned or
their corresponding bam or pairs files.
-2, --reverse=STR Fastq file or list of Fastq separated by a comma
containing the reverse reads to be aligned or
their corresponding bam files. Forward and
reverse reads need to have the same identifier
(read names). If start is set to pair, no
argument is necessary.
-a, --assembly=FILE The initial assembly path acting as the
alignment file's reference genome or the
basename of the bowtie2 index.
-b, --aligner=STR Aligner algorithm to use. Either "bwa" or
"bowtie2". [Default: bowtie2]
-B, --aligner-mode=STR Mode of alignment from hicstuff. Either normal,
iterative or cutsite. [Default: normal]
-d, --depth=FILE The depth.txt file from the shotgun reads used
to made the assembly computed by
jgi_summarize_bam_contig_depths from metabat2
pipeline.
-E, --edge=INT Distance of the edge region in base pair on the
contigs where the mapping reads are not
considered as inter contigs. [Default: 0]
-e, --enzyme=STR The list of restriction enzyme used to digest
the contigs separated by a comma. Example:
HpaII,MluCI.
-n, --normalization=STR If None, do not normalized the count of a
contact by the geometric mean of the coverage of
the contigs. Otherwise it's the type of
normalization. 6 values are possible "None",
"abundance", "length", "RS", "empirical_hit",
"theoritical_hit". [Default: empirical_hit]
-N, --no-clean-up Do not remove temporary files.
-o, --outdir=DIR The output directory to write the bam files the
network and contig data into. Default: current
directory.
-q, --min-quality=INT Threshold of quality necessary to considered a
read properly aligned. [Default: 30]
-s, --self-contacts If enabled, count alignments between a contig
and itself (intracontigs contacts).
-S, --start=STR Start stage of the pipeline. Either "fastq",
"bam", or "pair". [Default: fastq]
-t, --threads=INT Number of parallel threads allocated for the
alignement. [Default: 1]
-T, --tmpdir=DIR Temporary directory. Default to current
directory. [Default: ./tmp]
"""
[docs] def execute(self):
# Defined the temporary directory.
if not self.args["--tmpdir"]:
tmp_dir = mio.generate_temp_dir("./tmp")
else:
tmp_dir = self.args["--tmpdir"]
os.makedirs(tmp_dir, exist_ok=True)
# Defined the output directory and output file names.
if not self.args["--outdir"]:
self.args["--outdir"] = "."
os.makedirs(self.args["--outdir"], exist_ok=True)
# Enable file logging
now = time.strftime("%Y%m%d%H%M%S")
log_file = join(self.args["--outdir"], (f"metator_network_{now}.log"))
generate_log_header(log_file, cmd="network", args=self.args)
# Transform integer variables as integer.
min_qual = int(self.args["--min-quality"])
edge = int(self.args["--edge"])
# Defined boolean variables:
self_contacts = self.args["--self-contacts"]
# Check if forward and reverse arguments are given:
if (
self.args["--start"] == "fastq"
or (
self.args["--start"] == "bam"
and self.args["--aligner"] == "bowtie2"
)
) and not self.args["--reverse"]:
logger.error(
"Forward and reverse arguments are necessary for fastq with %s start and %s aligner.",
self.args["--start"],
self.args["--aligner"],
)
raise ValueError
# Check aligner.
if self.args["--aligner"] not in ["bowtie2", "bwa"]:
logger.error('Aligner should be either "bowtie2" or "bwa".')
raise ValueError
# Check aligner mode.
if self.args["--aligner-mode"] not in [
"normal",
"iterative",
"cutsite",
]:
logger.error(
'Aligner mode should be either "normal", "iterative" or "cutsite".'
)
raise ValueError
if (
self.args["--aligner-mode"] == "cutsite"
and not self.args["--enzyme"]
):
logger.warning(
"cutsite mode required an enzyme. Iterative mode will be use instead."
)
self.args["--aligner-mode"] = "iterative"
# Check correct algorithm value.
# Check if normalization in the list of possible normalization.
list_normalization = [
"None",
"abundance",
"length",
"RS",
"empirical_hit",
"theoritical_hit",
]
if self.args["--normalization"] not in list_normalization:
logger.error(
'Normalization should be among this list: "None", "abundance", "length", "RS", "empirical_hit", "theoritical_hit"'
)
raise ValueError
enzyme_required = ["RS", "theoritical_hit"]
if (
self.args["--normalization"] in enzyme_required
and not self.args["--enzyme"]
):
logger.error(
'For "RS" and "theoritical_hit" normalization, enzyme is required.'
)
raise ValueError
depth_required = ["abundance", "theoritical_hit"]
if (
self.args["--normalization"] in depth_required
and not self.args["--depth"]
):
logger.error(
'For "abundance" and "theoritical_hit" normalization, depth is required.'
)
raise ValueError
if self.args["--start"] not in ["fastq", "bam", "pair", "network"]:
logger.error(
"Start argument should be 'fastq', 'bam', 'pair' or 'network'."
)
raise ValueError
# Extract index and genome file
assembly = self.args["--assembly"]
# Check what is the reference. If a fasta is given build the index. If a
# bowtie2 index is given, retreive the fasta.
index = mio.check_fasta_index(assembly, mode=self.args["--aligner"])
if index is None:
if mio.check_is_fasta(assembly):
fasta = assembly
# If start at bam could skip the index generation.
if self.args["--start"] == "fastq":
index = mio.generate_fasta_index(
fasta, self.args["--aligner"], tmp_dir
)
else:
logger.error(
"Please give as assembly argument a %s index or a fasta.",
self.args["--aligner"],
)
raise ValueError
else:
fasta = mio.retrieve_fasta(index, self.args["--aligner"], tmp_dir)
# Print information of teh workflow:
logger.info("Aligner algorithm: %s", self.args["--aligner"])
logger.info("Enzyme: %s", self.args["--enzyme"])
logger.info("Normalization: %s", self.args["--normalization"])
logger.info("Minimum mapping quality: %s", self.args["--min-quality"])
# Do not align if pair start
if self.args["--start"] == "pair":
alignment_files = self.args["--forward"].split(",")
nb_alignment = len(alignment_files)
contig_data, hit_data = mtn.create_contig_data(
fasta,
nb_alignment,
self.args["--depth"],
self.args["--enzyme"],
)
else:
# Align pair-end reads with bowtie2
alignment_files, contig_data, hit_data = mta.get_contact_pairs(
self.args["--forward"],
self.args["--reverse"],
index,
fasta,
self.args["--aligner"],
self.args["--aligner-mode"],
min_qual,
self.args["--start"],
self.args["--depth"],
self.args["--enzyme"],
self.args["--outdir"],
tmp_dir,
self.args["--threads"],
)
# Build the network
mtn.alignment_to_contacts(
alignment_files,
contig_data,
edge,
hit_data,
self.args["--outdir"],
"network.txt",
"contig_data_network.txt",
tmp_dir,
self.args["--threads"],
self.args["--normalization"],
self_contacts,
)
# Delete the temporary folder
if not self.args["--no-clean-up"]:
shutil.rmtree(tmp_dir)
generate_log_footer(log_file)
[docs]class Partition(AbstractCommand):
"""Partition the network using Louvain algorithm
Partition the network file using iteratively the Louvain or Leiden
algorithm. Then looks for 'cores' bins that are constituted by the contigs
which are always in the same cluster at each iterations. Then using the
Hamming distance from these cores bins, merege the bins with a small Hamming
distance (close to 1), bigger than the percentage (overlapping parameter)
given.
It will also update the file to integrate the bins information of the
contigs.
Furthermore, both Leiden and Louvain algorithm are available here. However,
the benchmark made show that here the Louvain algorithm have better
performance and is faster on seawater and gut metagenomic samples.
Note that the Louvain or Leiden software are not, in the strictest sense,
necessary. Any program that assigns a node to a bin, does so non
deterministically and solely outputs a list in the form: 'node_id bin_id'
could be plugged instead.
usage:
partition --assembly=FILE --contigs=FILE --network=FILE
[--algorithm=louvain] [--cluster-matrix] [--force] [--iterations=100]
[--no-clean-up] [--outdir=DIR] [--overlap=80] [--prefix=STR]
[--res-param=1.0] [--size=500000] [--threads=1] [--tmpdir=DIR]
options:
-a, --assembly=FILE The path to the assembly fasta file used to do
the alignment.
-A, --algorithm=STR Either "louvain" or "leiden", algorithm to use
to partition the network. [Default: louvain]
-c, --contigs=FILE The path to the tsv file containing the data of
the contigs (ID, Name, Length, GC content, Hit,
Coverage, Restriction Site).
-C, --cluster-matrix If enabled, save the clustering matrix.
-F, --force If enabled, would remove directory of
overlapping bins in the output directory.
-i, --iterations=INT Number of iterations of Louvain. [Default: 100]
-n, --network=FILE Path to the file containing the network
information from the meta HiC experiment compute
in network function previously.
-N, --no-clean-up Do not remove temporary files.
-o, --outdir=DIR Path to the directory to write the output.
Default to current directory. [Default: ./]
-O, --overlap=INT Hamming distance threshold to use to merge bins
(percentage). [Default: 80]
-p, --prefix=STR Prefix to use for fasta files. By default just
'metator_', otherwise 'STR_metator_'.
-r, --res-param=FLOAT Resolution paramter to use for Leiden algorithm.
[Default: 1.0]
-s, --size=INT Threshold size to keep bins in base pair.
[Default: 500000]
-t, --threads=INT Number of parallel threads allocated for the
partition. [Default: 1]
-T, --tmpdir=DIR Temporary directory. Default to current
directory. [Default: ./tmp]
"""
[docs] def execute(self):
# Defined the temporary directory.
if not self.args["--tmpdir"]:
tmp_dir = mio.generate_temp_dir("./tmp")
else:
tmp_dir = self.args["--tmpdir"]
os.makedirs(tmp_dir, exist_ok=True)
# Defined the output directory.
if not self.args["--outdir"]:
self.args["--outdir"] = "."
os.makedirs(self.args["--outdir"], exist_ok=True)
fasta_dir = join(self.args["--outdir"], "overlapping_bin")
if not exists(fasta_dir):
os.makedirs(fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(fasta_dir)
os.makedirs(fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
fasta_dir,
)
raise ValueError
# Enable file logging
now = time.strftime("%Y%m%d%H%M%S")
log_file = join(self.args["--outdir"], (f"metator_partition_{now}.log"))
generate_log_header(log_file, cmd="partition", args=self.args)
# Transform numeric variable as numeric
iterations = int(self.args["--iterations"])
overlapping_parameter = int(self.args["--overlap"]) / 100
size = int(self.args["--size"])
threads = int(self.args["--threads"])
resolution_parameter = float(self.args["--res-param"])
# Check correct algorithm value
if self.args["--algorithm"] not in ["louvain", "leiden"]:
logger.error('algorithm should be either "louvain" or "leiden"')
raise ValueError
# Create prefix.
if not self.args["--prefix"]:
prefix = "metator"
else:
prefix = f"{self.args['--prefix']}_metator"
# Partition the network
_clustering_matrix_file, _contigs_data_file = mtp.partition(
self.args["--algorithm"],
self.args["--assembly"],
self.args["--cluster-matrix"],
self.args["--contigs"],
iterations,
self.args["--network"],
self.args["--outdir"],
fasta_dir,
overlapping_parameter,
resolution_parameter,
size,
tmp_dir,
threads,
prefix,
)
# Delete pyfastx index:
os.remove(self.args["--assembly"] + ".fxi")
# Delete the temporary folder
if not self.args["--no-clean-up"]:
shutil.rmtree(tmp_dir)
generate_log_footer(log_file)
[docs]class Validation(AbstractCommand):
"""Use CheckM to validate the bins.
Recursively decontaminate the contaminated bins evaluated by checkM. The
script returns new decontaminated fasta and summary files of the
decontamination.
Only bins with more than 50% completion and 5% contamination are subject to
the recursive step. If the recursive step gave worst results than the first
(decrease of the completion with no decrease of the contamination), it will
keep the original bin.
usage:
validation --assembly=FILE --contigs=FILE --fasta=DIR --network=FILE
[--algorithm=louvain] [--cluster-matrix] [--force] [--iterations=10]
[--no-clean-up] [--outdir=DIR] [--overlap=90] [--prefix=STR]
[--res-param=1.0] [--size=500000] [--threads=1] [--tmpdir=DIR]
options:
-a, --assembly=FILE The path to the assembly fasta file used to do
the alignment.
-A, --algorithm=STR Algorithm to use. Either "louvain", "leiden" or
"spinglass". [Default: louvain]
-c, --contigs=FILE The path to the file containing the data of the
contigs from the partition step (13 columns).
-C, --cluster-matrix If enabled, save the clustering matrix.
-f, --fasta=DIR Path to the directory containing the input fasta
files of the bins to decontaminate.
-F, --force If enable, would remove directory of recursive
bins in the output directory.
-i, --iterations=INT Number of recursive iterations of Louvain.
[Default: 10]
-n, --network=FILE Path to the file containing the network
information from the meta HiC experiment compute
in network function previously.
-N, --no-clean-up Do not remove temporary files.
-o, --outdir=DIR Path to the directory to write the output.
Default to current directory. [Default: ./]
-O, --overlap=INT Hamming distance threshold to use to merge bins
(percentage). [Default: 90]
-p, --prefix=STR Prefix to use for fasta files. By default just
'metator_', otherwise 'STR_metator_'.
-r, --res-param=FLOAT Resolution paramter to use for Leiden
algorithm. [Default: 1.0]
-s, --size=INT Threshold size to keep bins in base pair.
[Default: 500000]
-t, --threads=INT Number of parallel threads allocated for the
partition. [Default: 1]
-T, --tmpdir=DIR Temporary directory. Default to current
directory. [Default: ./tmp]
"""
# Launch checkM to evaluate the completion and the contamination. If asked
# rerun Louvain to try to reduce the contamination, rerun checkM if the
# contamination decrease without a huge decrease of the completion keep the
# new bins. Otherwise go back to the old state.
[docs] def execute(self):
# Defined the temporary directory.
if not self.args["--tmpdir"]:
tmp_dir = mio.generate_temp_dir("./tmp")
else:
tmp_dir = self.args["--tmpdir"]
os.makedirs(tmp_dir, exist_ok=True)
# Defined the output directory and output file names.
if not self.args["--outdir"]:
self.args["--outdir"] = "."
os.makedirs(self.args["--outdir"], exist_ok=True)
recursive_fasta_dir = join(self.args["--outdir"], "recursive_bin")
final_fasta_dir = join(self.args["--outdir"], "final_bin")
if not exists(recursive_fasta_dir):
os.makedirs(recursive_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(recursive_fasta_dir)
os.makedirs(recursive_fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
recursive_fasta_dir,
)
raise ValueError
if not exists(final_fasta_dir):
os.makedirs(final_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(final_fasta_dir)
os.makedirs(final_fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
final_fasta_dir,
)
raise ValueError
# Enable file logging
now = time.strftime("%Y%m%d%H%M%S")
log_file = join(
self.args["--outdir"], (f"metator_validation_{now}.log")
)
generate_log_header(log_file, cmd="validation", args=self.args)
# Transform numeric variable as numeric
iterations = int(self.args["--iterations"])
size = int(self.args["--size"])
threads = int(self.args["--threads"])
overlapping_parameter = int(self.args["--overlap"]) / 100
resolution_parameter = float(self.args["--res-param"])
# Check correct algorithm value
if self.args["--algorithm"] not in ["louvain", "leiden", "spinglass"]:
logger.error(
'algorithm should be either "louvain", "leiden" or "spinglass.'
)
raise ValueError
# Create prefix.
if not self.args["--prefix"]:
prefix = "metator"
else:
prefix = f"{self.args['--prefix']}_metator"
_clustering_matrix_file = mtv.recursive_decontamination(
self.args["--algorithm"],
self.args["--assembly"],
self.args["--cluster-matrix"],
self.args["--contigs"],
final_fasta_dir,
self.args["--fasta"],
iterations,
self.args["--network"],
self.args["--outdir"],
overlapping_parameter,
recursive_fasta_dir,
resolution_parameter,
size,
tmp_dir,
threads,
prefix,
)
# Delete pyfastx index:
os.remove(self.args["--assembly"] + ".fxi")
# Delete the temporary folder
if not self.args["--no-clean-up"]:
shutil.rmtree(tmp_dir)
generate_log_footer(log_file)
[docs]class Pipeline(AbstractCommand):
"""Launch the full metator pipeline
Partition the contigs from teh assembly in bins from the metaHiC reads.
It's possible to start from the fastq, the bam, the pair or the network
files. It's will also possible to ask or not to run a validation step which
will decontaminate the bins when it's necessary. However as it's the
critical step for memory usage (~40G), it's possible to skip these step.
usage:
pipeline --assembly=FILE [--forward=STR] [--reverse=STR]
[--algorithm=louvain] [--aligner=bowtie2] [--aligner-mode=normal]
[--cluster-matrix] [--depth=FILE] [--edge=0] [--enzyme=STR] [--force]
[--iterations=100] [--rec-iter=10] [--junctions=NNNNN] [--no-clean-up]
[--normalization=empirical_hit] [--outdir=DIR] [--overlap=80]
[--prefix=STR] [--rec-overlap=90] [--min-quality=30] [--res-param=1.0]
[--size=500000] [--start=fastq] [--scaffold] [--threads=1]
[--tmpdir=DIR]
options:
-1, --forward=STR Fastq file or list of Fastq separated by a comma
containing the forward reads to be aligned or
their corresponding bam or pairs files.
-2, --reverse=STR Fastq file or list of Fastq separated by a comma
containing the reverse reads to be aligned or
their corresponding bam files. Forward and
reverse reads need to have the same identifier
(read names). If pair start is used no argument
is necessary.
-a, --assembly=FILE The initial assembly path acting as the
alignment file's reference genome or the
basename of the bowtie2 index.
-A, --algorithm=STR Algorithm to use. Either "louvain", "leiden" or
"spinglass". If spinglass is chosen, the first
partition will be done using louvain
[Default: louvain]
-b, --aligner=STR Aligner algorithm to use. Either "bwa" or
"bowtie2". [Default: bowtie2]
-B, --aligner-mode=STR Mode of alignment from hicstuff. Either normal,
iterative or cutsite. [Default: normal]
-C, --cluster-matrix If enabled, save the clustering matrix.
-d, --depth=FILE The depth.txt file from the shotgun reads used
to made the assembly computed by
jgi_summarize_bam_contig_depths from metabat2
pipeline.
-E, --edge=INT Distance of the edge region in base pair on the
contigs where the mapping reads are not
considered as inter contigs. [Default: 0]
-e, --enzyme=STR The list of restriction enzyme used to digest
the contigs separated by a comma. Example:
HpaII,MluCI.
-F, --force If enable, would remove directory of overlapping
bins in the output directory.
-i, --iterations=INT Number of iterations of Louvain for the
partition step. [Default: 100]
-j, --rec-iter=INT Number of iterations of Louvain for the
recursive step. [Default: 10]
-J, --junctions=STR Sequences to use as junction between contigs.
[Default: NNNNN]
-N, --no-clean-up Do not remove temporary files.
-m, --normalization=STR If None, do not normalized the count of a
contact by the geometric mean of the coverage of
the contigs. Otherwise it's the type of
normalization. 6 values are possible None,
abundance, length, RS, empirical_hit,
theoritical_hit. [Default: empirical_hit]
-o, --outdir=DIR The output directory to write the bam files the
network and contig data into. Default: current
directory.
-O, --overlap=INT Hamming distance threshold to use to merge bins
for the first step (percentage). [Default: 80]
-p, --prefix=STR Prefix to use for fasta files. By default just
'metator_', otherwise 'STR_metator_'.
-P, --rec-overlap=INT Hamming distance threshold to use to merge bins
for the recursive step (percentage).
[Default: 90]
-q, --min-quality=INT Threshold of quality necessary to considered a
read properly aligned. [Default: 30]
-r, --res-param=FLOAT Resolution paramter to use for Leiden
algorithm. [Default: 1.0]
-s, --size=INT Threshold size to keep bins in base pair.
[Default: 500000]
-S, --start=STR Start stage of the pipeline. Either fastq, bam,
or pair. [Default: fastq]
-t, --threads=INT Number of parallel threads allocated for the
alignement. [Default: 1]
-T, --tmpdir=DIR Temporary directory. Default to current
directory. [Default: ./tmp]
-v, --scaffold If enables, it will sacffold the genomes at the
end.
"""
[docs] def execute(self):
# Defined the temporary directory.
if not self.args["--tmpdir"]:
tmp_dir = mio.generate_temp_dir("./tmp")
else:
tmp_dir = self.args["--tmpdir"]
os.makedirs(tmp_dir, exist_ok=True)
# Defined the output directory and output file names.
if not self.args["--outdir"]:
self.args["--outdir"] = "."
os.makedirs(self.args["--outdir"], exist_ok=True)
overlapping_fasta_dir = join(self.args["--outdir"], "overlapping_bin")
if not exists(overlapping_fasta_dir):
os.makedirs(overlapping_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(overlapping_fasta_dir)
os.makedirs(overlapping_fasta_dir)
else:
print(self.args["--force"])
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
overlapping_fasta_dir,
)
raise ValueError
# Enable file logging
now = time.strftime("%Y%m%d%H%M%S")
log_file = join(self.args["--outdir"], (f"metator_pipeline_{now}.log"))
generate_log_header(log_file, cmd="pipeline", args=self.args)
# Define variable
min_qual = int(self.args["--min-quality"])
iterations = int(self.args["--iterations"])
edge = int(self.args["--edge"])
recursive_iterations = int(self.args["--rec-iter"])
overlapping_parameter = int(self.args["--overlap"]) / 100
recursive_overlapping_parameter = int(self.args["--rec-overlap"]) / 100
size = int(self.args["--size"])
threads = int(self.args["--threads"])
resolution_parameter = float(self.args["--res-param"])
# Check aligner.
if self.args["--aligner"] not in ["bowtie2", "bwa"]:
logger.error('Aligner should be either "bowtie2" or "bwa".')
raise ValueError
# Check aligner mode.
if self.args["--aligner-mode"] not in [
"normal",
"iterative",
"cutsite",
]:
logger.error(
'Aligner mode should be either "normal", "iterative" or "cutsite".'
)
raise ValueError
if (
self.args["--aligner-mode"] == "cutsite"
and not self.args["--enzyme"]
):
logger.warning(
"cutsite mode required an enzyme. Iterative mode will be use instead."
)
self.args["--aligner-mode"] = "iterative"
# Check correct algorithm value.
if self.args["--algorithm"] not in ["louvain", "leiden", "spinglass"]:
logger.error(
'algorithm should be either "louvain", "leiden" or "spinglass.'
)
raise ValueError
# Check if normalization in the list of possible normalization.
list_normalization = [
"None",
"abundance",
"length",
"RS",
"empirical_hit",
"theoritical_hit",
]
if self.args["--normalization"] not in list_normalization:
logger.error(
'Normalization should be among this list: "None", "abundance", "length", "RS", "empirical_hit", "theoritical_hit"'
)
raise ValueError
enzyme_required = ["RS", "theoritical_hit"]
if (
self.args["--normalization"] in enzyme_required
and not self.args["--enzyme"]
):
logger.error(
'For "RS" and "theoritical_hit" normalization, enzyme is required.'
)
raise ValueError
depth_required = ["abundance", "theoritical_hit"]
if (
self.args["--normalization"] in depth_required
and not self.args["--depth"]
):
logger.error(
'For "abundance" and "theoritical_hit" normalization, depth is required.'
)
raise ValueError
# Sanity check for validation
recursive_fasta_dir = join(self.args["--outdir"], "recursive_bin")
if not exists(recursive_fasta_dir):
os.makedirs(recursive_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(recursive_fasta_dir)
os.makedirs(recursive_fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it",
recursive_fasta_dir,
)
raise ValueError
final_fasta_dir = join(self.args["--outdir"], "final_bin_unscaffold")
if not exists(final_fasta_dir):
os.makedirs(final_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(final_fasta_dir)
os.makedirs(final_fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
final_fasta_dir,
)
raise ValueError
# Sanity check for scaffolding
if self.args["--scaffold"]:
scaffold_fasta_dir = join(self.args["--outdir"], "scaffold_bin")
if not exists(scaffold_fasta_dir):
os.makedirs(scaffold_fasta_dir)
else:
if self.args["--force"]:
shutil.rmtree(scaffold_fasta_dir)
os.makedirs(scaffold_fasta_dir)
else:
logger.error(
"%s already existed. Remove directory or use -F argument to overwrite it.",
scaffold_fasta_dir,
)
raise ValueError
# Create prefix.
if not self.args["--prefix"]:
prefix = "metator"
else:
prefix = f"{self.args['--prefix']}_metator"
# Manage start point.
if self.args["--start"] == "fastq":
start = 1
elif self.args["--start"] == "bam":
start = 2
elif self.args["--start"] == "pair":
start = 3
else:
logger.error("Start argument should be 'fastq', 'bam', or 'pair'.")
raise ValueError
# Check if forward and reverse reads are given for fastq and bam start.
if (
self.args["--start"] == "fastq"
or (
self.args["--start"] == "bam"
and self.args["--aligner"] == "bowtie2"
)
) and not self.args["--reverse"]:
logger.error(
"Forward and reverse arguments are necessary for fastq with %s start and %s aligner.",
self.args["--start"],
self.args["--aligner"],
)
raise ValueError
# Print information of the workflow:
if start == 1:
logger.info(f"Minimum mapping quality: {min_qual}")
if start <= 2:
logger.info(f"Enzyme: {self.args['--enzyme']}")
logger.info(f"Normalization: {self.args['--normalization']}")
logger.info(f"Aligner algorithm: {self.args['--aligner']}")
logger.info(f"Partition algorithm: {self.args['--algorithm']}")
logger.info(f"Partition iterations: {iterations}")
logger.info(f"Overlapping parameter: {overlapping_parameter}")
logger.info(f"Recursive partition iterations: {recursive_iterations}")
logger.info(
f"Recursive overlapping parameter: {recursive_overlapping_parameter}\n"
)
# Extract index and genome file
assembly = self.args["--assembly"]
# Check what is the reference. If a fasta is given build the index. If a
# bowtie2 index is given, retreive the fasta.
index = mio.check_fasta_index(assembly, mode=self.args["--aligner"])
if index is None:
if mio.check_is_fasta(assembly):
fasta = assembly
if start == 1:
index = mio.generate_fasta_index(
fasta, self.args["--aligner"], tmp_dir
)
else:
logger.error(
"Please give as assembly argument a bowtie2 index or a fasta."
)
raise ValueError
else:
fasta = mio.retrieve_fasta(index, self.args["--aligner"], tmp_dir)
# Run the whole workflow
if start <= 2:
# Align pair-end reads with bowtie2
alignment_files, contig_data, hit_data = mta.get_contact_pairs(
self.args["--forward"],
self.args["--reverse"],
index,
fasta,
self.args["--aligner"],
self.args["--aligner-mode"],
min_qual,
self.args["--start"],
self.args["--depth"],
self.args["--enzyme"],
self.args["--outdir"],
tmp_dir,
self.args["--threads"],
)
else:
alignment_files = self.args["--forward"].split(",")
nb_alignment = len(alignment_files)
contig_data, hit_data = mtn.create_contig_data(
fasta,
nb_alignment,
self.args["--depth"],
self.args["--enzyme"],
)
# Build the network
network_file, contigs_data_file = mtn.alignment_to_contacts(
alignment_files,
contig_data,
edge,
hit_data,
self.args["--outdir"],
"network.txt",
"contig_data_network.txt",
tmp_dir,
self.args["--threads"],
self.args["--normalization"],
False,
)
# Partition the network
clustering_matrix_partition_file, contigs_data_file = mtp.partition(
self.args["--algorithm"],
fasta,
self.args["--cluster-matrix"],
contigs_data_file,
iterations,
network_file,
self.args["--outdir"],
overlapping_fasta_dir,
overlapping_parameter,
resolution_parameter,
size,
tmp_dir,
threads,
prefix,
)
# remove contig_data_network if not an input
if start <= 2:
contig_data_network_file = join(
self.args["--outdir"], "contig_data_network.txt"
)
os.remove(contig_data_network_file)
# Launch validation.
clustering_matrix_recursive_file = mtv.recursive_decontamination(
self.args["--algorithm"],
fasta,
self.args["--cluster-matrix"],
contigs_data_file,
final_fasta_dir,
overlapping_fasta_dir,
recursive_iterations,
network_file,
self.args["--outdir"],
recursive_overlapping_parameter,
recursive_fasta_dir,
resolution_parameter,
size,
tmp_dir,
threads,
prefix,
)
if self.args["--cluster-matrix"]:
# Make the sum with the partiton clustering matrix and save it.
clustering_matrix = load_npz(
clustering_matrix_partition_file + ".npz"
)
clustering_matrix_recursive = load_npz(
clustering_matrix_recursive_file + ".npz"
)
clustering_matrix = (
(clustering_matrix + clustering_matrix_recursive) / 2
).tocoo()
clustering_matrix_file = join(
self.args["--outdir"], "clustering_matrix"
)
save_npz(clustering_matrix_file, clustering_matrix)
# Remove contig_data_partition file
contig_data_partition_file = join(
self.args["--outdir"], "contig_data_partition.txt"
)
os.remove(contig_data_partition_file)
# Launch the scaffold
if self.args["--scaffold"]:
bin_summary = mio.read_bin_summary(
join(self.args["--outdir"], "bin_summary.txt")
)
task = partial(
mts.parallel_scaffold,
final_fasta_dir=final_fasta_dir,
alignment_files=alignment_files,
scaffold_fasta_dir=scaffold_fasta_dir,
junctions=self.args["--junctions"],
)
pool = mp.Pool(processes=int(self.args["--threads"]))
pool.map(task, bin_summary.index)
# Delete pyfastx index:
os.remove(fasta + ".fxi")
# Delete the temporary folder.
if not self.args["--no-clean-up"]:
shutil.rmtree(tmp_dir)
if start < 3:
for pair_file in alignment_files:
os.remove(pair_file)
generate_log_footer(log_file)
[docs]class Qc(AbstractCommand):
"""Generates some quality check on the output of metator.
The quality check is done on the high quality MAGs and only contigs witha
size bigger than 100kb. The goal is to assess HiC quality metrics on genomes
we kinda know to have a estimation of the quality of the library.
If the plot are set it will return some plot of the event distribution on
the high quality contigs.
usage:
qc --assembly=FILE --enzyme=STR [--bin-summary=FILE]
[--contig-data=FILE] [--metator-dir=DIR] [--no-clean-up] [--outdir=DIR]
[--prefix=STR] [--plot] [--threshold=STR] [--tmpdir=DIR] <pairsfile>...
arguments:
pairsfile File(s) containing pairs information.
options:
-a, --assembly=FILE Path to the fasta file containing the contigs of
interest. Could be the whole or the extracted
contigs of one bin.
-b, --bin-summary=FILE Path to the bin_summary.txt file from MetaTOR
output.
-c, --contig-data=FILE Path to the contig_data_final.txt file from
MetaTOR output.
-e, --enzyme=STR The list of restriction enzyme used to digest
the contigs separated by a comma. Example:
HpaII,MluCI.
-n, --no-clean-up If enabled intermediary files will be kept.
-O, --metator-dir=DIR Output directory from metator pipeline. If set
contigs data and bin summary will be found
automatically.
-o, --outdir=DIR Directory to save output plots and log.
-p, --prefix=STR Name of the sample to add on plot and files.
-P, --plot If enable display some plots.
-t, --threshold=STR Hicstuff religation and loop thresholds.
Two integers seperated by a coma.
-T, --tmpdir=DIR Temporary directory to save pairs files
"""
[docs] def execute(self):
# Defined the temporary directory.
if not self.args["--tmpdir"]:
self.args["--tmpdir"] = "./tmp"
tmp_dir = mio.generate_temp_dir(self.args["--tmpdir"])
# Set output directory.
if not self.args["--outdir"]:
if self.args["--prefix"]:
prefix = self.args["--prefix"]
self.args["--outdir"] = join(".", f"metator_qc_{prefix}")
else:
self.args["--outdir"] = join(".", "metator_qc")
os.makedirs(self.args["--outdir"], exist_ok=True)
# Enable file logging
now = time.strftime("%Y%m%d%H%M%S")
log_file = join(self.args["--outdir"], (f"metator_qc_{now}.log"))
generate_log_header(log_file, cmd="qc", args=self.args)
# Try to find input files.
if self.args["--metator-dir"]:
metator_dir = self.args["--metator-dir"]
else:
metator_dir = "."
if self.args["--bin-summary"]:
bin_summary_file = self.args["--bin-summary"]
else:
bin_summary_file = join(metator_dir, "bin_summary.txt")
if not exists(bin_summary_file):
logger.error(
"Please give a bin summary file, or the ouput directory of metator or launch this command from the output directory of metator."
)
logger.error(
"You can also check that a bin_summary.txt is present in metator output directory"
)
raise FileNotFoundError
if self.args["--contig-data"]:
contig_data_file = self.args["--contig-data"]
else:
contig_data_file = join(metator_dir, "contig_data_final.txt")
if not exists(contig_data_file):
logger.error(
"Please give a contig data file, or the ouput directory of metator or launch this command from the output directory of metator."
)
logger.error(
"You can also check that a contig_data_final.txt is present in metator output directory"
)
raise FileNotFoundError
# Set parameters.
if self.args["--prefix"]:
prefix = self.args["--prefix"]
else:
prefix = "metator_qc"
if self.args["--enzyme"]:
self.args["--enzyme"] = self.args["--enzyme"].split(",")
if self.args["--threshold"]:
self.args["--threshold"] = self.args["--threshold"].split(",")
# Launch quality check
mtq.quality_check(
contig_data_file,
bin_summary_file,
self.args["--assembly"],
self.args["<pairsfile>"],
self.args["--outdir"],
tmp_dir,
prefix,
self.args["--plot"],
enzyme=self.args["--enzyme"],
threshold=self.args["--threshold"],
)
# Delete the temporary folder.
if not self.args["--no-clean-up"]:
shutil.rmtree(tmp_dir)
generate_log_footer(log_file)
[docs]class Scaffold(AbstractCommand):
"""Scaffold a bin from metator.
Use the pairs to build a scaffold from the binned contigs. The scaffolding
is based on contact between the edges of the contigs. There are normalized
based on intra contacts of the contigs except for the small contigs where
a low score is given.
usage:
scaffold --bin-name=STR --input-fasta=FILE [--junctions=NNNNN]
[--out-fasta=FILE] [--out-frags=FILE] [--threads=1] [--threshold=0.05]
[--window-size=5000] <pairsfile>...
arguments:
pairsfile File(s) containing pairs information.
options:
-b, --bin-name=STR Name of the bin to scaffold.
-i, --input-fasta=FILE Path to the input fasta.
-j, --junctions=STR Sequences to use as junction between contigs.
[Default: NNNNN]
-o, --out-fasta=FILE Path to write the output fasta. Default in the
current directory: '"$bin_name"_scaffolded.fa'.
-O, --out-frags=FILE Path to write the fragments information. Default
in the current directory:
'"$bin_name"_info_frags.txt'.
-t, --threads=INT Numbers of threads to allocate if pairs need to
be sorted. [Default: 1]
-T, --threshold=FLOAT Threshold score to consider an association
between two contigs. [Default: 0.05]
-w, --window-size=INT Size of the window in base pair to use as edge
window to scaffold. [Default: 5000]
"""
[docs] def execute(self):
# Generate log
now = time.strftime("%Y%m%d%H%M%S")
log_file = f"metator_scaffold_{now}.log"
generate_log_header(log_file, cmd="scaffold", args=self.args)
# Create output files:
if self.args["--out-fasta"] is None:
out_fasta = join(".", f'{self.args["--bin-name"]}_scaffolded.fa')
else:
out_fasta = self.args["--out-fasta"]
if self.args["--out-frags"] is None:
out_info = join(".", f'{self.args["--bin-name"]}_info_frags.txt')
else:
out_info = self.args["--out-frags"]
# Get the scaffolds
mts.get_scaffolds(
self.args["--bin-name"],
self.args["--input-fasta"],
self.args["<pairsfile>"],
out_fasta,
out_info,
threshold=float(self.args["--threshold"]),
threads=int(self.args["--threads"]),
window_size=int(self.args["--window-size"]),
junctions=self.args["--junctions"],
)
generate_log_footer(log_file)
[docs]class Pairs(AbstractCommand):
"""Sort and pairs files for faster assess to the data.
Sort the pairs file using pairtools. Compress them using bgzip. Index them
using pairix.
usage:
pairs [--force] [--remove] [--threads=1] <pairsfile>...
arguments:
pairsfile File(s) containing pairs information.
options:
-F, --force Write files even if the output files already exists.
-r, --remove Remove the input file at the end to keep only the
sorted, compressed and indexed pairs file.
-t, --threads=INT Numbers of thread to allocate. [Default: 1]
"""
[docs] def execute(self):
# Generate log
now = time.strftime("%Y%m%d%H%M%S")
log_file = f"metator_pairs_{now}.log"
generate_log_header(log_file, cmd="pairs", args=self.args)
# Iterates on pairfiles given.
pairsfiles = self.args["<pairsfile>"]
for pairsfile in pairsfiles:
logger.info(f"Processing {pairsfile}...")
# Run the sort/compress/index command.
_ = mio.sort_pairs_pairtools(
pairsfile,
threads=int(self.args["--threads"]),
remove=self.args["--remove"],
force=self.args["--force"],
)
generate_log_footer(log_file)