""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import asyncio import base64 import logging import os import re import shutil from pathlib import Path from typing import Callable, Iterable, List, Set, Tuple from defence360agent.contracts.config import ANTIVIRUS_MODE, Malware from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.utils import check_run from imav.malwarelib.model import MalwareIgnorePath from imav.malwarelib.scan.crontab import crontab_path logger = logging.getLogger(__name__) # location of admin provided watched and ignored paths _ADMIN_PATH = Path("/etc/sysconfig/imunify360/malware-filters-admin-conf") # location of internal configs, shipped with imunify360-firewall _INTERNAL_PATH = Path("/var/imunify360/files/realtime-av-conf/v1") # location of processed configs _PROCESSED_PATH = _ADMIN_PATH / "processed" _PD_NAME = "pd-combined.txt" _INTERNAL_NAME = "av-internal.txt" _ADMIN_NAME = "av-admin.txt" _ADMIN_PATHS_NAME = "av-admin-paths.txt" _IGNORED_SUB_DIR = "ignored" _MAX_PATTERN_LENGTH = 64000 _SERVICE = "imunify-realtime-av" _PD_PREPARE = "/usr/bin/i360-exclcomp" class PatternLengthError(Exception): """Raised when pattern's length is too big.""" pass def _save_basedirs(dir: Path, basedirs: Set[str]) -> None: """Save list of basedirs in a file inside dir.""" with (dir / "basedirs-list.txt").open("w") as f: for basedir in sorted(basedirs): f.write(os.path.realpath(basedir) + "\n") def _split_paths(paths: List[str]) -> Tuple[List[str], List[str]]: """Split paths into two lists: absolute and relative. Relative paths start with +. This + sign is removed from resulting path.""" absolute, relative = [], [] for path in paths: if path.startswith("+"): relative.append(path[1:]) else: absolute.append(path) return absolute, relative def _read_list(path: Path) -> List[str]: """Read file at path and return its lines as a list. Empty lines or lines starting with '#' symbol are skipped. Lines are stripped of leading and trailing whitespace. If the file does not exist, empty list is returned.""" try: with path.open() as f: lines = [line.strip() for line in f] return [x for x in lines if len(x) > 0 and not x.startswith("#")] except FileNotFoundError: return [] class _Watched(list): """Holds a list of watched glob patterns ready to be saved.""" def __init__(self, w: List[str], basedirs: Set[str]) -> None: super().__init__() absolute, relative = _split_paths(w) self.extend( os.path.realpath(p) for p in absolute + self._extend_relative(relative, basedirs) if self._is_valid(p) ) @staticmethod def _is_valid(pattern: str) -> bool: """Return True if watched pattern is valid.""" if not pattern.startswith("/"): logger.warning( "skipping watched path %s: not starts with /", pattern ) return False return True @staticmethod def _extend_relative(paths: List[str], basedirs: Set[str]) -> List[str]: """Join basedirs with all paths and return resulting list.""" extended = [] for path in paths: for basedir in basedirs: extended.append(os.path.join(basedir, path)) return extended def save(self, path: Path) -> None: """Save watched list at specified path.""" with path.open("w") as f: f.write("\n".join(self)) class _Ignored(str): """Holds a list of ignored regexp patterns ready to be saved.""" @staticmethod def _is_valid_relative(pattern: str) -> bool: """Return True if relative ignored pattern is valid.""" if pattern.startswith("^"): logger.warning( "skipping relative ignored path %s: starts with ^", pattern ) return False return True @staticmethod def _remove_leading_slash(pattern: str) -> str: """Remove leading slash from pattern, if present.""" if pattern.startswith("/"): return pattern[1:] return pattern @staticmethod def _compiles(pattern: str) -> bool: """Return True if pattern successfully compiles as regexp.""" try: re.compile(pattern) return True except Exception: logger.warning( "skipping ignored pattern %s: invalid regex", pattern ) return False @classmethod def from_patterns( cls, patterns: List[str], basedirs: Set[str] ) -> "_Ignored": """Build single ignored regexp from given patterns and basedirs.""" absolute, relative = _split_paths(patterns) absolute = [p for p in absolute if cls._compiles(p)] relative = [ cls._remove_leading_slash(p) for p in relative if cls._is_valid_relative(p) and cls._compiles(p) ] if len(basedirs) > 0 and len(relative) > 0: relative_pattern = "^(?:{})/(?:{})".format( "|".join(basedirs), "|".join(relative) ) absolute.append(relative_pattern) pat = "|".join(absolute) if pat == "": pat = "^$" return _Ignored(pat) def save(self, path: Path): """Save ignored list at specified path.""" if len(self) > _MAX_PATTERN_LENGTH: raise PatternLengthError( "{} pattern is too long ({})".format(path, len(self)) ) with path.open("w") as f: f.write(self) def _read_configs(panel: str, name: str) -> Tuple[List[str], List[str]]: """Read internal and admin lists from files with given name.""" common_dir = _INTERNAL_PATH / "common" internal = _read_list(common_dir / name) panel_path = _INTERNAL_PATH / panel.lower() if panel_path.exists(): internal.extend(_read_list(panel_path / name)) return internal, _read_list(_ADMIN_PATH / name) class _WatchedCtx: def __init__(self, internal: _Watched, admin: _Watched) -> None: self.internal = internal self.admin = admin def save(self, dir: Path) -> None: w = dir / "watched" w.mkdir(exist_ok=True) self.internal.save(w / _INTERNAL_NAME) self.admin.save(w / _ADMIN_NAME) def _watched_context( panel_name: str, basedirs: Set[str], *, extra: Iterable[str] ) -> _WatchedCtx: internal_watched, admin_watched = _read_configs(panel_name, "watched.txt") internal_watched.extend(extra) return _WatchedCtx( _Watched(internal_watched, basedirs), _Watched(admin_watched, basedirs) ) class _IgnoredCtx: def __init__( self, internal: _Ignored, admin: _Ignored, pd: _Ignored ) -> None: self.internal = internal self.admin = admin self.pd = pd def save(self, dir: Path) -> None: w = dir / _IGNORED_SUB_DIR w.mkdir(exist_ok=True) self.internal.save(w / _INTERNAL_NAME) self.admin.save(w / _ADMIN_NAME) self.pd.save(w / _PD_NAME) def _ignored_context(panel_name: str, basedirs: Set[str]) -> _IgnoredCtx: internal_ignored, admin_ignored = _read_configs(panel_name, "ignored.txt") return _IgnoredCtx( _Ignored.from_patterns(internal_ignored, basedirs), _Ignored.from_patterns(admin_ignored, basedirs), _Ignored.from_patterns(internal_ignored + admin_ignored, basedirs), ) def _admin_ignored_paths(dir: Path) -> None: ignored_paths = MalwareIgnorePath.path_list() ignored_paths_base64 = b"".join( base64.b64encode(os.fsencode(path)) + b"\n" for path in ignored_paths ) target = dir / _IGNORED_SUB_DIR / _ADMIN_PATHS_NAME target.write_bytes(ignored_paths_base64) def _contain_changes(dir1: Path, dir2: Path) -> bool: """Compare content of two folders if files in this directory are the same return False.""" for file in dir1.iterdir(): if file.is_dir(): if _contain_changes(file, dir2 / file.name): return True if not file.is_file(): continue other = dir2 / file.name if not other.exists(): return True if file.read_bytes() != other.read_bytes(): return True return False def _save_configs(dir: Path, savers: List[Callable[[Path], None]]) -> bool: """Save configs in directory dir using saves callable. Each function in savers will be called with single dir argument.""" temp = dir.with_suffix(".tmp") if temp.exists(): shutil.rmtree(str(temp)) temp.mkdir() for save in savers: save(temp) if dir.exists(): backup = dir.with_name(".backup") if backup.exists(): shutil.rmtree(str(backup)) dir.rename(backup) try: temp.rename(dir) except Exception: backup.rename(dir) raise return _contain_changes(dir, backup) else: temp.rename(dir) return True def _update_pd_symlink() -> None: target = _PROCESSED_PATH / _IGNORED_SUB_DIR / _PD_NAME source = _ADMIN_PATH / _PD_NAME try: # source.exists() returns False for broken symlink. # so call lstat() and if it throws exception, source does not exist. _ = source.lstat() except FileNotFoundError: source.symlink_to(target) else: if not ( source.is_symlink() and os.readlink(str(source)) == str(target) ): source.unlink() source.symlink_to(target) def generate_configs() -> bool: """Generate new malware paths filters config.""" panel = HostingPanel() basedirs = panel.basedirs() extra_watched = set() if Malware.CRONTABS_SCAN_ENABLED: extra_watched.add(str(crontab_path())) changed = _save_configs( _PROCESSED_PATH, [ lambda dir: _save_basedirs(dir, {*basedirs, *extra_watched}), _watched_context(panel.NAME, basedirs, extra=extra_watched).save, _ignored_context(panel.NAME, basedirs).save, _admin_ignored_paths, ], ) _update_pd_symlink() return changed async def reload_services() -> None: # pragma: no cover tasks = [ check_run(["service", _SERVICE, "restart"]), check_run([_PD_PREPARE]), ] for t in tasks: try: await t except asyncio.CancelledError: raise except Exception as e: logger.warning("realtime_av.reload_services exception: %s", e) def should_be_running() -> bool: return not ANTIVIRUS_MODE and Malware.INOTIFY_ENABLED