""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import asyncio import os import time from datetime import datetime, timedelta from logging import getLogger from pathlib import Path from defence360agent.api import inactivity from defence360agent.contracts.messages import MessageType from defence360agent.internals.global_scope import g from defence360agent.mr_proper import BaseCleaner from defence360agent.subsys import persistent_state from defence360agent.utils import nice_iterator, split_for_chunk from imav.malwarelib.config import ( MalwareHitStatus, MalwareScanResourceType, MalwareScanType, ) from imav.malwarelib.model import MalwareHistory, MalwareHit, MalwareScan from imav.subsys import realtime_av logger = getLogger(__name__) class OutdatedScansCleaner(BaseCleaner): PERIOD = timedelta(days=1).total_seconds() CLEANUP_LIMIT_DELTA = timedelta(days=30) LOCK_FILE = ( persistent_state.PERSISTENT_STATE_DIR / ".malware-scans-cleaner.lock" ) @classmethod async def cleanup(cls) -> None: cleanup_time_limit = int( (datetime.now() - cls.CLEANUP_LIMIT_DELTA).timestamp() ) deleted = ( MalwareScan.delete() .where(MalwareScan.started < cleanup_time_limit) .execute() ) logger.info("Cleaned %s outdated scans", deleted) class OutdatedHistoryCleaner(BaseCleaner): PERIOD = timedelta(days=1).total_seconds() CLEANUP_LIMIT_DELTA = timedelta(days=30) LOCK_FILE = ( persistent_state.PERSISTENT_STATE_DIR / ".malware-history-cleaner.lock" ) @classmethod async def cleanup(cls) -> None: keep_time_threshold = int( (datetime.now() - cls.CLEANUP_LIMIT_DELTA).timestamp() ) deleted = ( MalwareHistory.delete() .where(MalwareHistory.ctime < keep_time_threshold) .execute() ) logger.info("Cleaned %s outdated malware history", deleted) class OutdatedHitsCleaner(BaseCleaner): PERIOD = int( os.getenv( "IMUNIFY360_OUTDATED_HITS_CHECK_INTERVAL", timedelta(days=1).total_seconds(), ) ) FILE_NAME = os.getenv( "MALWARE_HITS_CLEANER_LOCK_FILE", ".malware-hits-cleaner.lock" ) LOCK_FILE = persistent_state.PERSISTENT_STATE_DIR / FILE_NAME REALTIME_SCAN_THRESHOLD = timedelta(minutes=10).total_seconds() CHUNK_SIZE = 1000 @classmethod async def _cleanup(cls) -> None: """Rescan irrelevant malware hits""" to_rescan = [] not_exist_hits = [] hits = ( MalwareHit.select() .where( MalwareHit.status == MalwareHitStatus.FOUND, MalwareHit.resource_type == MalwareScanResourceType.FILE.value, ) .order_by(MalwareHit.timestamp.asc()) ) async for hit in nice_iterator(hits, chunk_size=cls.CHUNK_SIZE): orig_file_path = Path(hit.orig_file) try: file_ctime = orig_file_path.stat().st_ctime if hit.timestamp < file_ctime: # rescan the modified files after scanning, # they may not be infected anymore realtime_threshold = ( time.time() - cls.REALTIME_SCAN_THRESHOLD ) # don't scan file twice if ( not realtime_av.should_be_running() or file_ctime < realtime_threshold ): to_rescan.append(hit.orig_file) except FileNotFoundError: not_exist_hits.append(hit.id) except OSError as exc: logger.warning("Can't check file due to %s", exc) if to_rescan: for files in split_for_chunk(to_rescan, chunk_size=cls.CHUNK_SIZE): logger.info("Rescan %s outdated malware files", len(files)) await g.sink.process_message( MessageType.MalwareRescanFiles( files=files, type=MalwareScanType.RESCAN_OUTDATED ) ) # delete db entries for non-existent files for hits_to_delete in split_for_chunk( not_exist_hits, chunk_size=cls.CHUNK_SIZE ): deleted = ( MalwareHit.delete() .where(MalwareHit.id.in_(hits_to_delete)) .where(MalwareHit.status == MalwareHitStatus.FOUND) .execute() ) logger.info("Deleted %s not exist malware hits", deleted) # don't block the whole loop for too long, # return control to the loop every iteration await asyncio.sleep(0) @classmethod async def cleanup(cls): with inactivity.track.task("malware hits relevance check"): await cls._cleanup()