"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import base64
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Callable, Iterable, List, Set, Tuple
from defence360agent.contracts.config import ANTIVIRUS_MODE, Malware
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import check_run
from imav.malwarelib.model import MalwareIgnorePath
from imav.malwarelib.scan.crontab import crontab_path
# module-level logger, used for warnings about skipped patterns and
# failures while reloading services
logger = logging.getLogger(__name__)
# location of admin provided watched and ignored paths
_ADMIN_PATH = Path("/etc/sysconfig/imunify360/malware-filters-admin-conf")
# location of internal configs, shipped with imunify360-firewall
_INTERNAL_PATH = Path("/var/imunify360/files/realtime-av-conf/v1")
# location of processed configs
_PROCESSED_PATH = _ADMIN_PATH / "processed"
# file name of the ignored pattern built from internal + admin lists;
# _update_pd_symlink() also keeps a symlink with this name in _ADMIN_PATH
_PD_NAME = "pd-combined.txt"
# file names of processed internal / admin lists (used in both the
# "watched" and the ignored sub directories)
_INTERNAL_NAME = "av-internal.txt"
_ADMIN_NAME = "av-admin.txt"
# file name of the base64-encoded list of ignored paths taken from
# MalwareIgnorePath
_ADMIN_PATHS_NAME = "av-admin-paths.txt"
# sub directory of the processed configs that holds ignored patterns
_IGNORED_SUB_DIR = "ignored"
# _Ignored.save() refuses to write patterns longer than this
_MAX_PATTERN_LENGTH = 64000
# service restarted by reload_services()
_SERVICE = "imunify-realtime-av"
# executable run by reload_services()
_PD_PREPARE = "/usr/bin/i360-exclcomp"
class PatternLengthError(Exception):
    """Signals that a generated pattern exceeds the allowed length."""
def _save_basedirs(dir: Path, basedirs: Set[str]) -> None:
"""Save list of basedirs in a file inside dir."""
with (dir / "basedirs-list.txt").open("w") as f:
for basedir in sorted(basedirs):
f.write(os.path.realpath(basedir) + "\n")
def _split_paths(paths: List[str]) -> Tuple[List[str], List[str]]:
"""Split paths into two lists: absolute and relative.
Relative paths start with +. This + sign is removed from resulting path."""
absolute, relative = [], []
for path in paths:
if path.startswith("+"):
relative.append(path[1:])
else:
absolute.append(path)
return absolute, relative
def _read_list(path: Path) -> List[str]:
"""Read file at path and return its lines as a list.
Empty lines or lines starting with '#' symbol are skipped. Lines are
stripped of leading and trailing whitespace. If the file does not exist,
empty list is returned."""
try:
with path.open() as f:
lines = [line.strip() for line in f]
return [x for x in lines if len(x) > 0 and not x.startswith("#")]
except FileNotFoundError:
return []
class _Watched(list):
"""Holds a list of watched glob patterns ready to be saved."""
def __init__(self, w: List[str], basedirs: Set[str]) -> None:
super().__init__()
absolute, relative = _split_paths(w)
self.extend(
os.path.realpath(p)
for p in absolute + self._extend_relative(relative, basedirs)
if self._is_valid(p)
)
@staticmethod
def _is_valid(pattern: str) -> bool:
"""Return True if watched pattern is valid."""
if not pattern.startswith("/"):
logger.warning(
"skipping watched path %s: not starts with /", pattern
)
return False
return True
@staticmethod
def _extend_relative(paths: List[str], basedirs: Set[str]) -> List[str]:
"""Join basedirs with all paths and return resulting list."""
extended = []
for path in paths:
for basedir in basedirs:
extended.append(os.path.join(basedir, path))
return extended
def save(self, path: Path) -> None:
"""Save watched list at specified path."""
with path.open("w") as f:
f.write("\n".join(self))
class _Ignored(str):
"""Holds a list of ignored regexp patterns ready to be saved."""
@staticmethod
def _is_valid_relative(pattern: str) -> bool:
"""Return True if relative ignored pattern is valid."""
if pattern.startswith("^"):
logger.warning(
"skipping relative ignored path %s: starts with ^", pattern
)
return False
return True
@staticmethod
def _remove_leading_slash(pattern: str) -> str:
"""Remove leading slash from pattern, if present."""
if pattern.startswith("/"):
return pattern[1:]
return pattern
@staticmethod
def _compiles(pattern: str) -> bool:
"""Return True if pattern successfully compiles as regexp."""
try:
re.compile(pattern)
return True
except Exception:
logger.warning(
"skipping ignored pattern %s: invalid regex", pattern
)
return False
@classmethod
def from_patterns(
cls, patterns: List[str], basedirs: Set[str]
) -> "_Ignored":
"""Build single ignored regexp from given patterns and basedirs."""
absolute, relative = _split_paths(patterns)
absolute = [p for p in absolute if cls._compiles(p)]
relative = [
cls._remove_leading_slash(p)
for p in relative
if cls._is_valid_relative(p) and cls._compiles(p)
]
if len(basedirs) > 0 and len(relative) > 0:
relative_pattern = "^(?:{})/(?:{})".format(
"|".join(basedirs), "|".join(relative)
)
absolute.append(relative_pattern)
pat = "|".join(absolute)
if pat == "":
pat = "^$"
return _Ignored(pat)
def save(self, path: Path):
"""Save ignored list at specified path."""
if len(self) > _MAX_PATTERN_LENGTH:
raise PatternLengthError(
"{} pattern is too long ({})".format(path, len(self))
)
with path.open("w") as f:
f.write(self)
def _read_configs(panel: str, name: str) -> Tuple[List[str], List[str]]:
    """Return (internal, admin) lists loaded from files called name.

    The internal list is read from the "common" directory under
    _INTERNAL_PATH and, if a directory named after the lower-cased panel
    exists there, extended with the list found in it. The admin list is
    read from _ADMIN_PATH.
    """
    internal = _read_list(_INTERNAL_PATH / "common" / name)
    panel_dir = _INTERNAL_PATH / panel.lower()
    if panel_dir.exists():
        internal += _read_list(panel_dir / name)
    admin = _read_list(_ADMIN_PATH / name)
    return internal, admin
