"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import os
import time
from datetime import datetime, timedelta
from logging import getLogger
from pathlib import Path
from defence360agent.api import inactivity
from defence360agent.contracts.messages import MessageType
from defence360agent.internals.global_scope import g
from defence360agent.mr_proper import BaseCleaner
from defence360agent.subsys import persistent_state
from defence360agent.utils import nice_iterator, split_for_chunk
from imav.malwarelib.config import (
MalwareHitStatus,
MalwareScanResourceType,
MalwareScanType,
)
from imav.malwarelib.model import MalwareHistory, MalwareHit, MalwareScan
from imav.subsys import realtime_av
logger = getLogger(__name__)
class OutdatedScansCleaner(BaseCleaner):
    """Cleaner that deletes ``MalwareScan`` rows older than 30 days."""

    # One day expressed in seconds; presumably the interval at which
    # BaseCleaner schedules cleanup() -- confirm against BaseCleaner.
    PERIOD = timedelta(days=1).total_seconds()
    # Scans that started earlier than "now - this delta" are removed.
    CLEANUP_LIMIT_DELTA = timedelta(days=30)
    # Lives in the persistent state directory; how the file is used is
    # defined by BaseCleaner, which is not visible here.
    LOCK_FILE = (
        persistent_state.PERSISTENT_STATE_DIR
        / ".malware-scans-cleaner.lock"
    )

    @classmethod
    async def cleanup(cls) -> None:
        """Delete every scan whose ``started`` precedes the cutoff.

        The cutoff is the current local time minus
        ``CLEANUP_LIMIT_DELTA``, truncated to whole seconds. The number
        returned by the delete query is logged at INFO level.
        """
        oldest_kept = datetime.now() - cls.CLEANUP_LIMIT_DELTA
        cutoff = int(oldest_kept.timestamp())
        query = MalwareScan.delete().where(MalwareScan.started < cutoff)
        removed = query.execute()
        logger.info("Cleaned %s outdated scans", removed)
class OutdatedHistoryCleaner(BaseCleaner):
    """Cleaner that deletes ``MalwareHistory`` rows older than 30 days."""

    # One day expressed in seconds; presumably the interval at which
    # BaseCleaner schedules cleanup() -- confirm against BaseCleaner.
    PERIOD = timedelta(days=1).total_seconds()
    # History entries created before "now - this delta" are removed.
    CLEANUP_LIMIT_DELTA = timedelta(days=30)
    # Lives in the persistent state directory; how the file is used is
    # defined by BaseCleaner, which is not visible here.
    LOCK_FILE = (
        persistent_state.PERSISTENT_STATE_DIR
        / ".malware-history-cleaner.lock"
    )

    @classmethod
    async def cleanup(cls) -> None:
        """Delete every history row whose ``ctime`` precedes the cutoff.

        The cutoff is the current local time minus
        ``CLEANUP_LIMIT_DELTA``, truncated to whole seconds. The number
        returned by the delete query is logged at INFO level.
        """
        oldest_kept = datetime.now() - cls.CLEANUP_LIMIT_DELTA
        cutoff = int(oldest_kept.timestamp())
        query = MalwareHistory.delete().where(MalwareHistory.ctime < cutoff)
        removed = query.execute()
        logger.info("Cleaned %s outdated malware history", removed)
