"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see
"""

import asyncio
import re
import shutil
import time
import uuid
from contextlib import suppress
from functools import partial
from logging import getLogger
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

from defence360agent import utils
from defence360agent.api import inactivity
from defence360agent.contracts.config import (
    Malware as Config,
    MyImunifyConfig,
)
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.permissions import myimunify_protection_enabled
from defence360agent.contracts.plugins import (
    MessageSink,
    MessageSource,
    expect,
)
from defence360agent.internals.global_scope import g
from defence360agent.utils import (
    Scope,
    nice_iterator,
    recurring_check,
    split_for_chunk,
)
from defence360agent.utils.common import DAY, MINUTE, rate_limit
from imav.malwarelib.cleanup.cleaner import (
    CleanupResult,
    MalwareCleaner,
    MalwareCleanupProxy,
)
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.malwarelib.config import (
    MalwareHitStatus,
    MalwareScanResourceType,
    MalwareScanType,
)
from imav.malwarelib.model import MalwareHistory, MalwareHit
from imav.malwarelib.scan import ScanAlreadyCompleteError
from imav.malwarelib.scan.mds.cleaner import MalwareDatabaseCleaner
from imav.malwarelib.scan.mds.detached import (
    MDSDetachedCleanup,
    MDSDetachedRestore,
)
from imav.malwarelib.scan.mds.restore import MalwareDatabaseRestore
from imav.malwarelib.subsys.malware import HackerTrapHitsSaver, MalwareAction
from imav.malwarelib.utils import malware_response
from imav.malwarelib.utils.user_list import (
    get_username_by_uid,
    is_uid,
)

logger = getLogger(__name__)

# Upper bound on failed cleanup attempts per file within one DAY; see
# Cleanup._filter_failed_to_cleanup_hits().
COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY = 4

# Hit types matching this pattern are handed to the cleaner through its
# separate ``blacklist`` argument (see Cleanup._cloud_assisted_hits()).
# Compiled once here instead of on every hit.
_CLOUD_ASSISTED_TYPE_RE = re.compile(r"\w+-BLKH-|cloudhash\.|cld-")

_group_by_status = partial(MalwareHit.group_by_attribute, attribute="status")
_group_by_user = partial(MalwareHit.group_by_attribute, attribute="owner")

# logger.error throttled by rate_limit(period=DAY); dropped calls are routed
# to logger.warning.
throttled_log_error = rate_limit(period=DAY, on_drop=logger.warning)(
    logger.error
)


def filter_cleanable(hits: Iterable[MalwareHit]) -> Iterable:
    """Lazily yield only the hits whose status is ``FOUND``."""
    return (hit for hit in hits if hit.status == MalwareHitStatus.FOUND)


class Cleanup(MessageSink, MessageSource):
    """Back up and clean infected files in response to cleanup tasks.

    File hits received with ``MalwareCleanupTask`` are filtered, their
    originals are stored via ``CleanupStorage`` and the survivors are queued
    in a ``MalwareCleanupProxy``.  A recurring task drains that queue and
    runs ``MalwareCleaner`` per user, publishing a ``MalwareCleanup`` message
    with the outcome.
    """

    def __init__(self):
        self._cleanup_task = None
        # Most recently scheduled "store original" task (kept for
        # backward compatibility with code inspecting this attribute).
        self._store_original_task = None
        # Strong references to every in-flight "store original" task: the
        # event loop itself only keeps weak references to tasks, so a task
        # whose only reference gets overwritten may be garbage-collected
        # before it finishes.
        self._store_original_tasks = set()
        self._running = False
        self._loop = None
        self._sink = None
        self._proxy = None
        self._cleaner = None

    async def create_source(self, loop, sink):
        """Create the proxy and the cleaner, then start the recurring task."""
        self._loop = loop
        self._sink = sink
        self._proxy = MalwareCleanupProxy()
        self._cleaner = MalwareCleaner(loop=loop, sink=sink)
        self._cleanup_task = loop.create_task(self.cleanup())

    async def create_sink(self, loop):
        """Nothing to set up for the sink side."""
        pass

    async def shutdown(self):
        """Cancel the recurring cleanup task and wait until it has stopped."""
        if self._cleanup_task:
            self._cleanup_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._cleanup_task

    @expect(MessageType.MalwareCleanupTask)
    async def process_cleanup_task(self, message: Dict):
        """Filter the hits of a cleanup task and schedule backing them up.

        Only hits that are still ``FOUND`` after a refresh and that refer to
        files are kept.  For non-manual cleanups (``cause`` is set) hits with
        too many recent failed attempts are dropped as well.  The remaining
        hits are passed to :meth:`_store_original` in a background task.
        """
        cause = message.get("cause")
        initiator = message.get("initiator")
        post_action = message.get("post_action")
        scan_id = message.get("scan_id")
        standard_only = message.get("standard_only")
        manual_cleanup = cause is None

        # In case another scan already found some of the hits
        # and the cleanup for them has started.
        origin_hits_num = len(message["hits"])
        hits = MalwareHit.refresh_hits(
            message["hits"], include_scan_info=not manual_cleanup
        )
        hits = filter_cleanable(hits)
        hits = [
            hit
            for hit in hits
            if hit.resource_type == MalwareScanResourceType.FILE.value
        ]
        # don't use any limits when run cleanup manually
        if not manual_cleanup:
            rescan_hits, hits = self._split_hits_by_scan_type(
                hits, [MalwareScanType.RESCAN, MalwareScanType.RESCAN_OUTDATED]
            )
            rescan_hits = await self._filter_rescan_hits(rescan_hits)
            hits = rescan_hits + await self._filter_failed_to_cleanup_hits(
                hits
            )
        if filtered := origin_hits_num - len(hits):
            logger.info(
                "%s/%s hits filtered before cleanup",
                filtered,
                origin_hits_num,
            )

        task = self._loop.create_task(
            self._store_original(
                hits, cause, initiator, post_action, scan_id, standard_only
            )
        )
        # Hold the task until it is done so that scheduling another one
        # does not drop the last strong reference to it.
        self._store_original_tasks.add(task)
        task.add_done_callback(self._store_original_tasks.discard)
        self._store_original_task = task

    @staticmethod
    def _split_hits_by_scan_type(
        hits: list, scan_types: List[MalwareScanType]
    ) -> Tuple[list, list]:
        """Partition *hits* by whether ``hit.scanid.type`` is in *scan_types*.

        Returns ``(matching, others)``; relative order is preserved.
        """
        target_hits, other_hits = [], []
        for hit in hits:
            if hit.scanid.type in scan_types:
                target_hits.append(hit)
            else:
                other_hits.append(hit)
        return target_hits, other_hits

    @staticmethod
    async def _filter_failed_to_cleanup(
        hits: list, *, time_range: float, allowed_attempts: int
    ) -> list:
        """Drop hits that failed to be cleaned too often recently.

        :param hits: candidate hits
        :param time_range: look-back window, in seconds
        :param allowed_attempts: a hit is skipped once its number of failed
            cleanup events within the window reaches this value
        :return: new list with the hits that may still be cleaned
        """
        if not hits:
            return []

        since = time.time() - time_range
        failed_cleanup_count = {}
        # Query the history in chunks and yield to the event loop in
        # between, so a large batch does not monopolise it.
        for hits_chunk in split_for_chunk(hits, chunk_size=200):
            failed_cleanup_count.update(
                dict(
                    MalwareHistory.get_failed_cleanup_events_count(
                        [hit.orig_file for hit in hits_chunk],
                        since=since,
                    )
                )
            )
            await asyncio.sleep(0)

        hits_to_clean = []
        for hit in hits:
            failures = failed_cleanup_count.get(hit.orig_file, 0)
            if failures >= allowed_attempts:
                throttled_log_error(
                    "Skip cleanup file '%s', since there are too many "
                    "attempts to cleanup it in %s sec [%s]",
                    hit.orig_file,
                    time_range,
                    failures,
                )
                continue
            hits_to_clean.append(hit)
        return hits_to_clean

    async def _filter_rescan_hits(self, hits: list) -> list:
        """Allow fewer than 2 failed cleanups per 5 minutes for rescan hits."""
        return await self._filter_failed_to_cleanup(
            hits, time_range=5 * MINUTE, allowed_attempts=2
        )

    async def _filter_failed_to_cleanup_hits(self, hits: list) -> list:
        """
        Don't try to cleanup the same hit more than
        *COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY*
        """
        return await self._filter_failed_to_cleanup(
            hits,
            time_range=DAY,
            allowed_attempts=COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY,
        )

    async def _store_original(
        self, hits, cause, initiator, post_action, scan_id, standard_only
    ):
        """Back up the original files, then queue stored hits for cleanup.

        * stored hits are added to the proxy;
        * hits that failed to be stored get a ``CleanupFailed`` message and
          their status is set back to the one grouped from the instances;
        * hits whose file does not exist are deleted and reported through
          ``MalwareAction.cleanup_failed``.
        """
        MalwareHit.set_status(hits, MalwareHitStatus.CLEANUP_STARTED)
        # presumably set_status() leaves the in-memory instances untouched,
        # so this still groups by the pre-cleanup status — TODO confirm
        original_status = _group_by_status(hits)

        with inactivity.track.task("cleanup_storage"):
            succeeded, failed, not_exist = await CleanupStorage.store_all(hits)

        for hit in failed:
            await self._sink.process_message(
                MessageType.CleanupFailed(
                    message=(
                        "Failed to store the original from {} to {}".format(
                            hit.orig_file, CleanupStorage.path
                        )
                    ),
                    timestamp=int(time.time()),
                )
            )
        self._add_to_proxy(
            succeeded, cause, initiator, post_action, scan_id, standard_only
        )
        for status, hit_list in original_status.items():
            MalwareHit.set_status([h for h in failed if h in hit_list], status)
        MalwareHit.delete_instances(not_exist)
        await MalwareAction.cleanup_failed(
            not_exist, cause=cause, initiator=initiator
        )

    def _add_to_proxy(
        self, hits, cause, initiator, post_action, scan_id, standard_only
    ):
        """Queue *hits* in the proxy as standard-only or advanced.

        The proxy always receives two ``add()`` calls: one forced to
        ``standard_only=True`` and one carrying the caller's value.  All
        hits end up in exactly one of them.
        """
        hits = list(hits)
        # The decision depends only on the initiator and the requested mode,
        # not on the individual hit, so evaluate it once (and, like before,
        # not at all when there is nothing to add).
        if hits and decide_if_standard_signatures_only(
            initiator, standard_only
        ):
            standard_only_hits, advanced_hits = hits, []
        else:
            standard_only_hits, advanced_hits = [], hits

        self._proxy.add(
            cause,
            initiator,
            post_action,
            scan_id,
            True,
            standard_only_hits,
        )
        self._proxy.add(
            cause,
            initiator,
            post_action,
            scan_id,
            standard_only,  # None if default action otherwise False
            advanced_hits,
        )

    @staticmethod
    def _user_hits(hits):
        """Group *hits* by their ``owner`` attribute."""
        user_hits = _group_by_user(hits)
        return user_hits

    def _cloud_assisted_hits(self):
        """Flush the proxy and split each batch by hit type.

        Yields ``(regular_hits, blacklist, cause, initiator, post_action,
        scan_id, standard_only)`` where *blacklist* holds the hits whose
        ``type`` starts with ``<word>-BLKH-``, ``cloudhash.`` or ``cld-``.
        """
        for (
            cause,
            initiator,
            post_action,
            scan_id,
            standard_only,
            all_hits,
        ) in self._proxy.flush():
            # Single pass: avoids the quadratic ``hit not in blacklist``
            # membership test and does not need to iterate all_hits twice.
            regular_hits, blacklist = [], []
            for hit in all_hits:
                if _CLOUD_ASSISTED_TYPE_RE.match(hit.type):
                    blacklist.append(hit)
                else:
                    regular_hits.append(hit)
            yield (
                regular_hits,
                blacklist,
                cause,
                initiator,
                post_action,
                scan_id,
                standard_only,
            )

    async def _start_hook(self, cleanup_id, started, hits):
        """Publish the ``MalwareCleanupStarted`` hook event for *hits*."""
        dump = [hit.as_dict() for hit in hits]
        cleanup_started = HookEvent.MalwareCleanupStarted(
            cleanup_id=cleanup_id,
            started=started,
            total_files=len(hits),
            DUMP=dump,
        )
        await self._sink.process_message(cleanup_started)

    async def _clean_files(
        self,
        hits,
        blacklist=None,
        cause=None,
        initiator=None,
        post_action=None,
        scan_id=None,
        standard_only=None,
    ):
        """Run the cleaner once per owner and publish each result.

        Owners identified by a uid (non panel users) are cleaned only with
        an unlimited license and only if the uid resolves to a username.
        """
        user_hits = self._user_hits(hits)
        user_hits_black = self._user_hits(blacklist or [])
        for user in {*user_hits, *user_hits_black}:
            hits_regular = user_hits.get(user, [])
            hits_black = user_hits_black.get(user, [])
            user_hits_all = hits_regular + hits_black
            files = [hit.orig_file for hit in hits_regular]
            black = [hit.orig_file for hit in hits_black]
            logger.debug("Cleaning files: %s", files + black)

            cleanup_id = uuid.uuid4().hex
            started = time.time()

            if is_uid(user):  # non panel user
                uid = user
                if not LicenseCLN.is_unlimited():
                    logger.error(
                        f"Can't clean files for non panel user {uid=}, "
                        "since license is limited"
                    )
                    # Report the failure with an empty result so the
                    # consumers of MalwareCleanup can handle these hits.
                    await self._sink.process_message(
                        MessageType.MalwareCleanup(
                            hits=user_hits_all,
                            result={},
                            cleanup_id=cleanup_id,
                            started=started,
                            error="Cleanup failed. License restriction",
                            cause=cause,
                            initiator=initiator,
                            post_action=post_action,
                            scan_id=scan_id,
                            args=[],
                        )
                    )
                    continue
                if not (username := await get_username_by_uid(uid)):
                    logger.error(
                        f"Can't find username for {uid=}. Skip cleanup"
                    )
                    continue
                user = username

            await self._start_hook(cleanup_id, started, user_hits_all)
            result, error, cmd = await self._cleaner.start(
                user,
                files,
                soft=Config.CLEANUP_TRIM,
                blacklist=black,
                standard_only=standard_only,
            )
            await self._sink.process_message(
                MessageType.MalwareCleanup(
                    hits=user_hits_all,
                    result=result,
                    cleanup_id=cleanup_id,
                    started=started,
                    error=error,
                    cause=cause,
                    initiator=initiator,
                    post_action=post_action,
                    scan_id=scan_id,
                    args=cmd,
                )
            )

    async def _cleanup(self):
        """Drain the proxy once, unless a previous run is still active."""
        if self._running:
            return
        if not self._proxy.hits:
            self._proxy.reset()
            return

        self._running = True
        with inactivity.track.task("cleanup"):
            try:
                data = self._cloud_assisted_hits()
                for (
                    all_hits,
                    blacklist,
                    cause,
                    initiator,
                    post_action,
                    scan_id,
                    standard_only,
                ) in data:
                    await self._clean_files(
                        all_hits,
                        blacklist=blacklist,
                        cause=cause,
                        initiator=initiator,
                        post_action=post_action,
                        scan_id=scan_id,
                        standard_only=standard_only,
                    )
            finally:
                # Always release the guard, even if cleaning raised.
                self._running = False

    @recurring_check(1)
    async def cleanup(self):
        """Recurring entry point (``recurring_check(1)``) for the queue."""
        await self._cleanup()


class ResultProcessor(MessageSink, MessageSource):
    """Persist the outcome of a file cleanup reported by ``MalwareCleanup``."""

    SCOPE = Scope.AV

    async def create_sink(self, loop):
        """Nothing to set up for the sink side."""
        pass

    async def create_source(self, loop, sink):
        """Remember the sink used to publish follow-up messages."""
        self._sink = sink

    @staticmethod
    def _set_hit_status(hits: List[MalwareHit], status: str, cleaned_at=None):
        """Set status/cleaned_at via ``MalwareHit.set_status`` and mirror
        both values onto the given instances."""
        MalwareHit.set_status(hits, status, cleaned_at)
        for hit in hits:
            hit.status = status
            hit.cleaned_at = cleaned_at

    @expect(MessageType.MalwareCleanup)
    async def store_result(self, message):
        """Record history events and update hit statuses from the result.

        Hits missing from ``message["result"]`` are treated as unprocessed.
        Returns *message* unchanged.
        """
        hits: List[MalwareHit] = message["hits"]
        result: CleanupResult = message["result"]
        cause = message.get("cause")
        initiator = message.get("initiator")
        now = time.time()

        processed = [hit for hit in hits if hit in result]
        unprocessed = [hit for hit in hits if hit not in result]
        not_exist = []
        async for hit in nice_iterator(processed, chunk_size=100):
            # in case if procu2.php tries to clean user file in root dirs,
            # it will be marked as non-existent due to 'Permission denied'
            # error which confuses users, so consider it as unable to cleanup.
            if result[hit].not_exist():  # pragma: no cover
                if hit.orig_file_path.exists():
                    unprocessed.append(hit)
                else:
                    not_exist.append(hit)

        await MalwareAction.cleanup_unable(
            unprocessed, cause=cause, initiator=initiator
        )

        requires_myimunify_protection = [
            hit
            for hit in processed
            if result[hit].requires_myimunify_protection()
        ]
        await MalwareAction.cleanup_requires_myimunify_protection(
            requires_myimunify_protection, cause=cause, initiator=initiator
        )
        self._set_hit_status(
            requires_myimunify_protection,
            MalwareHitStatus.CLEANUP_REQUIRES_MYIMUNIFY_PROTECTION,
            now,
        )

        failed = [hit for hit in processed if result[hit].is_failed()]
        await MalwareAction.cleanup_failed(
            failed, cause=cause, initiator=initiator
        )

        cleaned = [hit for hit in processed if result[hit].is_cleaned()]
        await MalwareAction.cleanup_done(
            cleaned, cause=cause, initiator=initiator
        )
        self._set_hit_status(cleaned, MalwareHitStatus.CLEANUP_DONE, now)

        removed = [hit for hit in processed if result[hit].is_removed()]
        await MalwareAction.cleanup_removed(
            removed, cause=cause, initiator=initiator
        )
        self._set_hit_status(removed, MalwareHitStatus.CLEANUP_REMOVED, now)

        MalwareHit.delete_instances(not_exist)

        # Write back the status carried by the instances for everything that
        # was not cleaned (cleaned_at is reset to None by _set_hit_status).
        for status, hit_list in _group_by_status(unprocessed, failed).items():
            self._set_hit_status(hit_list, status)

        await self.send_failed_to_cleanup_hits_to_mrs(failed)
        return message

    async def send_failed_to_cleanup_hits_to_mrs(self, failed_to_cleanup_hits):
        """Request an MRS upload for files that failed to be cleaned.

        Two ``MalwareMRSUpload`` messages are sent: one for the current file
        paths and one for the copies kept in the cleanup storage.
        """
        if failed_to_cleanup_hits:
            await self._sink.process_message(
                MessageType.MalwareMRSUpload(
                    hits=[
                        malware_response.HitInfo(hit.orig_file, hit.hash)
                        for hit in failed_to_cleanup_hits
                    ],
                    upload_reason="cleanup_failure_current",
                )
            )
            await self._sink.process_message(
                MessageType.MalwareMRSUpload(
                    hits=[
                        malware_response.HitInfo(
                            str(CleanupStorage.get_hit_store_path(hit)),
                            hit.hash,
                        )
                        for hit in failed_to_cleanup_hits
                    ],
                    upload_reason="cleanup_failure_original",
                )
            )


class StorageController(MessageSink):
    """Remove old backed up files from storage"""

    def __init__(self):
        self._clear_task = None
        # Retention period in days, taken from Config.CLEANUP_KEEP.
        self._keep = Config.CLEANUP_KEEP

    async def create_sink(self, loop):
        """Start the daily clearing task."""
        self._clear_task = loop.create_task(self.daily_clear())

    async def shutdown(self):
        """Cancel the daily clearing task and wait until it has stopped."""
        if self._clear_task:
            self._clear_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._clear_task

    async def _clear(self):
        """Delete hits cleaned longer ago than the retention period and
        clear the matching backups from the cleanup storage."""
        now = time.time()
        keep_hits = now - self._keep * DAY
        keep_orig = now - (self._keep + 1) * DAY  # keep files one more day

        MalwareHit.delete().where(MalwareHit.cleaned_at < keep_hits).execute()
        cleared = await CleanupStorage.clear(keep_orig)
        if cleared:
            logger.info(
                "Cleanup storage have cleaned. Files removed: %s", cleared
            )

    @expect(MessageType.ConfigUpdate)
    @utils.log_error_and_ignore()
    async def config_updated(self, _):
        """Re-run the clearing right away if the retention period changed."""
        if self._keep != Config.CLEANUP_KEEP:
            self._keep = Config.CLEANUP_KEEP
            await self._clear()

    @recurring_check(DAY)
    async def daily_clear(self):
        """Recurring entry point (``recurring_check(DAY)``) for clearing."""
        await self._clear()


def decide_if_standard_signatures_only(user, standard_only):
    """Root user or user with MyImunify can use advanced signatures

    :return: ``False`` when MyImunify is disabled; *standard_only* unchanged
        when *user* is ``None``, ``"root"`` or has MyImunify protection
        enabled; ``True`` for any other user.
    """
    if not MyImunifyConfig.ENABLED:
        return False
    if user is None or user == "root" or myimunify_protection_enabled(user):
        return standard_only
    return True


class ResultProcessorIm360(ResultProcessor):
    """Imunify360 specialization of ResultProcessor, which removes
    all cleaned and removed files from HackerTrap
    """

    SCOPE = Scope.IM360

    @expect(MessageType.MalwareCleanup)
    async def store_result(self, message):
        """Store the result as the base class does, then pass the paths of
        cleaned or removed files to ``HackerTrapHitsSaver.update_sa_hits``."""
        message = await super().store_result(message)
        to_remove = [
            Path(hit)
            for hit, state in message["result"].items()
            if (state.is_cleaned() or state.is_removed())
        ]
        await HackerTrapHitsSaver.update_sa_hits([], to_remove)


class CleanupDb(MessageSink):
    """Clean malware database hits one at a time.

    Hits are queued with ``CLEANUP_PENDING`` and the oldest one is started
    only when no database hit is currently under cleanup.
    """

    SCOPE = Scope.IM360

    def __init__(self):
        self._loop = None

    @staticmethod
    async def _start_cleaner(path, app_name):
        """Start a ``MalwareDatabaseCleaner`` under a fresh cleanup id."""
        cleanup_id = uuid.uuid4().hex
        await MalwareDatabaseCleaner(cleanup_id, path, app_name).start()

    async def _cleanup_next(self):
        """Start cleaning the oldest pending hit if nothing is in progress."""
        if (
            MalwareHit.db_hits_under_cleanup().exists()
            or (
                next_hit := MalwareHit.db_hits_pending_cleanup()
                .order_by(MalwareHit.timestamp.asc())
                .first()
            )
            is None
        ):
            return
        logger.info(
            "Cleaning hit: (%s::%s)", next_hit.orig_file, next_hit.app_name
        )
        # Mark the hit before starting so that a concurrent call sees a
        # cleanup in progress.
        MalwareHit.set_status([next_hit], MalwareHitStatus.CLEANUP_STARTED)
        await self._start_cleaner(next_hit.orig_file, next_hit.app_name)

    async def create_sink(self, loop):
        """Remember the loop and resume any pending cleanup."""
        self._loop = loop
        await self._cleanup_next()

    @expect(MessageType.MalwareCleanupTask)
    async def process_cleanup_task(self, message):
        """Queue the cleanable database hits of the task as pending."""
        hits = MalwareHit.refresh_hits(message["hits"])
        hits_to_clean = filter_cleanable(hits)
        db_hits = [
            hit
            for hit in hits_to_clean
            if hit.resource_type == MalwareScanResourceType.DB.value
        ]
        if not db_hits:
            return
        MalwareHit.set_status(db_hits, MalwareHitStatus.CLEANUP_PENDING)
        await self._cleanup_next()

    @expect(MessageType.MalwareCleanComplete)
    async def parse_cleanup_results(self, message):
        """Complete a detached cleanup and publish its outcome message.

        The detached working directory is removed in every case.
        """
        clean_id = message["scan_id"]
        detached_cleanup = MDSDetachedCleanup(clean_id)
        try:
            cleanup_outcome = await detached_cleanup.complete()
        except ScanAlreadyCompleteError:
            # This happens when AV is woken up by AiBolit. See DEF-11078.
            logger.warning(
                "Cannot complete cleanup %s, assuming it is already complete",
                clean_id,
            )
            return
        finally:
            shutil.rmtree(
                str(detached_cleanup.detached_dir), ignore_errors=True
            )
        await g.sink.process_message(cleanup_outcome)

    @expect(MessageType.MalwareDatabaseCleanup)
    async def update_cleaned_hits_status(
        self, message: MessageType.MalwareDatabaseCleanup
    ):
        """Mark succeeded hits ``CLEANUP_DONE``, failed ones ``FOUND`` and
        continue with the next pending hit."""
        cleaned_hits = MalwareHit.db_hits_under_cleanup_in(message.succeeded)
        failed_hits = MalwareHit.db_hits_under_cleanup_in(message.failed)
        MalwareHit.set_status(
            cleaned_hits, MalwareHitStatus.CLEANUP_DONE, time.time()
        )
        MalwareHit.set_status(failed_hits, MalwareHitStatus.FOUND)
        await self._cleanup_next()

    @expect(MessageType.MalwareDatabaseCleanupFailed)
    async def update_failed_hits_status(self, message):
        """
        Clear the queue when the cleanup fails, set hits' status back to
        infected
        """
        # We assume here that all CLEANUP_STARTED hits are part of the
        # same cleanup operation
        hits = MalwareHit.db_hits_under_cleanup()
        MalwareHit.set_status(hits, MalwareHitStatus.FOUND)
        await self._cleanup_next()

    @expect(MessageType.MalwareDatabaseCleanup)
    async def save_cleanup_events_in_history(
        self, message: MessageType.MalwareDatabaseCleanup
    ):
        """Record cleanup_done / cleanup_failed events for database hits.

        Cause and initiator are always passed as ``None`` here.
        """
        cause = None
        initiator = None
        cleaned_hits = MalwareHit.get_db_hits(message.succeeded)
        await MalwareAction.cleanup_done(
            cleaned_hits, cause=cause, initiator=initiator
        )
        failed_hits = MalwareHit.get_db_hits(message.failed)
        await MalwareAction.cleanup_failed(
            failed_hits, cause=cause, initiator=initiator
        )


class RestoreOriginalDb(MessageSink):
    """Restore cleaned malware database hits one at a time.

    Hits are queued with ``CLEANUP_RESTORE_PENDING`` and the oldest one is
    restored only when no database hit is currently under restore.
    """

    SCOPE = Scope.IM360

    def __init__(self):
        self.loop = None

    @staticmethod
    async def _restore_next():
        """Start restoring the oldest pending hit if nothing is in
        progress."""
        if (
            MalwareHit.db_hits_under_cleanup_restore().exists()
            or (
                hit_to_restore := MalwareHit.db_hits_pending_cleanup_restore()
                .order_by(MalwareHit.timestamp.asc())
                .first()
            )
            is None
        ):
            return
        logger.info(
            "Restoring from cleanup hit: (%s::%s)",
            hit_to_restore.orig_file,
            hit_to_restore.app_name,
        )
        # NOTE(review): unlike CleanupDb._cleanup_next(), the status is
        # updated only after restore() has been awaited — confirm this
        # ordering is intentional.
        await MalwareDatabaseRestore(
            path=hit_to_restore.orig_file, app_name=hit_to_restore.app_name
        ).restore()
        MalwareHit.set_status(
            [hit_to_restore], MalwareHitStatus.CLEANUP_RESTORE_STARTED
        )

    async def create_sink(self, loop):
        """Remember the loop and resume any pending restore."""
        self.loop = loop
        await self._restore_next()

    @staticmethod
    def _filter_under_restore(
        hits: Iterable[MalwareHit],
    ) -> Iterable[MalwareHit]:
        """Lazily yield only hits with status ``CLEANUP_RESTORE_STARTED``."""
        return (
            hit
            for hit in hits
            if hit.status == MalwareHitStatus.CLEANUP_RESTORE_STARTED
        )

    @expect(MessageType.MalwareDatabaseRestoreTask)
    async def queue_db_restore(self, message):
        """Mark the hits matching the task's path and app name as pending
        restore, then try to start the next restore."""
        MalwareHit.set_status(
            MalwareHit.db_hits()
            .where(MalwareHit.orig_file == message.path)
            .where(MalwareHit.app_name == message.app_name),
            MalwareHitStatus.CLEANUP_RESTORE_PENDING,
        )
        await self._restore_next()

    @expect(MessageType.MalwareRestoreComplete)
    async def parse_restore_results(self, message):
        """Complete a detached restore and publish its outcome message.

        The detached working directory is removed in every case.
        """
        restore_id = message["scan_id"]
        detached_restore = MDSDetachedRestore(restore_id)
        try:
            restore_message = await detached_restore.complete()
        except ScanAlreadyCompleteError:
            # This happens when AV is woken up by AiBolit. See DEF-11078.
            logger.warning(
                "Cannot complete restore %s, assuming it is already complete",
                restore_id,
            )
            return
        finally:
            shutil.rmtree(
                str(detached_restore.detached_dir), ignore_errors=True
            )
        await g.sink.process_message(restore_message)

    @expect(MessageType.MalwareDatabaseRestore)
    async def update_restored_hits_status(self, message):
        """Set successfully restored hits that were under restore back to
        ``FOUND`` and continue with the next pending restore."""
        restored_hits = MalwareHit.get_db_hits(message.succeeded)
        MalwareHit.set_status(
            self._filter_under_restore(restored_hits), MalwareHitStatus.FOUND
        )
        await self._restore_next()

    @expect(MessageType.MalwareDatabaseRestore)
    async def save_restore_events_in_history(self, message):
        """Record one history event per restored / failed-to-restore hit."""
        cause = message.get("cause")
        initiator = message.get("initiator")
        restored_hits = MalwareHit.get_db_hits(message.succeeded)
        for hit in restored_hits:
            # FIXME: change cleanup_restored_original to accept multiple
            # values
            await MalwareAction.cleanup_restored_original(
                path=hit.orig_file,
                app_name=hit.app_name,
                resource_type=MalwareScanResourceType.DB.value,
                file_owner=hit.owner,
                file_user=hit.user,
                initiator=initiator,
                cause=cause,
                db_host=hit.db_host,
                db_port=hit.db_port,
                db_name=hit.db_name,
            )
        failed_hits = MalwareHit.get_db_hits(message.failed)
        for hit in failed_hits:
            await MalwareAction.cleanup_failed_restore(
                path=hit.orig_file,
                app_name=hit.app_name,
                resource_type=MalwareScanResourceType.DB.value,
                file_owner=hit.owner,
                file_user=hit.user,
                initiator=initiator,
                cause=cause,
                db_host=hit.db_host,
                db_port=hit.db_port,
                db_name=hit.db_name,
            )

    @expect(MessageType.MalwareDatabaseRestoreFailed)
    async def update_failed_hits_status(self, message):
        """
        Clear the queue when the restore fails, set hits' status back to
        cleanup_done
        """
        # NOTE(review): _restore_next() queries
        # db_hits_under_cleanup_restore() while this handler uses
        # db_hits_under_restoration() — confirm both selectors exist and
        # are meant to differ.
        hits = MalwareHit.db_hits_under_restoration()
        MalwareHit.set_status(hits, MalwareHitStatus.CLEANUP_DONE)
        await self._restore_next()