import logging import os import pwd import shutil from glob import glob from pathlib import Path from typing import Tuple, Union from peewee import CharField, Model # Avoiding imav.malwarelib.utils.quar_fileops imports from defence360agent.model.simplification import FilenameField from defence360agent.subsys.panels.hosting_panel import HostingPanel logger = logging.getLogger(__name__) QUAR_NAME = ".imunify.quarantined" DEF_QUAR = "/var/imunify360" QUARANTINED = "quarantined" QUARANTINE_PARENTS = [DEF_QUAR, "/var/www", "/home*"] def get_model(db): """ Model stub for migration because we can't use migrator.orm[] due to custom field FilenameField """ class MalwareHit(Model): class Meta: db_table = "malware_hits" database = db orig_file = FilenameField(null=False) status = CharField() return MalwareHit def migrate(_migrator, database, fake=False, delete_function=None, *_, **__): if fake: return # For unit-tests delete_function = delete_function or delete_quarantine_folder model = get_model(database) quarantined = model.select().where(model.status == QUARANTINED) # Remove all known quarantine storages for hit in quarantined: path_to_delete, _ = find_quar(hit.orig_file) delete_function(path_to_delete) hit.delete_instance() # Remove possible quarantine storages for parent in QUARANTINE_PARENTS: for path_to_delete in glob(os.path.join(parent, QUAR_NAME)): delete_function(path_to_delete) def rollback(*_, **__): pass def delete_quarantine_folder(quarantine_path: Union[str, Path]): quarantine_path = Path(quarantine_path) if ( quarantine_path.name == QUAR_NAME and quarantine_path == quarantine_path.resolve() ): logger.info("Deleting quarantine folder %s", quarantine_path) shutil.rmtree(quarantine_path, ignore_errors=True) def find_quar(source: str) -> Tuple[Path, Path]: """ Find file in quarantine by source path. This function is copied from agent code since it is to be removed. """ file = Path(source) default_result = Path(DEF_QUAR) / QUAR_NAME, file.relative_to(Path("/")) user = None parent = None for path in file.parents: try: user = pwd.getpwuid(path.stat().st_uid) except FileNotFoundError: continue except KeyError: return default_result else: parent = path break # Prevent storing quarantine in '/' if user is None or user.pw_name == "root": return default_result resolved_place = parent.resolve() / file.relative_to(parent) try: base_dir = HostingPanel().base_home_dir(user.pw_dir) except (FileNotFoundError, RuntimeError): return default_result try: relative = resolved_place.relative_to(base_dir) except ValueError: return default_result return base_dir / QUAR_NAME, relative