class OutdatedHitsCleaner(BaseCleaner):
    """Re-validate stored ``FOUND`` file hits against the filesystem.

    Hits whose file changed after the hit was recorded are queued for a
    rescan; hits whose file no longer exists are removed from the
    database.
    """

    # Taken from IMUNIFY360_OUTDATED_HITS_CHECK_INTERVAL when that
    # variable is set, otherwise one day in seconds; coerced to int.
    PERIOD = int(
        os.getenv(
            "IMUNIFY360_OUTDATED_HITS_CHECK_INTERVAL",
            timedelta(days=1).total_seconds(),
        )
    )
    # Lock file name, overridable via MALWARE_HITS_CLEANER_LOCK_FILE.
    FILE_NAME = os.getenv(
        "MALWARE_HITS_CLEANER_LOCK_FILE", ".malware-hits-cleaner.lock"
    )
    LOCK_FILE = persistent_state.PERSISTENT_STATE_DIR / FILE_NAME
    # Ten minutes, in seconds: files changed more recently than this are
    # skipped while realtime_av.should_be_running() is true.
    REALTIME_SCAN_THRESHOLD = timedelta(minutes=10).total_seconds()
    # Batch size used for iterating hits, sending rescans and deleting.
    CHUNK_SIZE = 1000

    @classmethod
    async def _cleanup(cls) -> None:
        """Queue rescans for changed files and drop hits for missing ones.

        Iterates over hits with status ``FOUND`` and resource type
        ``FILE``, oldest first, and stats each hit's ``orig_file``:

        * the file is gone -> the hit id is collected for deletion;
        * any other ``OSError`` -> a warning is logged, the hit is kept;
        * the file's ctime is newer than the hit timestamp -> the path
          is collected for a rescan, unless realtime AV should be
          running and the change is newer than
          ``REALTIME_SCAN_THRESHOLD``.

        Collected paths are sent as ``MalwareRescanFiles`` messages and
        collected ids are deleted, both in ``CHUNK_SIZE`` batches.
        """
        stale_paths = []
        missing_ids = []

        found_file_hits = MalwareHit.select().where(
            MalwareHit.status == MalwareHitStatus.FOUND,
            MalwareHit.resource_type == MalwareScanResourceType.FILE.value,
        )
        hits = found_file_hits.order_by(MalwareHit.timestamp.asc())

        async for hit in nice_iterator(hits, chunk_size=cls.CHUNK_SIZE):
            target = Path(hit.orig_file)
            try:
                changed_at = target.stat().st_ctime
                if hit.timestamp < changed_at:
                    # The file changed after it was flagged, so it may
                    # no longer be infected and is worth rescanning.
                    recent_cutoff = time.time() - cls.REALTIME_SCAN_THRESHOLD
                    # Skip very recent changes while realtime AV should
                    # be running, to avoid scanning the same file twice.
                    if not (
                        realtime_av.should_be_running()
                        and changed_at >= recent_cutoff
                    ):
                        stale_paths.append(hit.orig_file)
            except FileNotFoundError:
                missing_ids.append(hit.id)
            except OSError as exc:
                logger.warning("Can't check file due to %s", exc)

        if stale_paths:
            for batch in split_for_chunk(
                stale_paths, chunk_size=cls.CHUNK_SIZE
            ):
                logger.info("Rescan %s outdated malware files", len(batch))
                rescan_request = MessageType.MalwareRescanFiles(
                    files=batch, type=MalwareScanType.RESCAN_OUTDATED
                )
                await g.sink.process_message(rescan_request)

        # Remove database rows whose file has disappeared. The status
        # filter is applied again in the delete query itself, so only
        # rows that are still FOUND at that point are removed.
        for id_batch in split_for_chunk(
            missing_ids, chunk_size=cls.CHUNK_SIZE
        ):
            delete_query = MalwareHit.delete().where(
                MalwareHit.id.in_(id_batch)
            )
            delete_query = delete_query.where(
                MalwareHit.status == MalwareHitStatus.FOUND
            )
            removed = delete_query.execute()
            logger.info("Deleted %s not exist malware hits", removed)
            # Hand control back to the event loop after every batch so
            # a long deletion run does not block it.
            await asyncio.sleep(0)

    @classmethod
    async def cleanup(cls):
        """Run ``_cleanup`` inside the ``inactivity.track.task`` context
        labelled "malware hits relevance check"."""
        with inactivity.track.task("malware hits relevance check"):
            await cls._cleanup()