"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see https://www.gnu.org/licenses/.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see https://www.imunify360.com/legal/eula
Migrate scan/cleanup/restore history from revisium extension for Plesk"""
import hashlib
import json
import itertools
import logging
import os
from abc import abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from glob import iglob
from pathlib import Path
from uuid import uuid4
from defence360agent.utils import file_hash_and_size
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.migration_utils.revisium import (
find_revisium_db,
get_domain_docroot,
get_revisium_db,
get_vhosts_dir,
)
from imav.migration_utils.other import batched, get_owner, skip_for_im360
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class db:
    """Namespace for the ORM models this migration writes to.

    All three models are ``None`` until :meth:`init` binds them from the
    migrator that runs this migration.
    """

    MalwareScan = None
    MalwareHit = None
    MalwareHistory = None

    @classmethod
    def init(cls, migrator):
        """Bind the models by looking their tables up in ``migrator.orm``."""
        tables = (
            ("MalwareScan", "malware_scans"),
            ("MalwareHit", "malware_hits"),
            ("MalwareHistory", "malware_history"),
        )
        for attribute, table in tables:
            setattr(cls, attribute, migrator.orm[table])

    @classmethod
    def insert_many(cls, model, data):
        """Insert *data* into *model*, one query per batch.

        Batches come from ``batched(data, n=1000)``; each one is turned into
        a list before being handed to ``model.insert_many``.
        """
        for chunk in batched(data, n=1000):
            rows = list(chunk)
            model.insert_many(rows).execute()
@dataclass(eq=False)
class Hit:
    """A reported file, identified solely by its path.

    Equality and hashing look at ``path`` only, so two hits for the same
    file collapse into one dictionary key even when their signature, hash
    or size differ.
    """

    # Location of the reported file.
    path: Path
    # Signature name taken from the report entry.
    signature: str
    # Digest and size of the file; left as None when the report lacks them.
    hash: str = None
    size: int = None

    def __hash__(self):
        return hash(self.path)

    def __eq__(self, other):
        # Defer to Python's default handling for foreign types instead of
        # failing with AttributeError on objects that have no ``path``.
        if not isinstance(other, Hit):
            return NotImplemented
        return self.path == other.path
def get_report(report_path: Path) -> dict:
    """Load the JSON document stored at *report_path*.

    Best effort: any failure while reading or decoding is logged as an
    error and an empty dict is returned instead of raising.
    """
    try:
        content = report_path.read_text()
        return json.loads(content)
    except Exception as exc:
        logger.error("Can't get report %s due to %s", report_path, exc)
    return {}
class Report:
    """Base class for a single task report inside a domain results dir.

    Subclasses set ``REPORT_FILE`` (file name of the report) and ``TASK``
    (key of the task in ``task.result``) and implement ``update_history``.
    """

    REPORT_FILE = None
    TASK = None

    def __init__(self, report_dir: Path, owner: str, timestamp: int):
        """Remember the report location and the fields shared by all records.

        :param report_dir: directory containing the report file
        :param owner: fallback owner used when a file's owner is unknown
        :param timestamp: stored as ``ctime`` of every history record
        """
        self.path = report_dir / self.REPORT_FILE
        self.default_owner = owner
        # we cannot determine who or how the scan was started
        self.common_history_info = dict(
            resource_type="file",
            cause="on-demand",
            initiator="root",
            ctime=timestamp,
        )

    @property
    def row_data(self):
        """Report content; the file is re-read on every access."""
        return get_report(self.path)

    @abstractmethod
    def update_history(self):
        """Collect history records; the base implementation does nothing."""
class Scan(Report):
    """Scan report: every listed file becomes a "found" history record."""

    REPORT_FILE = "result_scan.ser"
    TASK = "scan"

    def update_history(self, hits: dict):
        """Append a "found" record to *hits* for each file in the report.

        :param hits: mapping of Hit -> list of history records; entries are
            appended via ``hits[hit]``, so a missing key must yield a list
            (the caller in this module passes ``defaultdict(list)``)
        :raises KeyError: if the report has no "data" key or an entry lacks
            one of "fn", "sn", "sha256", "sz"
        """
        for section, malicious in self.row_data["data"].items():
            if section == "vulners":  # ignore suspicious hits
                continue
            for data in malicious:
                hit = Hit(
                    path=Path(data["fn"]),
                    signature=data["sn"],
                    hash=data["sha256"],
                    size=data["sz"],
                )
                # Resolve the owner once: both fields must carry the same
                # value, and a second lookup is redundant work.
                owner = get_owner(hit.path) or self.default_owner
                hits[hit].append(
                    {
                        "path": str(hit.path),
                        "event": "found",
                        "file_owner": owner,
                        "file_user": owner,
                        **self.common_history_info,
                    }
                )
class Cure(Report):
    """Cure report: every listed file gets a cleanup-result history record."""

    REPORT_FILE = "result_cure.ser"
    TASK = "cure"

    def _get_status(self, status_code):
        """Translate a status code from the report into a history event.

        NOTE(review): codes outside 1-6 yield ``None``, which then ends up
        as the record's "event" -- confirm that this is acceptable.
        """
        return {
            1: "failed_to_cleanup",  # unknown
            2: "cleanup_done",
            3: "cleanup_removed",
            4: "failed_to_cleanup",  # too big
            5: "failed_to_cleanup",  # failed to read
            6: "failed_to_cleanup",  # failed to write
        }.get(status_code)

    def update_history(self, hits):
        """Append one record to *hits* for each file in the report.

        :param hits: mapping of Hit -> list of history records; entries are
            appended via ``hits[hit]``, so a missing key must yield a list
            (the caller in this module passes ``defaultdict(list)``)
        :raises KeyError: if the report has no "data" key or an entry lacks
            "sig" or "status"
        """
        for file, data in self.row_data["data"].items():
            hit = Hit(path=Path(file), signature=data["sig"])
            # Resolve the owner once: both fields must carry the same
            # value, and a second lookup is redundant work.
            owner = get_owner(hit.path) or self.default_owner
            hits[hit].append(
                {
                    "path": file,
                    "event": self._get_status(data["status"]),
                    "file_owner": owner,
                    "file_user": owner,
                    **self.common_history_info,
                }
            )
class Undo(Cure):
    """Undo task: reuses the cure report, but every record is a restore."""

    TASK = "undo"

    def _get_status(self, *args, **kwargs):
        """Return "restore_original" whatever arguments are passed."""
        return "restore_original"
def save_history(hits: dict):
    """Insert every history record collected in *hits* into MalwareHistory.

    :param hits: mapping whose values are iterables of history records
    """
    records = (record for history in hits.values() for record in history)
    db.insert_many(db.MalwareHistory, records)
def save_hits(scan, hits: dict):
# Build MalwareHit rows from the collected per-file history and insert them
# in one db.insert_many call.
#   scan: object whose ``scanid`` is stored on every row
#   hits: mapping of Hit -> list of history records; each record is read
#         for its "ctime" and "event" keys
# NOTE(review): block nesting is not recoverable from this view of the file;
# confirm which of the steps below are guarded by ``hit.path.exists()``.
db_hits = []
for hit, history in hits.items():
if hit.path.exists():
# Presumably the copy of the original file kept by CleanupStorage -- TODO
# confirm; when it exists, hash and size are recomputed from it.
orig_file = CleanupStorage.path / CleanupStorage.storage_name(
hit.path
)
if orig_file.exists():
hit.hash, hit.size = file_hash_and_size(
str(orig_file), hashlib.sha256
)
if hit.hash is None or hit.size is None: # skip if not enough data
continue
# The record with the greatest ctime decides the stored status.
last_change = sorted(history, key=lambda r: r["ctime"])[-1]
db_hits.append(
{
"scanid": scan.scanid,
"user": get_owner(hit.path),
"orig_file": str(hit.path),
"type": hit.signature,
"malicious": True,
"hash": hit.hash,
"size": hit.size,
# A file whose latest event is a restore is stored as "found" again.
"status": (
"found"
if last_change["event"] == "restore_original"
else last_change["event"]
),
}
)
db.insert_many(db.MalwareHit, db_hits)
def get_extra_scan_data(domain_id: str):
    """Fetch the scan details that are kept in revisium's own database.

    :param domain_id: domain identifier; converted with ``int()`` for the
        query
    :return: dict with "total_resources" and "error" taken from the first
        row of ``pool`` matching the domain
    """
    # some data is stored in a separate SQLITE db
    revisium_db = get_revisium_db()
    query = (
        "SELECT last_scan_cnt_files, error_code FROM pool WHERE domain_id = ?"
    )
    row = revisium_db.execute_sql(query, (int(domain_id),)).fetchone()
    # NOTE(review): presumably ``row`` is None when nothing matches, in which
    # case the indexing below raises TypeError -- confirm callers expect it.
    return {"total_resources": row[0], "error": row[1]}
def create_malware_scan(scan_path: str, scan_result: dict):
    """Create a MalwareScan row describing one scanned docroot.

    :param scan_path: stored as the scan's ``path``
    :param scan_result: must provide "stats" (with "end_time" and
        "exec_time"), "total_resources", "error" and "cnt_cureable"
    :return: whatever ``db.MalwareScan.create`` returns
    """
    stats = scan_result["stats"]
    fields = {
        "scanid": uuid4().hex,
        # the report keeps only end time and duration
        "started": int(stats["end_time"] - stats["exec_time"]),
        "completed": stats["end_time"],
        "type": "on-demand",
        "total_resources": scan_result["total_resources"],
        "path": scan_path,
        "error": scan_result["error"],
        "total_malicious": scan_result["cnt_cureable"],
        "resource_type": "file",
    }
    return db.MalwareScan.create(**fields)
def process_domain_scan_results(domain_results_path: Path):
    """Migrate the scan, cure and undo results found in one results dir.

    The domain id is whatever follows ".revisium" in the directory name.
    One MalwareScan is created for the domain's docroot, then history
    records and hits are collected from the available reports and stored.

    :param domain_results_path: directory holding ``task.result`` and the
        per-task report files
    """
    _, domain_id = domain_results_path.name.split(".revisium")
    docroot = get_domain_docroot(domain_id)
    docroot_owner = Path(docroot).owner()
    tasks = get_report(domain_results_path / "task.result")

    # undo operation doesn't have its own report,
    # so we rely on cure report only. If cure operation occurred after undo
    # we cannot determine exactly which files were recovered
    task_names = tasks.keys()
    if Cure.TASK in task_names and Undo.TASK in task_names:
        undo_finished = tasks[Undo.TASK]["stats"]["end_time"]
        cure_finished = tasks[Cure.TASK]["stats"]["end_time"]
        if undo_finished < cure_finished:
            del tasks[Undo.TASK]

    # create a new MalwareScan instance for each scanned docroot
    scan_result = {**tasks["scan"], **get_extra_scan_data(domain_id)}
    scan = create_malware_scan(scan_path=docroot, scan_result=scan_result)

    # store operations history for the current domain
    hits = defaultdict(list)
    for report_cls in (Scan, Cure, Undo):
        if report_cls.TASK not in tasks:
            continue
        finished = tasks[report_cls.TASK]["stats"]["end_time"]
        report = report_cls(
            report_dir=domain_results_path,
            owner=docroot_owner,
            timestamp=finished,
        )
        report.update_history(hits)

    save_history(hits)
    save_hits(scan, hits)
@skip_for_im360
def migrate(migrator, database, fake=False, **kwargs):
    """Import revisium history for every domain results dir that is found.

    Does nothing when *fake* is true or ``find_revisium_db()`` returns a
    falsy value. A failure in one directory is logged with its traceback
    and the remaining directories are still processed.
    """
    if fake:
        return
    if not find_revisium_db():
        return

    db.init(migrator)  # use the same connection to store results
    pattern = os.path.join(
        get_vhosts_dir(),
        "*/.revisium_antivirus_cache/.revisium*",
    )
    for results_dir in iglob(pattern):
        try:
            process_domain_scan_results(Path(results_dir))
        except Exception as exc:
            logger.exception(
                "Can't process revisium reports in %s due to %s",
                results_dir,
                exc,
            )
@skip_for_im360
def rollback(migrator, database, fake=False, **kwargs):
    """Do nothing: this migration has no rollback step."""