class _WatchedCtx:
    """Pairs the internal and admin watched lists so they are saved together."""

    def __init__(self, internal: _Watched, admin: _Watched) -> None:
        self.internal, self.admin = internal, admin

    def save(self, dir: Path) -> None:
        """Write both lists into the "watched" sub directory of dir.

        The sub directory is created if it does not exist yet.
        """
        target = dir / "watched"
        target.mkdir(exist_ok=True)
        for watched, file_name in (
            (self.internal, _INTERNAL_NAME),
            (self.admin, _ADMIN_NAME),
        ):
            watched.save(target / file_name)
def _watched_context(
    panel_name: str, basedirs: Set[str], *, extra: Iterable[str]
) -> _WatchedCtx:
    """Build the watched context for the given panel.

    The internal and admin lists are read from "watched.txt" configs;
    the entries of extra are appended to the internal list.
    """
    internal_paths, admin_paths = _read_configs(panel_name, "watched.txt")
    internal_paths += extra
    internal = _Watched(internal_paths, basedirs)
    admin = _Watched(admin_paths, basedirs)
    return _WatchedCtx(internal, admin)
class _IgnoredCtx:
    """Groups the internal, admin and combined ignored patterns."""

    def __init__(
        self, internal: _Ignored, admin: _Ignored, pd: _Ignored
    ) -> None:
        self.internal, self.admin, self.pd = internal, admin, pd

    def save(self, dir: Path) -> None:
        """Write all three patterns into the ignored sub directory of dir.

        The sub directory is created if it does not exist yet.
        """
        target = dir / _IGNORED_SUB_DIR
        target.mkdir(exist_ok=True)
        for pattern, file_name in (
            (self.internal, _INTERNAL_NAME),
            (self.admin, _ADMIN_NAME),
            (self.pd, _PD_NAME),
        ):
            pattern.save(target / file_name)
def _ignored_context(panel_name: str, basedirs: Set[str]) -> _IgnoredCtx:
    """Build the ignored context for the given panel.

    Patterns are read from "ignored.txt" configs. Three regexps are
    produced: one from the internal list, one from the admin list and one
    from both lists concatenated (internal first).
    """
    internal_raw, admin_raw = _read_configs(panel_name, "ignored.txt")
    internal = _Ignored.from_patterns(internal_raw, basedirs)
    admin = _Ignored.from_patterns(admin_raw, basedirs)
    combined = _Ignored.from_patterns(internal_raw + admin_raw, basedirs)
    return _IgnoredCtx(internal, admin, combined)
