""" pyznap.utils ~~~~~~~~~~~~~~ Helper functions. :copyright: (c) 2018-2019 by Yannick Boetzel. :license: GPLv3, see LICENSE for more details. """ import glob import logging import os import re from configparser import ( ConfigParser, DuplicateOptionError, DuplicateSectionError, MissingSectionHeaderError, NoOptionError, ) from subprocess import PIPE, CalledProcessError, TimeoutExpired from process import run from ssh import SSHException SNAPSHOT_TYPES = ("frequent", "hourly", "daily", "weekly", "monthly", "yearly") def exists(executable="", ssh=None): """Tests if an executable exists on the system. Parameters: ---------- executable : {str}, optional Name of the executable to test (the default is an empty string) ssh : {SSH}, optional Open ssh connection (the default is None, which means check is done locally) Returns ------- bool True if executable exists, False if not """ logger = logging.getLogger(__name__) name_log = "{:s}@{:s}".format(ssh.user, ssh.host) if ssh else "localhost" cmd = ["which", executable] try: retcode = run( cmd, stdout=PIPE, stderr=PIPE, timeout=5, universal_newlines=True, ssh=ssh ).returncode except (TimeoutExpired, SSHException) as err: logger.error( "Error while checking if {:s} exists on {:s}: '{}'".format( executable, name_log, err ) ) return False return not bool(retcode) # return False if retcode != 0 def read_config(path): """Reads a config file and outputs a list of dicts with the given snapshot strategy. Parameters: ---------- path : {str} Path to the config file Raises ------ FileNotFoundError If path does not exist Returns ------- list of dict Full config list containing all strategies for different filesystems """ logger = logging.getLogger(__name__) if os.path.isfile(path): cfgfiles = path else: cfgfiles = glob.glob(os.path.expanduser(path)) if cfgfiles == []: logger.error( "Error while loading config: File {:s} does not exist.".format(path) ) return None parser = ConfigParser() try: parser.read(cfgfiles) # files = parser.read(cfgfiles) # logger.info("Parsed configs: " + str(files)) except ( MissingSectionHeaderError, DuplicateSectionError, DuplicateOptionError, ) as e: logger.error("Error while loading config: {}".format(e)) return None config = [] options = [ "key", "snap", "clean", "dest", "dest_keys", "compress", "exclude", "raw_send", "resume", "dest_auto_create", "retries", "retry_interval", "ignore_not_existing", "send_last_snapshot", "max_depth", "snap_exclude_property", "send_exclude_property", ] options += list(SNAPSHOT_TYPES) for section in parser.sections(): dic = {} config.append(dic) dic["name"] = "" if section == "//" else section for option in options: try: value = parser.get(section, option) except NoOptionError: dic[option] = None else: if option in ["key"]: dic[option] = value if os.path.isfile(value) else None elif option in SNAPSHOT_TYPES: dic[option] = int(value) elif option in ["max_depth"]: dic[option] = int(value) if value and value != "no" else -1 elif option in ["snap", "clean", "ignore_not_existing"]: dic[option] = {"yes": True, "no": False}.get(value.lower(), None) elif option in ["snap_exclude_property", "send_exclude_property"]: dic[option] = value.strip() if value.strip() else False elif option in ["dest", "compress", "send_last_snapshot"]: dic[option] = [i.strip() for i in value.split(",")] elif option in ["dest_keys"]: dic[option] = [ i.strip() if os.path.isfile(i.strip()) else None for i in value.split(",") ] elif option in ["exclude"]: dic[option] = [ [i.strip() for i in s.strip().split(" ")] if s.strip() else None for s in value.split(",") ] elif option in ["raw_send", "resume", "dest_auto_create"]: dic[option] = [ {"yes": True, "no": False}.get(i.strip().lower(), None) for i in value.split(",") ] elif option in ["retries", "retry_interval"]: dic[option] = [int(i) for i in value.split(",")] # Sort by pathname - must be before propagation config = sorted(config, key=lambda entry: entry["name"].split("/")) # Find closest parent for child in config: child["_parent"] = None for parent in reversed(config): if child["name"].startswith(parent["name"] + "/") or ( parent["name"] == "" and parent["name"] != child["name"] ): child["_parent"] = parent["name"] break # Pass through values recursively for parent in config: for child in config: if parent["name"] == child["_parent"]: child_parent = "/".join( child["name"].split("/")[:-1] ) # get parent of child filesystem if child_parent.startswith(parent["name"]): for option in [ "key", "snap", "clean", "ignore_not_existing", "send_last_snapshot", "max_depth", "snap_exclude_property", "send_exclude_property", ] + list(SNAPSHOT_TYPES): child[option] = ( child[option] if child[option] is not None else parent[option] ) return config def parse_name(value): """Splits a string of the form 'ssh:port:user@host:rpool/data' into its parts separated by ':'. Parameters: ---------- value : {str} String to split up Returns ------- (str, str, str, str, int) Tuple containing the different parts of the string """ if value.startswith("ssh"): _type, port, host, fsname = value.split(":", maxsplit=3) port = int(port) if port else 22 user, host = host.split("@", maxsplit=1) else: _type, user, host, port = "local", None, None, None fsname = value return _type, fsname, user, host, port def check_recv(fsname, ssh=None): """Checks if there is already a 'zfs receive' for that dataset ongoing Parameters ---------- fsname : str Name of the dataset ssh : SSH, optional Open ssh connection (the default is None, which means check is done locally) Returns ------- bool True if there is a 'zfs receive' ongoing or if an error is raised during checking. False if there is no 'zfs receive'. """ logger = logging.getLogger(__name__) fsname_log = "{:s}@{:s}:{:s}".format(ssh.user, ssh.host, fsname) if ssh else fsname try: out = run( ["ps", "-Ao", "args="], stdout=PIPE, stderr=PIPE, timeout=5, universal_newlines=True, ssh=ssh, ).stdout except (TimeoutExpired, SSHException) as err: logger.error( "Error while checking 'zfs receive' on {:s}: '{}'".format(fsname_log, err) ) return True except CalledProcessError as err: logger.error( "Error while checking 'zfs receive' on {:s}: '{:s}'".format( fsname_log, err.stderr.rstrip() ) ) return True else: match = re.search(r"zfs (receive|recv).*({:s})(?=\n)".format(fsname), out) if match: logger.error( "Cannot send to {:s}, process '{:s}' already running".format( fsname_log, match.group() ) ) return True return False def bytes_fmt(num): """Converts bytes to a human readable format Parameters ---------- num : int,float Number of bytes Returns ------- float Human readable format with binary prefixes """ for x in ["B", "K", "M", "G", "T", "P", "E", "Z"]: if num < 1024: return "{:3.1f}{:s}".format(num, x) num /= 1024 else: return "{:3.1f}{:s}".format(num, "Y")