"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>

Migrate scan/cleanup/restore history from revisium extension for Plesk
"""
import hashlib
import json
import itertools
import logging
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from glob import iglob
from pathlib import Path
from typing import Optional
from uuid import uuid4

from defence360agent.utils import file_hash_and_size
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.migration_utils.revisium import (
    find_revisium_db,
    get_domain_docroot,
    get_revisium_db,
    get_vhosts_dir,
)
from imav.migration_utils.other import batched, get_owner, skip_for_im360

logger = logging.getLogger(__name__)


class db:
    """Namespace holding the ORM models handed over by the migrator."""

    MalwareScan = None
    MalwareHit = None
    MalwareHistory = None

    @classmethod
    def init(cls, migrator):
        """Bind the models from ``migrator.orm`` so later inserts use them."""
        cls.MalwareScan = migrator.orm["malware_scans"]
        cls.MalwareHit = migrator.orm["malware_hits"]
        cls.MalwareHistory = migrator.orm["malware_history"]

    @classmethod
    def insert_many(cls, model, data):
        """Insert *data* (an iterable of row dicts) in batches of 1000."""
        for batch in batched(data, n=1000):
            model.insert_many(list(batch)).execute()


@dataclass(eq=False)
class Hit:
    """A malicious file found in a report.

    Identity (hash and equality) is defined by ``path`` only, so records
    coming from different reports for the same file share one dict key.
    """

    path: Path
    signature: str
    hash: Optional[str] = None
    size: Optional[int] = None

    def __hash__(self):
        return hash(self.path)

    def __eq__(self, other):
        if not isinstance(other, Hit):
            # let Python fall back to the default comparison instead of
            # failing with AttributeError on unrelated objects
            return NotImplemented
        return self.path == other.path


def get_report(report_path: Path) -> dict:
    """Load a JSON report from *report_path*.

    Returns the parsed content, or an empty dict (after logging an error)
    when the file cannot be read or decoded.
    """
    try:
        with report_path.open() as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # OSError: missing/unreadable file;
        # ValueError: invalid JSON or undecodable bytes
        logger.error("Can't get report %s due to %s", report_path, exc)
        return {}


class Report(ABC):
    """Base class for a single report file stored in a results directory.

    Subclasses set ``REPORT_FILE`` (file name inside the results directory)
    and ``TASK`` (key of the operation in the ``task.result`` report).
    """

    REPORT_FILE = None
    TASK = None

    def __init__(self, report_dir: Path, owner: str, timestamp: int):
        self.path = report_dir / self.REPORT_FILE
        self.default_owner = owner
        # we cannot determine who or how the scan was started
        self.common_history_info = {
            "resource_type": "file",
            "cause": "on-demand",
            "initiator": "root",
            "ctime": timestamp,
        }

    @property
    def row_data(self):
        """Parsed report content; the file is re-read on every access."""
        return get_report(self.path)

    def _entries(self) -> dict:
        """Return the report's "data" section, or {} if missing or empty.

        An unreadable report yields ``{}`` from ``get_report``, and an empty
        section may not be a mapping at all, so both are treated as
        "nothing to process" instead of raising.
        """
        return self.row_data.get("data") or {}

    def _history_record(self, path: str, event, file_path: Path) -> dict:
        """Build one history row for *file_path* with the given *event*."""
        # resolve the owner once; fall back to the docroot owner
        owner = get_owner(file_path) or self.default_owner
        return {
            "path": path,
            "event": event,
            "file_owner": owner,
            "file_user": owner,
            **self.common_history_info,
        }

    @abstractmethod
    def update_history(self, hits: dict):
        """Append this report's history rows to *hits* (``Hit`` -> list)."""


class Scan(Report):
    """Scan report: every listed file produces a "found" event."""

    REPORT_FILE = "result_scan.ser"
    TASK = "scan"

    def update_history(self, hits: dict):
        for section, malicious in self._entries().items():
            if section == "vulners":
                # ignore suspicious hits
                continue
            for data in malicious:
                hit = Hit(
                    path=Path(data["fn"]),
                    signature=data["sn"],
                    hash=data["sha256"],
                    size=data["sz"],
                )
                hits[hit].append(
                    self._history_record(str(hit.path), "found", hit.path)
                )


class Cure(Report):
    """Cure report: maps per-file status codes to cleanup events."""

    REPORT_FILE = "result_cure.ser"
    TASK = "cure"

    _STATUS_EVENTS = {
        1: "failed_to_cleanup",  # unknown
        2: "cleanup_done",
        3: "cleanup_removed",
        4: "failed_to_cleanup",  # too big
        5: "failed_to_cleanup",  # failed to read
        6: "failed_to_cleanup",  # failed to write
    }

    def _get_status(self, status_code):
        """Return the event name for *status_code* (None if unknown)."""
        return self._STATUS_EVENTS.get(status_code)

    def update_history(self, hits: dict):
        for file, data in self._entries().items():
            hit = Hit(path=Path(file), signature=data["sig"])
            hits[hit].append(
                self._history_record(
                    file, self._get_status(data["status"]), hit.path
                )
            )


class Undo(Cure):
    """Undo operation: reuses the cure report, every file is restored."""

    TASK = "undo"

    def _get_status(self, *args, **kwargs):
        return "restore_original"


