Source code for snakecdysis.useful_function

import os
import urllib.request
import click
from tqdm import tqdm
from pathlib import Path
from warnings import warn
import subprocess
import sys


def __add_header(snake_installer):
    """Build a decorator that prints the installer banner when it is applied.

    param snake_installer: object providing ``description_tool`` and ``get_epilog()``
    return: decorator echoing the banner and handing the function back untouched
    """
    def inner(f):
        """
        print the tool description followed by the installer epilog, in green.
        param f: function to decorate
        return: the very same function, unmodified
        """
        banner = f"{snake_installer.description_tool}{snake_installer.get_epilog()}"
        click.secho(banner, fg='green')
        return f
    return inner


def __replace_package_name(snake_installer):
    """Build a decorator that substitutes the tool name into a function's help text.

    param snake_installer: object providing the ``soft_name`` used as replacement
    return: decorator rewriting ``f.__doc__`` in place
    """
    def inner(f):
        """
        replace every 'tool' occurrence of the docstring by the software name.
        param f: function to decorate
        return: decorated function (same object, docstring updated)
        """
        doc = f.__doc__
        # Functions without docstring (or run under ``python -OO``) have nothing to rewrite.
        if doc is not None:
            f.__doc__ = doc.strip().replace('tool', snake_installer.soft_name).rstrip()
        return f
    return inner


def check_privileges():
    """Tell whether the user has the root access needed to reset the package installation.

    Access is granted when ``SUDO_UID`` is set in the environment or when the
    effective user id is 0.

    Returns:
        bool: True when privileges are sufficient; otherwise an error message is
        printed in red and False is returned.
    """
    # The environment is tested first, so the uid is only queried when SUDO_UID is absent.
    if os.environ.get("SUDO_UID") or os.geteuid() == 0:
        return True
    click.secho("\n ERROR : You need to run -r, --restore with sudo privileges or as root\n", fg="red")
    return False
class DownloadProgressBar(tqdm):
    """Terminal progress bar used while downloading an url link."""

    def __init__(self, *args, **kwargs):
        # ``total`` is declared before delegating every argument to tqdm.
        self.total = None
        super().__init__(*args, **kwargs)

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to the position described by a download report.

        Arguments:
            b (int): number of blocks transferred so far
            bsize (int): size of one block
            tsize (int): total size; when given it becomes the bar total
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # tqdm.update() expects an increment, hence the difference with the current count.
        self.update(transferred - self.n)
def download_url(tuple_value):
    """Download one url to an output path.

    Urls starting with ``http`` are fetched with ``urllib.request.urlretrieve`` behind a
    progress bar. Urls starting with ``oras://`` are pulled with ``singularity``, or with
    ``apptainer`` when ``singularity`` is not found on ``PATH``.

    Arguments:
        tuple_value (tuple): a ``(url, output_path)`` pair

    Raises:
        Exception: if the url scheme is not supported, if no container runtime is
            available for an ``oras://`` url, or if the pull command fails
    """
    import shlex
    import shutil
    import ssl

    url, output_path = tuple_value
    # NOTE(security): this disables HTTPS certificate verification for the whole process,
    # not only for this download. Kept for backward compatibility; review before reuse.
    ssl._create_default_https_context = ssl._create_unverified_context
    if url.startswith("http"):
        with DownloadProgressBar(unit='B', unit_scale=True, miniters=1, desc=url.split('/')[-1]) as t:
            urllib.request.urlretrieve(url, filename=output_path, reporthook=t.update_to)
    elif url.startswith("oras://"):
        runtime = next((name for name in ("singularity", "apptainer") if shutil.which(name)), None)
        if runtime is None:
            raise Exception(
                f"Error: neither 'singularity' nor 'apptainer' found in PATH, unable to pull '{url}'")
        # Argument list + no shell: url and path are never interpreted by a shell.
        cmd = [runtime, "pull", str(output_path), url]
        printable_cmd = " ".join(shlex.quote(part) for part in cmd)
        click.secho(f"Download with command line: '{printable_cmd}'")
        process = subprocess.run(cmd, check=False)
        # Any non-zero status is a failure, including negative codes (killed by a signal).
        if process.returncode != 0:
            raise Exception(f"Error: Bad url '{url}', please check url")
    else:
        raise Exception(f"Error: Bad url '{url}', please use url start with 'http' or 'oras'")
