""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import re from asyncio import CancelledError, Queue from contextlib import suppress from logging import getLogger from typing import List from defence360agent.api import inactivity from defence360agent.contracts.config import Malware as Config from defence360agent.contracts.license import LicenseError from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.utils import recurring_check from imav.malwarelib.utils import malware_response logger = getLogger(__name__) class MRSUploader(MessageSink, MessageSource): ERR_MSG = "Failed to submit a file" SUSP_PATTERN = re.compile(r"(?:suspicious\..+|[CS]MW-SUS-.+|SMW-HEUR-ELF)") def __init__(self): self._upload_queue = Queue() async def create_source(self, loop, sink): self._sink = sink self._loop = loop self._upload_task = loop.create_task(self.upload()) async def create_sink(self, loop): pass async def shutdown(self): self._upload_task.cancel() with suppress(CancelledError): await self._upload_task def _separate_hits_by_type(self, results) -> tuple: malicious = [] suspicious = [] extended_suspicious = [] for file, data in results.items(): is_extended_suspicious = False is_suspicious = False is_malicious = False for hit in data["hits"]: is_extended_suspicious |= hit.get("extended_suspicious", False) is_suspicious |= bool( hit["suspicious"] and self.SUSP_PATTERN.match(hit["matches"]) ) is_malicious |= not hit["suspicious"] hit_info = malware_response.HitInfo(file, data["hash"]) if is_extended_suspicious: extended_suspicious.append(hit_info) elif is_suspicious: suspicious.append(hit_info) elif is_malicious: malicious.append(hit_info) return malicious, suspicious, extended_suspicious @expect(MessageType.MalwareScan) async def process_scan(self, message): results = message["results"] if results is None: return if not Config.SEND_FILES: logger.info("Uploading files to MRS is disabled") return ( malicious_hits, suspicious_hits, extended_suspicious_hits, ) = self._separate_hits_by_type(results) if malicious_hits: await self._sink.process_message( MessageType.MalwareMRSUpload( hits=malicious_hits, upload_reason="malicious" ) ) if suspicious_hits: # Move uploading to another task await self._sink.process_message( MessageType.MalwareMRSUpload( hits=suspicious_hits, upload_reason="suspicious" ) ) if extended_suspicious_hits: # pragma: no cover await self._sink.process_message( MessageType.MalwareMRSUpload( hits=extended_suspicious_hits, upload_reason="extended-suspicious", ) ) errors = message["summary"].get("errors") if errors: error_hits = [ malware_response.HitInfo(hit["file"], hit["hash"]) for hit in errors ] await self._sink.process_message( MessageType.MalwareMRSUpload( hits=error_hits, upload_reason="scan_error" ) ) @recurring_check(0) async def upload(self): while True: files, upload_reason, message = await self._upload_queue.get() try: await self._upload_files(files, upload_reason, message) finally: self._upload_queue.task_done() async def _upload_files(self, files, upload_reason, message): with inactivity.track.task("mrs_upload"): for file in files: try: await malware_response.upload_with_retries( file, upload_reason=upload_reason ) except LicenseError as e: logger.warning("Cannot process message %s: %s", message, e) break except FileNotFoundError as e: err = "{}. {}".format(self.ERR_MSG, e.strerror) logger.warning("%s: %s", err, e.filename) @expect(MessageType.MalwareMRSUpload) async def process_hits(self, message): hits: List[malware_response.HitInfo] = message["hits"] upload_reason = message.get("upload_reason", "suspicious") unknown_hashes = await malware_response.check_known_hashes( self._loop, (hit.hash for hit in hits), upload_reason ) if not unknown_hashes: logger.info("All files are known to MRS. Skipping uploading...") return files = (hit.file for hit in hits if hit.hash in unknown_hashes) self._upload_queue.put_nowait((files, upload_reason, message))