def save_history(hits: dict):
    """Store all collected history rows in the malware history table."""
    data = itertools.chain.from_iterable(hits.values())
    db.insert_many(db.MalwareHistory, data)


def save_hits(scan, hits: dict):
    """Store one malware hit per still-existing file for the given *scan*.

    The hit status is the latest event in the file's history; a restored
    file is reported as "found" again.
    """
    db_hits = []
    for hit, history in hits.items():
        if not hit.path.exists():
            continue
        orig_file = CleanupStorage.path / CleanupStorage.storage_name(
            hit.path
        )
        if orig_file.exists():
            # prefer hash/size of the original file kept in cleanup storage
            hit.hash, hit.size = file_hash_and_size(
                str(orig_file), hashlib.sha256
            )
        if hit.hash is None or hit.size is None:
            # skip if not enough data
            continue
        # stable sort: among equal timestamps the last recorded event wins
        last_change = sorted(history, key=lambda r: r["ctime"])[-1]
        last_event = last_change["event"]
        db_hits.append(
            {
                "scanid": scan.scanid,
                "user": get_owner(hit.path),
                "orig_file": str(hit.path),
                "type": hit.signature,
                "malicious": True,
                "hash": hit.hash,
                "size": hit.size,
                "status": (
                    "found" if last_event == "restore_original" else last_event
                ),
            }
        )
    db.insert_many(db.MalwareHit, db_hits)


def get_extra_scan_data(domain_id: str):
    """Fetch scan details kept in revisium's own database.

    Returns a dict with "total_resources" and "error" for *domain_id*.
    Raises LookupError when the ``pool`` table has no row for the domain.
    """
    # some data is stored in a separate SQLITE db
    revisium_db = get_revisium_db()
    cursor = revisium_db.execute_sql(
        "SELECT last_scan_cnt_files, error_code FROM pool WHERE domain_id = ?",
        (int(domain_id),),
    )
    row = cursor.fetchone()
    if row is None:
        raise LookupError(
            "No pool record found for domain id {}".format(domain_id)
        )
    return {"total_resources": row[0], "error": row[1]}


def create_malware_scan(scan_path: str, scan_result: dict):
    """Create a MalwareScan row for *scan_path* from the scan task result.

    The start time is derived as ``end_time - exec_time`` from the stats.
    """
    scan_stats = scan_result["stats"]
    return db.MalwareScan.create(
        scanid=uuid4().hex,
        started=int(scan_stats["end_time"] - scan_stats["exec_time"]),
        completed=scan_stats["end_time"],
        type="on-demand",
        total_resources=scan_result["total_resources"],
        path=scan_path,
        error=scan_result["error"],
        total_malicious=scan_result["cnt_cureable"],
        resource_type="file",
    )


def process_domain_scan_results(domain_results_path: Path):
    """Migrate scan, cure and undo results of one domain.

    *domain_results_path* is a ``.revisium<domain_id>`` directory holding
    the ``task.result`` summary and the per-operation report files.
    """
    _, domain_id = domain_results_path.name.split(".revisium", 1)
    scan_path = get_domain_docroot(domain_id)
    owner = Path(scan_path).owner()
    tasks = get_report(domain_results_path / "task.result")
    if Scan.TASK not in tasks:
        # nothing to migrate without the scan summary
        logger.warning(
            "No scan results found in %s, skipping", domain_results_path
        )
        return
    # undo operation doesn't have its own report,
    # so we rely on cure report only. If cure operation occurred after undo
    # we cannot determine exactly which files were recovered
    if {Cure.TASK, Undo.TASK} <= tasks.keys() and (
        tasks[Undo.TASK]["stats"]["end_time"]
        < tasks[Cure.TASK]["stats"]["end_time"]
    ):
        tasks.pop(Undo.TASK)
    # create a new MalwareScan instance for each scanned docroot
    scan = create_malware_scan(
        scan_path=scan_path,
        scan_result={**tasks[Scan.TASK], **get_extra_scan_data(domain_id)},
    )
    # store operations history for the current domain;
    # Scan goes first so that its hits (which carry hash and size)
    # become the dict keys reused by the later reports
    hits = defaultdict(list)
    for report in [Scan, Cure, Undo]:
        if report.TASK in tasks:
            report(
                report_dir=domain_results_path,
                owner=owner,
                timestamp=tasks[report.TASK]["stats"]["end_time"],
            ).update_history(hits)
    save_history(hits)
    save_hits(scan, hits)


@skip_for_im360
def migrate(migrator, database, fake=False, **kwargs):
    """Import revisium history for every domain results directory found.

    Does nothing for a fake run or when no revisium database exists.
    A failure in one domain is logged and does not stop the others.
    """
    if fake or not find_revisium_db():
        return
    db.init(migrator)  # use the same connection to store results
    scan_results_dir_pattern = os.path.join(
        get_vhosts_dir(),
        "*/.revisium_antivirus_cache/.revisium*",
    )
    for domain_results_path in iglob(scan_results_dir_pattern):
        try:
            process_domain_scan_results(Path(domain_results_path))
        except Exception as exc:
            logger.exception(
                "Can't process revisium reports in %s due to %s",
                domain_results_path,
                exc,
            )


@skip_for_im360
def rollback(migrator, database, fake=False, **kwargs):
    """Nothing to roll back: migrated history is left in place."""
    pass