"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import subprocess
import time
from contextlib import suppress
from pathlib import Path

from defence360agent.contracts.config import (
    Core as CoreConfig,
    Malware as MalwareConfig,
    MalwareScanIntensity,
    MalwareSignatures,
    get_rapid_rescan_frequency,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import resource_limits
from imav.contracts.config import MalwareTune
from imav.malwarelib.config import (
    AIBOLIT_SCAN_INTENSITY_KEY,
    MalwareScanType,
)
from imav.malwarelib.scan import ScanFailedError
from imav.malwarelib.scan.ai_bolit import AIBOLIT, AIBOLIT_PATH
from imav.malwarelib.scan.ai_bolit.detached import AiBolitDetachedDir
from imav.malwarelib.scan.ai_bolit.report import parse_report_json
from imav.malwarelib.scan.crontab import crontab_path, in_crontab
from imav.malwarelib.utils import get_memory

logger = logging.getLogger(__name__)


class AiBolitError(ScanFailedError):
    """Raised when the AI-BOLIT output cannot be turned into a report."""

    pass


class AiBolit:
    """Build the AI-BOLIT command line and run it as a subprocess."""

    def __init__(self, scan_id=None):
        """
        :param scan_id: id of the scan this instance is associated with
        """
        self.cmd = None
        self.scan_id = scan_id
        # Populated by non-detached `scan()` calls; initialised here so the
        # attributes always exist, even before the first scan.
        self.proc = None
        self.out = None
        self.err = None

    def _cmd(
        self,
        filename,
        intensity_ram,
        progress_path,
        *,
        scan_type: str,
        scan_path=None,
        scan_id=None,
        db_dir=None,
        detect_elf=None,
        exclude_patterns=None,
        follow_symlinks=None,
        file_patterns=None,
        use_filters=True,
        json_report_path=None,
        csv_report_path=None,
    ):
        """
        Build the argument list used to launch AI-BOLIT.

        Exactly one of `filename` (filelist mode) and `scan_path`
        (finder mode) must be provided.

        Side effect: overwrites `self.scan_id` with `scan_id`.

        :param filename: listing file passed via `--listing`
        :param intensity_ram: value handed to `get_memory()` to produce
            the `--memory` argument
        :param progress_path: value of the `--progress` argument
        :param scan_type: type of scan (see `MalwareScanType`)
        :param scan_path: path passed via `--path`
        :param scan_id: id of scan, passed via `--detached`
        :param db_dir: path to rapid scan database
        :param detect_elf:
            True - detect as malicious
            False - detect as suspicious
            None - do nothing
        :param exclude_patterns: patterns for `--ignore-filenames`
        :param follow_symlinks: if truthy, add `--follow-symlink`
        :param file_patterns: patterns for `--only-filepaths`
        :param use_filters: if truthy, add `--use-filters`
        :param json_report_path: target of `--json_report` (detached scans)
        :param csv_report_path: target of `--csv_report` (detached scans)
        :raise TypeError: when both or neither of `filename` and
            `scan_path` are given
        :return: list of command line arguments
        """
        self.scan_id = scan_id
        cmd = [
            "/opt/ai-bolit/wrapper",
            AIBOLIT_PATH,
            "--smart",
            "--deobfuscate",
            "--avdb",
            MalwareSignatures.AI_BOLIT_HOSTER,
            "--no-html",
            "--memory",
            get_memory(intensity_ram),
            "--progress",
            progress_path,
            *(["--use-filters"] if use_filters else []),
            *(
                ["--use-heuristics"]
                if detect_elf is True
                else ["--use-heuristics-suspicious"]
                if detect_elf is False
                else []
            ),
            *(
                # Note: AI-BOLIT will check that HyperScan DB version
                # is the same as `--avdb` (and will skip HS with a warning
                # if they differ), so we don't have to do any
                # race-condition-prone checks here in the Agent.
                ["--hs", MalwareSignatures.AI_BOLIT_HYPERSCAN]
                if MalwareConfig.HYPERSCAN
                else []
            ),
        ]

        # Finder (scan_path) and filelist (filename) modes are mutually
        # exclusive and one of them is mandatory.
        if (scan_path and filename) or (not scan_path and not filename):
            raise TypeError(
                "Ai-Bolit cmd generation error, cannot select from finder "
                "and filelist. "
                "scan_path: {}, filename: {}".format(scan_path, filename)
            )

        in_crontabs = False
        if scan_path is not None:
            if not MalwareConfig.CRONTABS_SCAN_ENABLED:
                # Crontab scanning is disabled: add the crontab directory
                # to the ignore patterns.
                exclude_crontab = [os.path.join(str(crontab_path()), "*")]
                if exclude_patterns:
                    exclude_crontab.append(exclude_patterns)
                exclude_patterns = ",".join(exclude_crontab)
            else:
                in_crontabs = in_crontab(Path(scan_path))
            # Finder
            cmd.extend(
                [
                    "--path",
                    scan_path,
                    *(["--follow-symlink"] if follow_symlinks else []),
                    *(
                        ["--ignore-filenames", exclude_patterns]
                        if exclude_patterns
                        else []
                    ),
                    *(
                        ["--only-filepaths", file_patterns]
                        if file_patterns is not None
                        else []
                    ),
                    "--ignore-quarantine",
                    "--use-template-in-path",
                    "--skip-imunify360-storage",
                ]
            )
            if not in_crontabs:
                cmd.extend(["--skip-system-owner"])
        else:
            # Filelist
            cmd.extend(["--listing", filename])

        # MODSEC scans use only the options collected so far.
        if scan_type == MalwareScanType.MODSEC:
            return cmd

        cmd.append("--with-suspicious")
        cmd.extend(["--size", str(MalwareConfig.MAX_SIGNATURE_SIZE_TO_SCAN)])
        if MalwareConfig.CLOUD_ASSISTED_SCAN:
            if db_dir is not None and not in_crontabs:
                cmd.extend(
                    [
                        "--rapid-account-scan",
                        db_dir,
                        "--rapid-scan-rescan-frequency",
                        str(get_rapid_rescan_frequency()),
                    ]
                )
            cmd.extend(
                [
                    "--cloudscan-size",
                    str(MalwareConfig.MAX_CLOUDSCAN_SIZE_TO_SCAN),
                ]
            )
        if scan_type in (
            MalwareScanType.BACKGROUND,
            MalwareScanType.ON_DEMAND,
            MalwareScanType.USER,
        ):
            cmd.append("--encode-b64-fn")
            cmd.extend(["--detached", scan_id])
            if MalwareTune.USE_JSON_REPORT:
                cmd.extend(["--json_report", json_report_path])
            else:
                cmd.extend(["--csv_report", csv_report_path])
            # Do not print progress data to stdout,
            # because special terminal characters clutter the output
            # and we don't actually need it.
            # NOTE: The typo is in ai-bolit (should be "quiet").
            cmd.append("--quite")
        else:
            cmd.extend(["--json_report", ".", "--json-stdout"])
        logger.info(cmd)
        return cmd

    @staticmethod
    def get_updated_environment():
        """
        Return the environment mapping for the AI-BOLIT subprocess.

        :return: a copy of `os.environ` with `CLOUD_ASSIST` set to the
            server id when cloud assisted scan is enabled, otherwise None
        """
        if MalwareConfig.CLOUD_ASSISTED_SCAN:
            environment = os.environ.copy()
            environment["CLOUD_ASSIST"] = str(LicenseCLN.get_server_id())
            return environment
        return None

    @staticmethod
    def _generate_progress_file():
        """
        Return a timestamp-based progress file path inside
        `CoreConfig.TMPDIR`. The file itself is not created here.
        """
        return os.path.join(
            CoreConfig.TMPDIR,
            "progress_file_{}".format(int(time.time() * 10e6)),
        )

    async def scan(
        self,
        file,
        *,
        scan_type: str,
        intensity_cpu=None,
        intensity_io=None,
        intensity_ram=None,
        detect_elf=None,
        use_filters=True,
        scan_id=None,
        db_dir=None,
        scan_path=None,
        exclude_patterns=None,
        follow_symlinks=None,
        file_patterns=None,
        **_,
    ):
        """
        Run AI-BOLIT.

        For ON_DEMAND, BACKGROUND and USER scan types the scanner is
        started inside an `AiBolitDetachedDir` working directory and an
        empty dict is returned without reading any report here. For all
        other scan types the scanner output is collected from stdout and
        parsed.

        :param file: file with list of paths to scan (its `.name` is used
            as the listing file for non-detached scans), or None
        :param intensity_cpu: [inverse] niceness level of the scan.
            The higher the number the more priority the process gets
            (more cpu)
        :param intensity_io: [inverse] ioniceness level of the scan.
            Higher number means more disk time may be provided
            in a given period
        :param intensity_ram: memory value
        :param detect_elf: enable binary malware (elf) detection
        :param use_filters: apply ignore filters to list of scanning files
        :param scan_type: type of scan
        :param scan_id: id of scan
        :param db_dir: path to rapid scan database
        :param scan_path: str with scan path (templates allowed)
        :param exclude_patterns: patterns of filenames to ignore
        :param follow_symlinks: bool, if True -> follow symlinks
        :param file_patterns: patterns of filenames to scan
        :raise CancelledError: when scan was cancelled
        :raise AiBolitError: when the scanner output is not valid
            UTF-8 encoded JSON
        :return iterator: parsed report
        """
        self.scan_id = scan_id
        intensity_cpu = intensity_cpu or MalwareScanIntensity.CPU
        intensity_io = intensity_io or MalwareScanIntensity.IO
        intensity_ram = intensity_ram or MalwareScanIntensity.RAM
        detached = scan_type in (
            MalwareScanType.ON_DEMAND,
            MalwareScanType.BACKGROUND,
            MalwareScanType.USER,
        )
        if detached:
            assert scan_id
            with AiBolitDetachedDir(
                self.scan_id,
                tmp_listing_file=file,
            ) as work_dir:
                # NOTE(review): `use_filters` is not forwarded here, so
                # detached scans always run with `--use-filters`; confirm
                # whether that is intentional.
                cmd = self._cmd(
                    str(work_dir.listing_file) if file else None,
                    intensity_ram,
                    str(work_dir.progress_file),
                    scan_type=scan_type,
                    scan_id=scan_id,
                    db_dir=db_dir,
                    detect_elf=detect_elf,
                    exclude_patterns=exclude_patterns,
                    follow_symlinks=follow_symlinks,
                    scan_path=scan_path,
                    file_patterns=file_patterns,
                    json_report_path=str(work_dir.json_report_path),
                    csv_report_path=str(work_dir.csv_report_path),
                )
                # Store the command and scan type alongside the scan.
                scan_info = {"cmd": cmd, "scan_type": scan_type}
                with work_dir.scan_info_file.open(mode="w") as f:
                    json.dump(scan_info, f)
                with work_dir.log_file.open(
                    "w"
                ) as l_f, work_dir.err_file.open("w") as e_f:
                    # The created process is neither kept nor waited for
                    # here; its stdout/stderr go to the work dir files.
                    await resource_limits.create_subprocess(
                        cmd,
                        intensity_cpu=intensity_cpu,
                        intensity_io=intensity_io,
                        key=AIBOLIT_SCAN_INTENSITY_KEY[scan_type],
                        start_new_session=True,
                        stdout=l_f,
                        stderr=e_f,
                        cwd=str(work_dir),
                        env=self.get_updated_environment(),
                    )
            return {}

        self.cmd = self._cmd(
            file.name if file is not None else None,
            intensity_ram,
            self._generate_progress_file(),
            scan_type=scan_type,
            scan_id=scan_id,
            db_dir=db_dir,
            detect_elf=detect_elf,
            exclude_patterns=exclude_patterns,
            follow_symlinks=follow_symlinks,
            scan_path=scan_path,
            file_patterns=file_patterns,
            use_filters=use_filters,
        )
        logger.debug("Executing %s", " ".join(self.cmd))
        self.proc = await resource_limits.create_subprocess(
            self.cmd,
            intensity_cpu=intensity_cpu,
            intensity_io=intensity_io,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=CoreConfig.TMPDIR,
            env=self.get_updated_environment(),
            key=AIBOLIT_SCAN_INTENSITY_KEY[scan_type],
        )
        try:
            self.out, self.err = await self.proc.communicate()
        except asyncio.CancelledError:
            # The process may already be gone by the time we get here.
            with suppress(ProcessLookupError):
                self.proc.terminate()
            raise
        try:
            report = json.loads(self.out.decode())
        except (json.JSONDecodeError, UnicodeDecodeError) as err:
            # Output that is not valid UTF-8 is reported the same way as
            # output that is not valid JSON; the message carries the
            # exception class name ("JSONDecodeError" for bad JSON).
            raise AiBolitError(
                message=type(err).__name__,
                command=self.cmd,
                return_code=self.proc.returncode,
                out=self.out,
                err=self.err,
                scan_id=self.scan_id,
                path=scan_path,
            ) from err
        logger.debug("%s returned %s", AIBOLIT, report)
        # TODO: use base64-encoded paths for non-detached scans too
        return parse_report_json(report, base64_path=False)