def multiprocessing_download(urls_list, threads=2):
    """Download a list of URL in parallel with a pool of worker threads.

    Arguments:
        urls_list (list): ``(url, output_path)`` tuples, each one handed to ``download_url``
        threads (int): maximum number of simultaneous downloads (default 2)

    Raises:
        SystemExit: when one download fails, carrying the original error
    """
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed

    with ThreadPoolExecutor(threads) as executor:
        futures = [executor.submit(download_url, url) for url in urls_list]
        # Handle tasks as they finish, and stop at the first failure.
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                # Leaving the ``with`` block waits for the pool: cancel what has not
                # started yet so the exit is not delayed by useless downloads.
                for pending in futures:
                    pending.cancel()
                raise SystemExit(exc) from exc
def __command_required_option_from_option(require_name, require_map):
    """Build a ``click.Command`` subclass making one option mandatory depending on another.

    Arguments:
        require_name (str): name of the parameter whose value selects the required option
        require_map (dict): for each accepted value of ``require_name``, the name of the
            option that must be given (looked up lower-cased in ``ctx.params``)

    Returns:
        type: the generated command class
    """
    import click

    class CommandOptionRequiredClass(click.Command):
        """Command checking the option dependency before running its callback."""

        def invoke(self, ctx):
            require = ctx.params[require_name]
            if require not in require_map:
                raise click.ClickException(click.style(f"Unexpected value for --'{require_name}': {require}\n", fg="red"))
            if ctx.params[require_map[require].lower()] is None:
                raise click.ClickException(click.style(f"With {require_name}={require} must specify option --{require_map[require]} path/to/modules\n", fg="red"))
            # Hand the callback result back to the caller instead of dropping it.
            return super().invoke(ctx)

    return CommandOptionRequiredClass
def get_dir(path):
    """Names of the directories found directly inside a folder.

    Arguments:
        path (str): a path to folder

    Returns:
        :class:`list`: name (not full path) of each sub-directory
    """
    folder_names = []
    for entry in Path(path).glob("*"):
        if entry.is_dir():
            folder_names.append(entry.name)
    return folder_names
def get_files_ext(path, extensions, add_ext=True):
    """List of files with specify extension included on folder (searched recursively).

    Deprecated: emits a :class:`DeprecationWarning` pointing to ``get_files_ext2``.

    Arguments:
        path (str): a path to folder
        extensions (list or tuple): a list or tuple of extension like (".py")
        add_ext (bool): if True (default), files are returned as posix paths with their
            extension; if False, only the file name stripped of its extension

    Returns:
        :class:`list`: List of files, with or without extension, matching the extensions
        :class:`list`: List of all extension found

    Raises:
        ValueError: if ``extensions`` is empty or is not a list or tuple
    """
    warn('This method will be replace by get_files_ext2 on latest version.', DeprecationWarning, stacklevel=2)
    if not isinstance(extensions, (list, tuple)) or not extensions:
        raise ValueError(f'ERROR: "extensions" must be a list or tuple not "{type(extensions)}"')

    # A file may match several patterns (".gz" and ".fastq.gz"): keep its first hit only.
    matched = {}
    for ext in extensions:
        for found in Path(path).glob(f"**/*{ext}"):
            matched.setdefault(found, None)

    all_files = []
    files_ext = []
    for elm in matched:
        ext = "".join(elm.suffixes)
        if ext not in files_ext:
            files_ext.append(ext)
        if add_ext:
            all_files.append(elm.as_posix())
        elif len(elm.suffixes) > 1:
            # Double extension such as ".fastq.gz": strip both parts.
            all_files.append(Path(elm.stem).stem)
        else:
            all_files.append(elm.stem)
    return all_files, files_ext
def get_files_ext2(path, extensions, add_ext=True):
    """List of files with specify extension included on folder (searched recursively).

    Arguments:
        path (str): a path to folder
        extensions (list or tuple): a list or tuple of extension like (".py")
        add_ext (bool): if True (default), files are returned as posix paths with their
            extension; if False, only the file name stripped of its extension

    Returns:
        :class:`list`: List of files, with or without extension, matching the extensions
        :class:`str`: Extension found
        :class:`boolean`: True when the extension found contains "gz"

    Raises:
        ValueError: if ``extensions`` is empty or is not a list or tuple, if no file is
            found, or if the files do not all share the same extension
    """
    if not isinstance(extensions, (list, tuple)) or not extensions:
        raise ValueError(f'ERROR: "extensions" must be a list or tuple not "{type(extensions)}"')

    # A file may match several patterns (".gz" and ".fastq.gz"): keep its first hit only.
    matched = {}
    for ext in extensions:
        for found in Path(path).glob(f"**/*{ext}"):
            matched.setdefault(found, None)

    all_files = []
    files_ext = []
    for elm in matched:
        ext = "".join(elm.suffixes)
        if ext not in files_ext:
            files_ext.append(ext)
        if add_ext:
            all_files.append(elm.as_posix())
        elif len(elm.suffixes) > 1:
            # Double extension such as ".fastq.gz": strip both parts.
            all_files.append(Path(elm.stem).stem)
        else:
            all_files.append(elm.stem)

    # check if files
    if not all_files:
        raise ValueError(
            f"CONFIG FILE CHECKING FAIL : you need to append at least on fasta with extension on {extensions}")
    # check if all files have the same extension
    if len(files_ext) > 1:
        raise ValueError(
            f"CONFIG FILE CHECKING FAIL : Please use only the same format for all data, mixed of: {files_ext}")
    file_type_ext = files_ext[0]
    # check if files are gzip
    files_gzip = "gz" in file_type_ext
    return all_files, file_type_ext, files_gzip
