Source code for snakecdysis.snake_wrapper

import pandas as pd
import yaml
import pprint
import os
import re
import subprocess
import click
from pathlib import Path
from shutil import rmtree
from collections import OrderedDict
from click import progressbar, secho
from snakemake.common.configfile import load_configfile
from .useful_function import multiprocessing_download


[docs] class SnakeInstaller: """ This class is used to install Snakemake workflows. """ def __init__(self, soft_path=None, url=None, docs=None, description_tool=None, apptainer_url_files=None, datatest_url_files=None, **kargs): """ Initialize SnakeInstaller. Args: - soft_path (str): The path of the wrapped workflow installation. - url (str): URL of versioning repository (GitHub or GitLab). - docs (str): URL of documentation. - description_tool (str): The header printed on the terminal when running the program. Please include string values 'VERSION', 'GIT_URL', and 'DOCS', and the wrapper automatically replaces them with the correct values. - apptainer_url_files (list(tuple())): List of tuples with downloaded URL and install destination with INSTALL_PATH, like INSTALL_PATH/containers/Apptainer.CulebrONT_tools.sif. - datatest_url_files (tuple): Tuple with 2 values, first the URL of datatest, second download name. - snakefile (str): Path to the main Snakemake script file (default: snakefiles/snakefile). - snakemake_scripts (str): Path to the main Snakemake script file (default: snakefiles/snakefile). - default_profile (str): Path to create the cluster 'default_profile' directory (default: default_profile/). - git_configfile_path (str): Path to the default configfile YAML (default: install_files/configfile.yaml). - git_tools_path (str): Path to the default tools config YAML (default: install_files/tools_path.yaml). - git_profile_config (str): Path to the default profile config file (default: install_files/config.yaml). - tools_version_path (str): Path to the csv with soft and command to get version (default: snakemake_scripts/report_template/versions.csv). - slurm_mode_choice (list): List of snakemake value for option --software-deployment-method (default: ['env-modules', 'apptainer', 'conda', "env-modules, conda"]). - local_mode_choice (list): List of snakemake value for option --software-deployment-method (default: ['apptainer']). - kargs: Additional keyword arguments. """ self.tools_config = None self.install_path = soft_path self.home_config_path = Path(f"~/.config/{self.soft_name}-snake8/").expanduser() self.git_url = url self.docs = docs self.description_tool = description_tool self.apptainer_url_files = apptainer_url_files self.datatest_url_files = datatest_url_files self.dico_args = kargs self._latest_version = None self._lasted_file = None self.slurm_mode_choice = ['env-modules', 'apptainer', 'conda', "env-modules, conda"] self.local_mode_choice = ['apptainer'] # autobuild attributs self.install_mode_file = self.install_path.joinpath(".mode.txt") self.bash_completion_script = Path(f"{self.install_path}/{self.soft_name}-complete.sh") # Snakemake settings self.snakefile = self.install_path.joinpath("snakefiles", "snakefile") if "snakefile" not in self.dico_args else \ self.dico_args["snakefile"] self.snakemake_scripts = self.install_path.joinpath("scripts", "snakemake_only") if "snakemake_scripts" not in self.dico_args else \ self.dico_args["snakemake_scripts"] self.default_profile = self.install_path.joinpath( "default_profile") if "default_profile" not in self.dico_args else self.dico_args["default_profile"] self.git_configfile_path = self.install_path.joinpath("install_files", "configfile.yaml") if "git_configfile_path" not in self.dico_args else \ self.dico_args["git_configfile_path"] # Tools settings self.git_tools_path = self.install_path.joinpath("install_files", "tools_path.yaml") if "git_tools_path" not in self.dico_args else \ self.dico_args["git_tools_path"] self.tools_version_path = self.snakemake_scripts.joinpath("report_template", "versions.csv") if "tools_version_path" not in self.dico_args else \ self.dico_args["tools_version_path"] # cluster settings self.git_profile_config = self.install_path.joinpath("install_files", "config.yaml") if "git_profile_config" not in self.dico_args else \ self.dico_args["git_profile_config"] self.slurm_mode_choice = self.slurm_mode_choice if "slurm_mode_choice" not in self.dico_args else \ self.dico_args["slurm_mode_choice"] self.local_mode_choice = self.local_mode_choice if "local_mode_choice" not in self.dico_args else \ self.dico_args["local_mode_choice"] self._lasted_file = self.user_tools_path.parent.joinpath('.latest.txt') @property def soft_name(self) -> str: """The wrapped workflow name.""" return self._install_path.stem @property def install_path(self) -> Path: """The path of wrapped workflow installation.""" return Path(self._install_path) @install_path.setter def install_path(self, path: Path = None): """ Set the installation path. Args: - path (Path): The installation path. """ if not path: raise ValueError("ERROR 'soft_path' is empty but mandatory.") elif not Path(path).exists(): raise NotADirectoryError(f"ERROR 'soft_path', the path '{path}' doesn't exist") elif not Path(path).is_absolute(): raise ValueError(f"ERROR 'soft_path', the path '{path}' is not on absolute path") else: self._install_path = Path(path) @property def git_url(self) -> str: """Url of versioning repository (GitHub or GitLab)""" return self._git_url @git_url.setter def git_url(self, url) -> str: if not url or not isinstance(url, str): raise ValueError("ERROR 'url' is empty or not a string value.") self._git_url = url @property def docs(self) -> str: """Url of documentation""" return self._docs @docs.setter def docs(self, docs) -> str: if not docs or not isinstance(docs, str): raise ValueError("ERROR 'docs' is empty or not a string value.") self._docs = docs @property def description_tool(self) -> str: """The header print on terminal when run programme Please add string values 'VERSION', 'GIT_URL' and 'DOCS' and wrapper automatically replace by the good values """ return self._description_tool @description_tool.setter def description_tool(self, description_tool) -> str: if not description_tool or not isinstance(description_tool, str): raise ValueError("ERROR key 'description_tool' is empty or not a string value on 'dico_tool'.") self._description_tool = description_tool.replace("VERSION", self.version).replace("GIT_URL", self.git_url).replace("DOCS", self.docs) @property def apptainer_url_files(self) -> list: """List of tuple with downloaded url and install destination with INSTALL_PATH. like INSTALL_PATH/containers/Apptainer.CulebrONT_tools.sif""" return self._apptainer_url_files @apptainer_url_files.setter def apptainer_url_files(self, list_url): if not list_url: raise AttributeError("ERROR 'apptainer_url_files' is empty but mandatory") if not isinstance(list_url, list): raise ValueError(f"ERROR 'apptainer_url_files' must be a list of tuple but is {type(list_url)}") self._apptainer_url_files = [(url, path_install.replace("INSTALL_PATH", self.install_path.as_posix())) for url, path_install in list_url] @property def datatest_url_files(self) -> tuple: """Tuple with 2 values, first the url of datatest, second download name.""" return self._datatest_url_files @datatest_url_files.setter def datatest_url_files(self, tuple_test): if not isinstance(tuple_test, tuple) or not len(tuple_test) == 2: raise ValueError("ERROR 'datatest_url_files' must be a tuple of 2 values") self._datatest_url_files = tuple_test @property def install_mode(self): """Detect install mode of the soft, can be 'No install', 'local' or 'cluster'""" if not self.install_mode_file.exists(): self._install_mode = "No install" else: self._install_mode = self.install_mode_file.open("r").readline().strip() return self._install_mode @property def tools_mode(self): """Detect install mode of the tools soft, can be 'env-modules', 'apptainer', 'conda' or 'env-modules, conda'""" if not self.install_mode_file.exists(): self._tools_mode = "No install" else: try: self._tools_mode = self.install_mode_file.open("r").readlines()[1].strip() except IndexError: self._tools_mode = f"""Not avail, please update Snakecdysis with 'python3 -m pip install snakecdysis>=1.0.0' \t\tand then reinstall {self.soft_name} with \t\t'{self.soft_name} -r && {self.soft_name} install'""" return self._tools_mode @property def version(self): """The current workflow version, read on VERSION file""" from importlib.metadata import version return version(self.soft_name) @property def snakecdysis_version(self): """The current workflow version, read on VERSION file""" from snakecdysis import __version__ return __version__ @property def latest_version(self): """The latest workflow version, read on repository""" self.__update_latest_file() return self._latest_version # SNAKEMAKE SETTINGS @property def snakefile(self) -> Path: """Path to the main snakemake file. Search on INSTALL_PATH/snakefiles/snakefile """ return self._snakefile @snakefile.setter def snakefile(self, snake_path): if not Path(snake_path).exists(): raise FileNotFoundError(f"ERROR the snakefile '{snake_path}' doesn't exist") self._snakefile = Path(snake_path) @property def snakemake_scripts(self) -> Path: """Path to the scripts used on snakemake rules. Search on INSTALL_PATH/snakemake_scripts """ return self._snakemake_scripts @snakemake_scripts.setter def snakemake_scripts(self, script_path) -> Path: if not Path(script_path).exists(): raise FileNotFoundError(f"ERROR the snakemake_scripts '{script_path}' doesn't exist") self._snakemake_scripts = Path(script_path) @property def default_profile(self) -> Path: """Path to the directory of config.yaml default profile. install on INSTALL_PATH/default_profile """ return self._default_profile @default_profile.setter def default_profile(self, default_profile) -> Path: # if not Path(default_profile).exists(): # raise NotADirectoryError(f"ERROR the default_profile '{default_profile}' doesn't exist") self._default_profile = Path(default_profile) @property def get_active_profile(self): """return the Path to current profile config""" if self.user_profile_config.exists(): return self.user_profile_config.parent elif self.default_profile.exists(): return self.default_profile else: return "No install" @property def git_configfile_path(self) -> Path: """Path to the directory of default configfile.yaml file. default to INSTALL_PATH/install_files/configfile.yaml """ return self._git_configfile_path @git_configfile_path.setter def git_configfile_path(self, git_configfile_path) -> Path: if not Path(git_configfile_path).exists(): raise FileNotFoundError(f"ERROR the git_configfile_path '{git_configfile_path}' doesn't exist") self._git_configfile_path = Path(git_configfile_path) @property def git_tools_path(self) -> Path: """Path to the directory of default tools_path.yaml file. default to INSTALL_PATH/install_files/tools_path.yaml """ return self._git_tools_path @git_tools_path.setter def git_tools_path(self, git_tools_path) -> Path: if not Path(git_tools_path).exists(): raise FileNotFoundError(f"ERROR the git_tools_path '{git_tools_path}' doesn't exist") self._git_tools_path = Path(git_tools_path) @property def user_tools_path(self) -> Path: """Path to the user tools path setting. default to ~/.config/SOFTNAME/tools_path.yaml """ return self.home_config_path.joinpath("tools_path.yaml") @property def user_profile_config(self) -> Path: """Path to the user profile config path setting. default to ~/.config/SOFTNAME/config.yaml """ return self.home_config_path.joinpath("config.yaml")
[docs] def get_last_version(self) -> str: """Function for know the last version of program (can be GitHub or GitLab repository) """ from urllib.request import urlopen from re import search try: if "github" in self.git_url: HTML = urlopen(f"{self.git_url}/tags").read().decode('utf-8') str_search = f"{self.git_url.replace('https://github.com', '')}/releases/tag/.*" last_release = search(str_search, HTML).group(0).split('"')[0].split("/")[-1] self._latest_version = last_release else: from gitlab import Gitlab project_name_with_namespace = "/".join(self.git_url.split("//")[1].split("/")[1:]) # print(project_name_with_namespace) gl = Gitlab(self.git_url.replace(f'/{project_name_with_namespace}', "")) project = gl.projects.get(f"{project_name_with_namespace}", lazy=True) releases_list = project.releases.list() if not releases_list: last_release = "There aren’t any releases" else: last_release = releases_list[0].name self._latest_version = last_release except Exception as e: print(e) if "403" in f"{e}": secho(f"\n Forbidden access to release version, please check setting on repository\n\n", fg="red") last_release = "Forbidden access" self._latest_version = last_release elif "404" in f"{e}": secho(f"\n Project Not Found! please check setting URL:{self.git_url}\n\n", fg="red") last_release = "Project Not Found" self._latest_version = last_release else: secho(f"\n Unable to check release:{e}\n\n", fg="red") last_release = "Unable" self._latest_version = last_release
def __update_latest_file(self, delta_days=30): from datetime import datetime, timedelta if not Path(self._lasted_file).exists(): self.get_last_version() self._lasted_file.parent.mkdir(exist_ok=True,parents=True) self._lasted_file.open("w").write(self._latest_version) else: # get modification time file_mod_time = datetime.fromtimestamp(self._lasted_file.stat().st_mtime) # This is a datetime.datetime object! now = datetime.today() max_delay = timedelta(days=delta_days) if now - file_mod_time > max_delay: #print(f"CRITICAL:last modified on {file_mod_time}. Threshold set to {max_delay} minutes.") self.get_last_version() self._lasted_file.open("w").write(self._latest_version) elif self._lasted_file.exists(): self._latest_version = self._lasted_file.open("r").read().strip()
[docs] def get_epilog(self) -> str: """Function for know the last version of program (can be GitHub or GitLab repository) check every 30 days to skip request of if no internet connection Return: epilogTool print at the end of header """ self.__update_latest_file() epilogTools = "\n" if str(self.version) != self._latest_version: if self._latest_version < str(self.version): epilogTools = click.style( f"\n ** NOTE: This {self.soft_name} version ({self.version}) is higher than the production version ({self._latest_version}), you are using a dev version\n", fg="yellow", bold=True) elif self._latest_version > str(self.version) and self._latest_version != "There aren’t any releases": epilogTools = click.style( f"\n ** NOTE: The Latest version of {self.soft_name} {self._latest_version} is available at {self.git_url}\n", fg="yellow", underline=True) elif self._latest_version == "There aren’t any releases": epilogTools = click.style(f"\n ** NOTE: There aren’t any releases at the moment\n", fg="red", underline=False) else: epilogTools = click.style(f"\n ** NOTE: Can't check if new release are available\n", fg="red", underline=False) return epilogTools
[docs] def create_bash_completion(self): """Add bash completion for version > 4.4""" major, minor = None, None bashrc_file = Path("~/.bashrc").expanduser().as_posix() # test shell version for fix correction output = subprocess.run( ["bash", "-c", "echo ${BASH_VERSION}"], stdout=subprocess.PIPE ) match = re.search(r"^(\d+)\.(\d+)\.\d+", output.stdout.decode()) if match is not None: major, minor = match.groups() # if major < "4" or (major == "4" and minor < "4"): # click.secho(f"\n WARNING Shell completion is not supported for Bash versions older than 4.4.\n\n", # fg="red", nl=False) # Test if completion script exist if not self.bash_completion_script.exists(): build_completion = f"_{self.soft_name.upper()}_COMPLETE=bash_source {self.soft_name} > {self.bash_completion_script.as_posix()}" process = subprocess.run(build_completion, shell=True, check=False, stdout=False, stderr=False) # fix header print with self.bash_completion_script.open("r") as bash_soft_open: new_file = "" start = False for line in bash_soft_open: if f"_{self.soft_name}_completion" in line: start = True if start: if "complete -o nosort" in line and major < "4" or (major == "4" and minor < "4"): line = line.replace("-o nosort", "") new_file += line with self.bash_completion_script.open("w") as bash_soft_open: bash_soft_open.write(new_file) # add on bashrc path_bash = None with open(bashrc_file, "r") as bash_file_read: for line in bash_file_read: if f"{self.soft_name.upper()}" in line: path_bash = bash_file_read.readline().strip() if path_bash: load = f"{path_bash[2:]}" if f"{self.bash_completion_script}" != load: click.secho( f"\n WARNING autocompletion for {self.soft_name.upper()} already found on {bashrc_file}, with other path please fix the good:", fg="red", nl=False) click.secho( f"\n Load on bashrc: {load}\n New install: {self.bash_completion_script.as_posix()}", fg='bright_red') else: with open(bashrc_file, "a") as bash_file_open: append_bashrc = f"\n#Add autocompletion for {self.soft_name.upper()}\n. {self.bash_completion_script.as_posix()}" bash_file_open.write(append_bashrc) click.secho(f"\n INSTALL autocompletion for {self.soft_name.upper()} on {bashrc_file} with command {append_bashrc}", fg="yellow") click.secho(f"\n TODO: enable completion with command:\n source ~/.bashrc", fg="cyan")
[docs] def clean_home(self): """Reset home's parameters of previous installation""" if self.home_config_path.exists(): rmtree(self.home_config_path.as_posix()) if self.default_profile.exists(): rmtree(self.default_profile.as_posix()) if self.bash_completion_script.exists(): self.bash_completion_script.unlink(missing_ok=True)
[docs] def fail(self): """If installation fail, reset already install files""" rmtree(self.default_profile, ignore_errors=True) self.install_mode_file.unlink(missing_ok=True) self.clean_home() click.secho(f"\n INSTALL FAIL remove already install files: {self.default_profile} ", fg="red", err=True) raise SystemExit
[docs] def write_user_tools_path(self): """Check if file is created and then write with modification of apptainer path""" if not self.user_tools_path.exists(): self.user_tools_path.parent.mkdir(parents=True, exist_ok=True) self.user_tools_path.write_text( self.git_tools_path.read_text().replace("INSTALL_PATH", self.install_path.as_posix()))
[docs] def check_and_download_apptainer(self): """Download apptainer/apptainer files if provided""" # check if already download if true pop from list self.write_user_tools_path() SINGULARITY_URL_FILES_DOWNLOAD = [] for url, path_install in self.apptainer_url_files: if not Path(path_install).exists(): Path(path_install).parent.mkdir(parents=True, exist_ok=True) SINGULARITY_URL_FILES_DOWNLOAD.append((url, path_install)) click.secho(f" File: {path_install} is being downloaded.", fg="yellow", nl=True) else: click.secho(f" File: {path_install} already downloaded, done.", fg="yellow", nl=True) multiprocessing_download(SINGULARITY_URL_FILES_DOWNLOAD) click.secho( f"\n WARNING please check if binding is active on your apptainer/apptainer configuration, see https://apptainer.org/docs/user/main/bind_paths_and_mounts.html\n" f" Or use: --apptainer-args '--bind $HOME' on the command line", fg="bright_red")
@property def get_active_tools_path(self): """return the active path of tools config""" if self.user_tools_path.exists(): return self.user_tools_path else: return self.git_tools_path
[docs] def get_tool_configfile(self): """Test path of tools_path.yaml on default install path, home or argument""" self.tools_config = load_configfile(self.get_active_tools_path)
[docs] def tools_version_to_df(self, csv_file=None, active_tools_list=None, output_file=None): """check how tools install and get version to save on file""" if not csv_file: click.secho("\n CODE error, please provide file or active_tools_list", fg="red", err=True) raise SystemExit self.get_tool_configfile() run_all = False if not active_tools_list: run_all = True df = pd.read_csv(csv_file) df.fillna('', inplace=True) df.sort_values(by=df.columns[0], axis=0, inplace=True) df.reset_index(inplace=True,drop=True) dico_version_cmd = df.to_dict(orient="index") dico_version_value = df.drop(columns=[df.columns[2]]).to_dict(orient="index") try: with progressbar(dico_version_cmd.items()) as bar: for index, dico in bar: tool = dico[df.columns[0]] secondary = dico[df.columns[1]] cmd_version = dico[df.columns[2]] #print(f"\n{tool}, {secondary} {type(secondary)}, {cmd_version}\n") if run_all: active_tools_list.append(tool) active_tools_list.append(secondary) if tool in active_tools_list or secondary in active_tools_list: if self.tools_mode == "apptainer": sif_path = self.tools_config["APPTAINER"]["TOOLS"] run_version_cmd = f"apptainer exec {sif_path} {cmd_version}" elif self.tools_mode == "env-modules": tools_module = self.tools_config["ENV-MODULES"] if secondary and secondary.upper() in tools_module.keys(): run_version_cmd = f"module purge && module load {tools_module[secondary.upper()]} && {cmd_version}" else: run_version_cmd = f"module purge && module load {tools_module[tool.upper()]} && {cmd_version}" process = subprocess.run(run_version_cmd, shell=True, check=False, capture_output=True, encoding="UTF-8") if int(process.returncode) >= 1: raise SystemExit(f"Error with command line:\n'{run_version_cmd}'\n'{process.stderr.rstrip()=}'\n{process.returncode=}\n{process.stdout.rstrip()=}") else: dico_version_value[index]["Version"] = process.stdout.strip() else: dico_version_value.pop(index) except UnboundLocalError as e: raise SystemExit(f"Error getting mode tools:\n'{self.tools_mode=}\n{self.install_path=}\n{self.install_mode_file=}\n{self.install_mode=}\n") except KeyError as e: raise SystemExit(f"Error getting Tool '{e}\n, not on avail list: {tools_module.keys()}\n") df_version = pd.DataFrame.from_dict(dico_version_value, orient="index") desired_order = [df.columns[0], df.columns[1], 'Version', df.columns[3]] df_version = df_version[desired_order] if output_file: # check parent dir Path(output_file).parent.mkdir(exist_ok=True, parents=True) with open(output_file, "w") as out: df_version.to_csv(out, sep=",", index=False) return df_version
def __repr__(self): return f"{self.__dict__}"
[docs] class SnakEcdysis(SnakeInstaller): """ test generic wrapper class Args: soft_path (str): The path of wrapped workflow installation url (str): Url of versioning repository (GitHub or GitLab) docs (str): Url of documentation description_tool (str):The header print on terminal when run programme. Please add string values 'VERSION', 'GIT_URL' and 'DOCS' and wrapper automatically replace by th good values apptainer_ur_files (list(tuple()): List of tuple with downloaded url and install destination with INSTALL_PATH. like INSTALL_PATH/containers/Apptainer.CulebrONT_tools.sif datatest_url_files (tuple): Tuple with 2 values, first the url of datatest, second download name. """ def __init__(self, dico_tool=None, workflow=None, config=None, **kargs): if dico_tool: super().__init__(**dico_tool) else: super().__init__(**kargs) # workflow is available only in __init__ # print("\n".join(list(workflow.__dict__.keys()))) # print(workflow.__dict__) self.tools_config = None if workflow.overwrite_configfiles: self.path_config = workflow.overwrite_configfiles[0] else: self.path_config = None self.config = config self.get_tool_configfile()
[docs] def get_config_value(self, level1, level2=None, level3=None): """get value on config_file""" if level3: return self.config[level1][level2][level3] elif level2: return self.config[level1][level2] else: return self.config[level1]
[docs] def set_config_value(self, level1, level2=None, value=None, level3=None): """Set config value on config_file""" if level3: self.config[level1][level2][level3] = value elif level2: self.config[level1][level2] = value else: self.config[level1] = value
[docs] def write_config(self, path): """Write the corrected config file to path""" p = Path(path).parent p.mkdir(parents=True, exist_ok=True) with open(path, "w") as config_open: config_open.write(self.export_use_yaml)
[docs] def check_dir_or_string(self, level1, level2, mandatory=(), level3=None, check_string=False): """Check if the specified directory or string exists and is valid in the configuration. Parameters: - level1 (str): First level in the configuration hierarchy. - level2 (str): Second level in the configuration hierarchy. - mandatory (tuple): Tuple of mandatory items for the tool. - level3 (str): Optional third level in the configuration hierarchy. - check_string (bool): Flag to indicate whether to check for a string value. Raises: - NotADirectoryError: If the specified path does not exist or is not a valid directory. - ValueError: If the path is empty when it is expected to be a string. """ path_value = self.get_config_value(level1=level1, level2=level2, level3=level3) if path_value: path = Path(path_value).resolve().as_posix() + "/" # it is a path if path_value != "" and "/" in path_value: if (not Path(path).exists() or not Path(path).is_dir()) and level2 not in ["OUTPUT"]: raise NotADirectoryError( f'CONFIG FILE CHECKING FAIL : in section:{level1}, {f"subsection:{level2} directory:{level3}" if level3 else f"directory:{level2}"}, "{path}" {"does not exist" if not Path(path).exists() else "is not a valid directory"}') else: self.set_config_value(level1=level1, level2=level2, level3=level3, value=path) # it is not a path elif path_value and "/" not in path_value and check_string: self.set_config_value(level1=level1, level2=level2, level3=level3, value=path_value) # it is empty elif path_value and "/" not in path_value and check_string: raise ValueError( f'CONFIG FILE CHECKING FAIL : in section:{level1}, {f"subsection:{level2} value:{level3}" if level3 else f"value:{level2}"}, "{path_value}" is empty') elif len(mandatory) > 0: raise NotADirectoryError( f'CONFIG FILE CHECKING FAIL : in section:{level1}, {f"subsection:{level2} directory:{level3}" if level3 else f"directory:{level2}"}, "{path_value}" {"does not exist" if not Path(path_value).exists() else "is not a valid directory"} but is mandatory for tool: "{",".join(mandatory)}"')
[docs] def check_file_or_string(self, level1, level2, mandatory=(), level3=None, check_string=False): """Check if path is a file if not empty :return absolute path file""" path_value = self.get_config_value(level1=level1, level2=level2, level3=level3) path = Path(path_value).resolve().as_posix() # it is a path if path_value != "" and "/" in path_value: if not Path(path).exists() or not Path(path).is_file(): raise FileNotFoundError( f'CONFIG FILE CHECKING FAIL : in section:{level1}, {f"subsection:{level2}, file {level3}" if level3 else f"file {level2}"}, "{path}" {"does not exist" if not Path(path).exists() else "is not a valid file"}') else: self.set_config_value(level1=level1, level2=level2, level3=level3, value=path) # it is not a path elif path_value != "" and "/" not in path_value and check_string: self.set_config_value(level1=level1, level2=level2, level3=level3, value=path_value) # it is empty elif path_value == "" and "/" not in path_value and check_string: raise ValueError( f'CONFIG FILE CHECKING FAIL : in section:{level1}, {f"subsection:{level2}, value {level3}" if level3 else f"value:{level2}"}, "{path_value}" is empty') elif len(mandatory) > 0: raise FileNotFoundError( f'CONFIG FILE CHECKING FAIL : in section:{level1} , {f"subsection:{level2}, value {level3}" if level3 else f"value:{level2}"}, "{path_value}" {"does not exist" if not Path(path_value).exists() else "is not a valid file"} but is mandatory for tool: "{",".join(mandatory)}"')
@property def export_use_yaml(self): """Use to print a dump config.yaml with corrected parameters""" def represent_dictionary_order(yamldef, dict_data): return yamldef.represent_mapping('tag:yaml.org,2002:map', dict_data.items()) def setup_yaml(): yaml.add_representer(OrderedDict, represent_dictionary_order) setup_yaml() return yaml.dump(self.config, default_flow_style=False, sort_keys=False, indent=4) @property def string_to_dag(self): """ return command line for rule graph """ return f"""snakemake -s {self.snakefile} --profile {self.get_active_profile} --rulegraph""" def __repr__(self): return f"{self.__class__}({pprint.pprint(self.__dict__)})"