"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import re
from contextlib import suppress
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSource
from imav.malwarelib.config import MalwareScanType
from imav.malwarelib.scan.ai_bolit.report import parse_report_json
from imav.malwarelib.scan.scan_result import ScanResult
from imav.malwarelib.subsys.ainotify import Inotify, Watcher
from defence360agent.utils import create_task_and_log_exceptions, Scope
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AibolitResultsScan(MessageSource):
    """
    Plugin to handle generated ai-bolit scan reports.

    Checks the contents of the *RESULT_SCAN_DIR* for the presence of ai-bolit
    report files that match the *REPORT_FILE_MASK* pattern, processes them
    and deletes them.

    Reports that already exist in the directory are handled once at start-up;
    after that an inotify watcher handles files moved into the directory.
    """

    SCOPE = Scope.IM360
    RESULT_SCAN_DIR = "/var/imunify360/aibolit/resident/out"
    # "<36 chars of lowercase hex digits and dashes>.report"; the captured
    # ``uuid`` group is used as the scan id when the report carries none.
    REPORT_FILE_MASK = re.compile(r"^(?P<uuid>[0-9a-f-]{36})\.report$")

    def __init__(self):
        self._watcher = None
        self._init_task = None

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and schedule the start-up task.

        The start-up task processes reports already on disk and then
        installs the inotify watcher; it runs in the background so this
        coroutine returns immediately.
        """
        self._loop = loop
        self._sink = sink
        self._init_task = create_task_and_log_exceptions(
            self._loop, self._init_handling_and_setup_watcher
        )

    async def shutdown(self):
        """Cancel the start-up task (if any) and close the watcher."""
        try:
            if self._init_task is not None:
                self._init_task.cancel()
                # Awaiting a task we have just cancelled raises
                # CancelledError when the task was still running; that is
                # the expected outcome here, not a failure of shutdown.
                with suppress(asyncio.CancelledError):
                    await self._init_task
        finally:
            # The watcher must be released even if the task failed.
            self._shutdown_watcher()

    def _setup_watcher(self):
        """Create the report directory and watch it for moved-in files."""
        # target directory must exist
        os.makedirs(self.RESULT_SCAN_DIR, mode=0o700, exist_ok=True)
        self._watcher = Watcher(
            self._loop, coro_callback=self._handle_incoming_report
        )
        # path in bytes
        self._watcher.watch(
            path=self.RESULT_SCAN_DIR.encode(), mask=Inotify.MOVED_TO
        )

    def _shutdown_watcher(self):
        """Close the watcher if one was created."""
        if self._watcher is not None:
            self._watcher.close()

    def _get_scan_result_from_report(self, report: dict) -> ScanResult:
        """Build a realtime ``ScanResult`` from a parsed ai-bolit report.

        :param report: decoded report; must contain a ``"summary"`` mapping
        :return: scan result populated from the summary and the report hits
        :raises KeyError: if ``"summary"`` is missing
        :raises TypeError: if ``report_time`` or ``scan_time`` is missing
            from the summary (the subtraction below fails on ``None``)
        """
        report_summary = report["summary"]
        path = [hit["file_name"] for hit in parse_report_json(report)]
        scan_id = report_summary.get("scan_id")
        scan_result = ScanResult(
            path, scan_id=scan_id, scan_type=MalwareScanType.REALTIME
        )
        scan_result.total_files = report_summary.get("total_files")
        scan_result.errors = report_summary.get("errors", [])
        end_time = report_summary.get("report_time")
        scan_time = report_summary.get("scan_time")
        # The report stores the end time and the duration; derive the start.
        start_time = end_time - scan_time
        scan_result.set_start_stop(start_time, end_time)
        # parse_report_json() is called a second time on purpose: its result
        # was consumed above to collect the file names.
        scan_result.scans = [parse_report_json(report)]
        return scan_result

    async def _handle_report(self, report: dict):
        """Convert *report* to a ``MalwareScan`` message and send it.

        Any failure other than cancellation is logged together with the
        report and swallowed, so one bad report does not stop processing.
        """
        try:
            scan_result = self._get_scan_result_from_report(report)
            result = await scan_result.get()
            await self._sink.process_message(
                MessageType.MalwareScan(**result.to_dict())
            )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.exception(
                "Error '%r' occurred while processing ai-bolit report: %s",
                exc,
                report,
            )

    async def handle_report_file(self, path: str):
        """Process a single report file and delete it afterwards.

        Files whose base name does not match *REPORT_FILE_MASK*, or that are
        not regular files, are ignored and left in place. A matching file is
        removed whether or not its content could be processed, so a broken
        report is not picked up again on the next start.

        :param path: path of the candidate report file
        """
        match_file = self.REPORT_FILE_MASK.match(os.path.basename(path))
        if not (match_file and os.path.isfile(path)):
            return
        try:
            with open(path) as f:
                report = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            logger.warning(
                "Problem with parsing %s aibolit report: %s", path, exc
            )
        else:
            summary = report.get("summary") if isinstance(report, dict) else None
            if isinstance(summary, dict):
                # Fall back to the id embedded in the file name.
                summary.setdefault("scan_id", match_file.group("uuid"))
                await self._handle_report(report)
            else:
                # Valid JSON without a summary object cannot be processed;
                # log it and fall through to the cleanup below.
                logger.warning(
                    "Problem with parsing %s aibolit report: %s",
                    path,
                    "missing 'summary' object",
                )
        with suppress(FileNotFoundError):
            os.unlink(path)

    async def _handle_incoming_report(self, event):
        """Watcher callback: handle the file named by an inotify *event*."""
        logger.info("Inotify event: %s", event)
        report_file = os.path.join(
            os.fsdecode(event.path), os.fsdecode(event.name)
        )
        await self.handle_report_file(report_file)

    async def _handle_existing_reports(self):
        """Process every entry currently present in *RESULT_SCAN_DIR*.

        Does nothing when the directory does not exist.
        """
        try:
            # Snapshot the paths first: processing unlinks files, and the
            # directory handle should not stay open across awaits.
            with os.scandir(self.RESULT_SCAN_DIR) as it:
                paths = [entry.path for entry in it]
        except FileNotFoundError:
            return
        for path in paths:
            await self.handle_report_file(path)

    async def _init_handling_and_setup_watcher(self):
        """Start-up task: drain existing reports, then install the watcher."""
        # NOTE(review): a report moved in between these two steps is only
        # picked up on the next start - confirm this window is acceptable.
        await self._handle_existing_reports()
        self._setup_watcher()