def _admin_ignored_paths(dir: Path) -> None:
    """Dump MalwareIgnorePath entries into the ignored sub directory of dir.

    Each path is encoded with os.fsencode, then base64, and written on a
    line of its own.
    """
    encoded_lines = [
        base64.b64encode(os.fsencode(entry)) + b"\n"
        for entry in MalwareIgnorePath.path_list()
    ]
    destination = dir / _IGNORED_SUB_DIR / _ADMIN_PATHS_NAME
    destination.write_bytes(b"".join(encoded_lines))
def _contain_changes(dir1: Path, dir2: Path) -> bool:
"""Compare content of two folders if files in this directory are the
same return False."""
for file in dir1.iterdir():
if file.is_dir():
if _contain_changes(file, dir2 / file.name):
return True
if not file.is_file():
continue
other = dir2 / file.name
if not other.exists():
return True
if file.read_bytes() != other.read_bytes():
return True
return False
def _save_configs(dir: Path, savers: List[Callable[[Path], None]]) -> bool:
"""Save configs in directory dir using saves callable.
Each function in savers will be called with single dir argument."""
temp = dir.with_suffix(".tmp")
if temp.exists():
shutil.rmtree(str(temp))
temp.mkdir()
for save in savers:
save(temp)
if dir.exists():
backup = dir.with_name(".backup")
if backup.exists():
shutil.rmtree(str(backup))
dir.rename(backup)
try:
temp.rename(dir)
except Exception:
backup.rename(dir)
raise
return _contain_changes(dir, backup)
else:
temp.rename(dir)
return True
def _update_pd_symlink() -> None:
    """Make _ADMIN_PATH / _PD_NAME a symlink to the processed combined file.

    An existing entry is left alone when it already is a symlink to the
    expected target; otherwise it is removed and re-created.
    """
    target = _PROCESSED_PATH / _IGNORED_SUB_DIR / _PD_NAME
    source = _ADMIN_PATH / _PD_NAME
    try:
        # exists() reports False for a broken symlink, whereas lstat()
        # only fails when there is no entry at all.
        source.lstat()
    except FileNotFoundError:
        source.symlink_to(target)
        return
    if source.is_symlink() and os.readlink(str(source)) == str(target):
        return
    source.unlink()
    source.symlink_to(target)
def generate_configs() -> bool:
    """Generate new malware paths filters config.

    Writes basedirs, watched lists, ignored patterns and admin ignored
    paths into _PROCESSED_PATH, then refreshes the symlink to the
    combined ignored pattern. Returns the value reported by
    _save_configs.
    """
    panel = HostingPanel()
    basedirs = panel.basedirs()
    extra_watched = (
        {str(crontab_path())} if Malware.CRONTABS_SCAN_ENABLED else set()
    )
    watched = _watched_context(panel.NAME, basedirs, extra=extra_watched)
    ignored = _ignored_context(panel.NAME, basedirs)

    def save_basedirs(target: Path) -> None:
        # the extra watched locations are listed together with the basedirs
        _save_basedirs(target, set(basedirs).union(extra_watched))

    changed = _save_configs(
        _PROCESSED_PATH,
        [save_basedirs, watched.save, ignored.save, _admin_ignored_paths],
    )
    _update_pd_symlink()
    return changed
async def reload_services() -> None:  # pragma: no cover
    """Restart the _SERVICE service and run the _PD_PREPARE executable.

    The two results are awaited one after another. Cancellation is
    propagated; any other exception is logged as a warning and does not
    prevent the next one from being awaited.
    """
    pending = (
        check_run(["service", _SERVICE, "restart"]),
        check_run([_PD_PREPARE]),
    )
    for awaitable in pending:
        try:
            await awaitable
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.warning("realtime_av.reload_services exception: %s", exc)
def should_be_running() -> bool:
    """Return Malware.INOTIFY_ENABLED, or False when ANTIVIRUS_MODE is set."""
    if ANTIVIRUS_MODE:
        return False
    return Malware.INOTIFY_ENABLED