def var_2_bool(key, tool=None, to_convert=None):
    """Convert a configuration value to boolean.

    Arguments:
        key (str): name of the configuration section, only used in the error message
        tool (str): name of the key inside the section, only used in the error message
        to_convert: value to interpret; a real boolean is returned unchanged, otherwise its
            text form is compared, ignoring case and surrounding whitespace, with
            yes/true/t and no/false/f

    Returns:
        bool: the converted value

    Raises:
        TypeError: if the value is not one of the accepted spellings
    """
    if isinstance(to_convert, bool):
        return to_convert
    text = f"{to_convert}".strip().lower()
    if text in ("yes", "true", "t"):
        return True
    if text in ("no", "false", "f"):
        return False
    raise TypeError(
        f'CONFIG FILE CHECKING FAIL : in the "{key}" section, "{tool}" key: "{to_convert}" is not a valid boolean')
def convert_genome_size(size):
    """Convert a genome size such as "3.5M" or "2 g" to a number of base pairs.

    Arguments:
        size (str or int): a number optionally followed by a unit among K, M, G, T or N
            (upper or lower case); N or no unit means the value is already in bp

    Returns:
        int: the size in bp, truncated to an integer

    Raises:
        ValueError: if the value does not start with a number or if the unit is unknown
    """
    import re
    from decimal import Decimal

    mult = dict(K=10 ** 3, M=10 ** 6, G=10 ** 9, T=10 ** 12, N=1)
    # Accept numbers coming from the configuration as well as strings.
    search = re.search(r'^(\d+\.?\d*)\s*(.*)$', f"{size}".strip())
    if not search:
        raise ValueError(
            f"CONFIG FILE CHECKING FAIL : not able to convert genome size please only use int value with{' '.join(mult.keys())} upper or lower, N or empty is bp size")
    value, unit = search.groups()
    unit_key = unit.upper()
    if unit_key and unit_key not in mult:
        raise ValueError(
            f"CONFIG FILE CHECKING FAIL : '{unit}' unit value not allow or not able to convert genome size please only use int value with{' '.join(mult.keys())} upper or lower, N or empty is bp size")
    # Decimal arithmetic is exact here, whereas float("1.005") * 1000 falls just below 1005.
    return int(Decimal(value) * mult.get(unit_key, 1))
def sort_human(in_list, _nsre=None):
    """
    Sort key ordering alpha/digit strings on the way that humans expect,
    use list.sort(key=sort_human) or
    sorted(list, key=sort_human).

    The element is split on its digit runs; digit chunks become :class:`int` and the other
    chunks are lower-cased, so that "something2" comes before "something17".

    Arguments:
        in_list (:obj:`str`): one element of the list being sorted
        _nsre (:obj:`re.Pattern`, optional): expression used to split the element,
            defaults to re.compile('([0-9]+)')

    Returns:
        list: the chunks of the element; an element that cannot be split is returned
        unchanged (with a :class:`SyntaxWarning` unless it is an :class:`int`)

    Example:
        >>> list_to_sorted = ["something1","something32","something17","something2","something29","something24"]
        >>> print(sorted(list_to_sorted, key=sort_human))
        ['something1', 'something2', 'something17', 'something24', 'something29', 'something32']
        >>> list_to_sorted.sort(key=sort_human)
        >>> print(list_to_sorted)
        ['something1', 'something2', 'something17', 'something24', 'something29', 'something32']
    """
    from warnings import warn
    import re

    splitter = _nsre if _nsre else re.compile('([0-9]+)')
    try:
        chunks = []
        for piece in re.split(splitter, in_list):
            if piece.isdigit():
                chunks.append(int(piece))
            else:
                chunks.append(f"{piece}".lower())
        return chunks
    except TypeError:
        # Non-string element: integers pass through silently, anything else is reported.
        if isinstance(in_list, int):
            return in_list
        warn(
            f"Yoda_powers::sort_human : element '{in_list}' on the list not understand so don't sort this element\n",
            SyntaxWarning, stacklevel=2)
        return in_list