"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import subprocess
import time
from contextlib import suppress
from pathlib import Path
from defence360agent.contracts.config import (
Core as CoreConfig,
Malware as MalwareConfig,
MalwareScanIntensity,
MalwareSignatures,
get_rapid_rescan_frequency,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import resource_limits
from imav.contracts.config import MalwareTune
from imav.malwarelib.config import (
AIBOLIT_SCAN_INTENSITY_KEY,
MalwareScanType,
)
from imav.malwarelib.scan import ScanFailedError
from imav.malwarelib.scan.ai_bolit import AIBOLIT, AIBOLIT_PATH
from imav.malwarelib.scan.ai_bolit.detached import AiBolitDetachedDir
from imav.malwarelib.scan.ai_bolit.report import parse_report_json
from imav.malwarelib.scan.crontab import crontab_path, in_crontab
from imav.malwarelib.utils import get_memory
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class AiBolitError(ScanFailedError):
    """Scan failure raised when an AI-BOLIT run yields no usable report."""
class AiBolit:
    """Build AI-BOLIT command lines and run scans with them.

    Detached scan types (on-demand, background, user) are started in their
    own session and left running; all other scan types are awaited and
    their JSON report is parsed from stdout.
    """

    def __init__(self, scan_id=None):
        """
        :param scan_id: id of the scan this instance is going to run
        """
        # Command line of the most recent non-detached scan.
        self.cmd = None
        self.scan_id = scan_id
        # Filled in by non-detached `scan()` calls.  Initialised here so
        # that reading them before the first scan yields None instead of
        # raising AttributeError.
        self.proc = None
        self.out = None
        self.err = None

    def _cmd(
        self,
        filename,
        intensity_ram,
        progress_path,
        *,
        scan_type: str,
        scan_path=None,
        scan_id=None,
        db_dir=None,
        detect_elf=None,
        exclude_patterns=None,
        follow_symlinks=None,
        file_patterns=None,
        use_filters=True,
        json_report_path=None,
        csv_report_path=None,
    ):
        """
        Build the argument list used to launch AI-BOLIT.

        Exactly one of `filename` (filelist mode) and `scan_path`
        (finder mode) must be given.

        :param filename: path to a file listing the paths to scan
        :param intensity_ram: value passed to `get_memory` to produce
            the `--memory` argument
        :param progress_path: value of the `--progress` argument
        :param scan_type: type of scan; MODSEC scans get only the base
            and source arguments
        :param scan_path: path to scan in finder mode
        :param scan_id: id of scan, stored on the instance and passed as
            `--detached` for detached scan types
        :param db_dir: path to rapid scan database
        :param detect_elf: True - detect as malicious
            False - detect as suspicious
            None - do nothing
        :param exclude_patterns: patterns of filenames to ignore
            (finder mode only)
        :param follow_symlinks: if true, add `--follow-symlink`
            (finder mode only)
        :param file_patterns: patterns of filenames to scan
            (finder mode only)
        :param use_filters: if true, add `--use-filters`
        :param json_report_path: report path for detached scans when
            JSON reports are enabled
        :param csv_report_path: report path for detached scans otherwise
        :raise TypeError: when both or neither of `filename` and
            `scan_path` are given
        :return: list of command line arguments
        """
        self.scan_id = scan_id

        # Validate the scan source before building anything: exactly one
        # of the two modes has to be selected.
        if bool(scan_path) == bool(filename):
            raise TypeError(
                "Ai-Bolit cmd generation error, cannot select from finder "
                "and filelist. "
                "scan_path: {}, filename: {}".format(scan_path, filename)
            )

        cmd = [
            "/opt/ai-bolit/wrapper",
            AIBOLIT_PATH,
            "--smart",
            "--deobfuscate",
            "--avdb",
            MalwareSignatures.AI_BOLIT_HOSTER,
            "--no-html",
            "--memory",
            get_memory(intensity_ram),
            "--progress",
            progress_path,
            *(["--use-filters"] if use_filters else []),
            *(
                ["--use-heuristics"]
                if detect_elf is True
                else ["--use-heuristics-suspicious"]
                if detect_elf is False
                else []
            ),
            *(
                # Note: AI-BOLIT will check that HyperScan DB version
                # is the same as `--avdb` (and will skip HS with a warning
                # if they differ), so we don't have to do any
                # race-condition-prone checks here in the Agent.
                ["--hs", MalwareSignatures.AI_BOLIT_HYPERSCAN]
                if MalwareConfig.HYPERSCAN
                else []
            ),
        ]

        in_crontabs = False
        # Truthiness (not `is not None`) keeps this branch consistent with
        # the validation above: an empty `scan_path` accompanied by a
        # listing file must select filelist mode, not `--path ""`.
        if scan_path:
            if not MalwareConfig.CRONTABS_SCAN_ENABLED:
                # Crontab scanning is disabled: always exclude the
                # crontab directory, in addition to caller patterns.
                # NOTE(review): assumes `exclude_patterns` is a single
                # comma-separated string - confirm against callers.
                exclude_crontab = [os.path.join(str(crontab_path()), "*")]
                if exclude_patterns:
                    exclude_crontab.append(exclude_patterns)
                exclude_patterns = ",".join(exclude_crontab)
            else:
                in_crontabs = in_crontab(Path(scan_path))
            # Finder
            cmd.extend(
                [
                    "--path",
                    scan_path,
                    *(["--follow-symlink"] if follow_symlinks else []),
                    *(
                        ["--ignore-filenames", exclude_patterns]
                        if exclude_patterns
                        else []
                    ),
                    *(
                        ["--only-filepaths", file_patterns]
                        if file_patterns is not None
                        else []
                    ),
                    "--ignore-quarantine",
                    "--use-template-in-path",
                    "--skip-imunify360-storage",
                ]
            )
            if not in_crontabs:
                cmd.extend(["--skip-system-owner"])
        else:
            # Filelist
            cmd.extend(["--listing", filename])

        # MODSEC scans use only the arguments collected so far (and are
        # not logged below).
        if scan_type == MalwareScanType.MODSEC:
            return cmd

        cmd.append("--with-suspicious")
        cmd.extend(["--size", str(MalwareConfig.MAX_SIGNATURE_SIZE_TO_SCAN)])
        if MalwareConfig.CLOUD_ASSISTED_SCAN:
            # The rapid scan database is not used for crontab scans.
            if db_dir is not None and not in_crontabs:
                cmd.extend(
                    [
                        "--rapid-account-scan",
                        db_dir,
                        "--rapid-scan-rescan-frequency",
                        str(get_rapid_rescan_frequency()),
                    ]
                )
            cmd.extend(
                [
                    "--cloudscan-size",
                    str(MalwareConfig.MAX_CLOUDSCAN_SIZE_TO_SCAN),
                ]
            )
        if scan_type in (
            MalwareScanType.BACKGROUND,
            MalwareScanType.ON_DEMAND,
            MalwareScanType.USER,
        ):
            cmd.append("--encode-b64-fn")
            cmd.extend(["--detached", scan_id])
            if MalwareTune.USE_JSON_REPORT:
                cmd.extend(["--json_report", json_report_path])
            else:
                cmd.extend(["--csv_report", csv_report_path])
            # Do not print progress data to stdout,
            # because special terminal characters clutter the output
            # and we don't actually need it.
            # NOTE: The typo is in ai-bolit (should be "quiet").
            cmd.append("--quite")
        else:
            cmd.extend(["--json_report", ".", "--json-stdout"])
        logger.info(cmd)
        return cmd

    @staticmethod
    def get_updated_environment():
        """
        Build the environment for the AI-BOLIT process.

        :return: copy of `os.environ` with `CLOUD_ASSIST` set to the
            server id when cloud assisted scan is enabled, otherwise
            None
        """
        if MalwareConfig.CLOUD_ASSISTED_SCAN:
            environment = os.environ.copy()
            environment["CLOUD_ASSIST"] = str(LicenseCLN.get_server_id())
            return environment
        return None

    @staticmethod
    def _generate_progress_file():
        """
        :return: path of a progress file inside `CoreConfig.TMPDIR`,
            suffixed with the current time
        """
        # 10e6 == 1e7, so the suffix is the current time in 100 ns units.
        return os.path.join(
            CoreConfig.TMPDIR,
            "progress_file_{}".format(int(time.time() * 10e6)),
        )

    async def scan(
        self,
        file,
        *,
        scan_type: str,
        intensity_cpu=None,
        intensity_io=None,
        intensity_ram=None,
        detect_elf=None,
        use_filters=True,
        scan_id=None,
        db_dir=None,
        scan_path=None,
        exclude_patterns=None,
        follow_symlinks=None,
        file_patterns=None,
        **_,
    ):
        """
        Run AI-BOLIT for the given listing file or scan path.

        :param file: file with list of paths to scan, or None when
            `scan_path` is used; non-detached scans read its `.name`
        :param intensity_cpu: [inverse] niceness level of the scan.
            The higher the number the more priority the process
            gets (more cpu)
        :param intensity_io: [inverse] ioniceness level of the scan.
            Higher number means more disk time may be provided
            in a given period
        :param intensity_ram: memory value
        :param detect_elf: enable binary malware (elf) detection
        :param use_filters: apply ignore filters to list of scanning files
        :param scan_type: type of scan
        :param scan_id: id of scan, required for detached scan types
        :param db_dir: path to rapid scan database
        :param scan_path: str with scan path (templates allowed)
        :param exclude_patterns: patterns of filenames to ignore
        :param follow_symlinks: bool, if True -> follow symlinks
        :param file_patterns: patterns of filenames to scan
        :raise CancelledError: when scan was cancelled
        :raise AiBolitError: when the output of a non-detached scan
            cannot be decoded as JSON
        :return: parsed report for non-detached scans; an empty dict
            for detached scans, whose process is started but not awaited
        """
        self.scan_id = scan_id

        # Falsy intensities (None, 0) fall back to the configured defaults.
        intensity_cpu = intensity_cpu or MalwareScanIntensity.CPU
        intensity_io = intensity_io or MalwareScanIntensity.IO
        intensity_ram = intensity_ram or MalwareScanIntensity.RAM

        detached = scan_type in (
            MalwareScanType.ON_DEMAND,
            MalwareScanType.BACKGROUND,
            MalwareScanType.USER,
        )
        if detached:
            assert scan_id, "detached scans require a scan_id"
            with AiBolitDetachedDir(
                self.scan_id,
                tmp_listing_file=file,
            ) as work_dir:
                # NOTE(review): `use_filters` is not forwarded here, so
                # detached scans always use the `_cmd` default (True) -
                # confirm this is intended.
                cmd = self._cmd(
                    str(work_dir.listing_file) if file else None,
                    intensity_ram,
                    str(work_dir.progress_file),
                    scan_type=scan_type,
                    scan_id=scan_id,
                    db_dir=db_dir,
                    detect_elf=detect_elf,
                    exclude_patterns=exclude_patterns,
                    follow_symlinks=follow_symlinks,
                    scan_path=scan_path,
                    file_patterns=file_patterns,
                    json_report_path=str(work_dir.json_report_path),
                    csv_report_path=str(work_dir.csv_report_path),
                )
                # Record the command and scan type in the work dir.
                scan_info = {"cmd": cmd, "scan_type": scan_type}
                with work_dir.scan_info_file.open(mode="w") as f:
                    json.dump(scan_info, f)
                with work_dir.log_file.open(
                    "w"
                ) as l_f, work_dir.err_file.open("w") as e_f:
                    # Started in its own session and not awaited to
                    # completion: stdout/stderr go to files in work_dir.
                    await resource_limits.create_subprocess(
                        cmd,
                        intensity_cpu=intensity_cpu,
                        intensity_io=intensity_io,
                        key=AIBOLIT_SCAN_INTENSITY_KEY[scan_type],
                        start_new_session=True,
                        stdout=l_f,
                        stderr=e_f,
                        cwd=str(work_dir),
                        env=self.get_updated_environment(),
                    )
            return {}

        self.cmd = self._cmd(
            file.name if file is not None else None,
            intensity_ram,
            self._generate_progress_file(),
            scan_type=scan_type,
            scan_id=scan_id,
            db_dir=db_dir,
            detect_elf=detect_elf,
            exclude_patterns=exclude_patterns,
            follow_symlinks=follow_symlinks,
            scan_path=scan_path,
            file_patterns=file_patterns,
            use_filters=use_filters,
        )
        logger.debug("Executing %s", " ".join(self.cmd))
        self.proc = await resource_limits.create_subprocess(
            self.cmd,
            intensity_cpu=intensity_cpu,
            intensity_io=intensity_io,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=CoreConfig.TMPDIR,
            env=self.get_updated_environment(),
            key=AIBOLIT_SCAN_INTENSITY_KEY[scan_type],
        )
        try:
            self.out, self.err = await self.proc.communicate()
        except asyncio.CancelledError:
            # The process may already be gone by the time we get here.
            with suppress(ProcessLookupError):
                self.proc.terminate()
            raise
        try:
            report = json.loads(self.out.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as err:
            # Output that is not valid UTF-8 is reported the same way as
            # output that is not valid JSON.  The message is the
            # exception class name, i.e. still "JSONDecodeError" for
            # JSON syntax errors.
            raise AiBolitError(
                message=type(err).__name__,
                command=self.cmd,
                return_code=self.proc.returncode,
                out=self.out,
                err=self.err,
                scan_id=self.scan_id,
                path=scan_path,
            ) from err
        logger.debug("%s returned %s", AIBOLIT, report)
        # TODO: use base64-encoded paths for non-detached scans too
        return parse_report_json(report, base64_path=False)