"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import re
import shutil
import time
import uuid
from contextlib import suppress
from functools import partial
from logging import getLogger
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from defence360agent import utils
from defence360agent.api import inactivity
from defence360agent.contracts.config import (
Malware as Config,
MyImunifyConfig,
)
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.permissions import myimunify_protection_enabled
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from defence360agent.internals.global_scope import g
from defence360agent.utils import (
Scope,
nice_iterator,
recurring_check,
split_for_chunk,
)
from defence360agent.utils.common import DAY, MINUTE, rate_limit
from imav.malwarelib.cleanup.cleaner import (
CleanupResult,
MalwareCleaner,
MalwareCleanupProxy,
)
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.malwarelib.config import (
MalwareHitStatus,
MalwareScanResourceType,
MalwareScanType,
)
from imav.malwarelib.model import MalwareHistory, MalwareHit
from imav.malwarelib.scan import ScanAlreadyCompleteError
from imav.malwarelib.scan.mds.cleaner import MalwareDatabaseCleaner
from imav.malwarelib.scan.mds.detached import (
MDSDetachedCleanup,
MDSDetachedRestore,
)
from imav.malwarelib.scan.mds.restore import MalwareDatabaseRestore
from imav.malwarelib.subsys.malware import HackerTrapHitsSaver, MalwareAction
from imav.malwarelib.utils import malware_response
from imav.malwarelib.utils.user_list import (
get_username_by_uid,
is_uid,
)
logger = getLogger(__name__)

# A file whose failed cleanup attempts within the last DAY reach this number
# is skipped by automatic cleanups.
COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY = 4

# Group MalwareHit objects by their ``status`` / ``owner`` attribute.
_group_by_status = partial(MalwareHit.group_by_attribute, attribute="status")
_group_by_user = partial(MalwareHit.group_by_attribute, attribute="owner")

# ``logger.error`` wrapped by ``rate_limit`` with a period of DAY; calls the
# limiter drops are presumably routed to ``logger.warning`` (``on_drop``).
throttled_log_error = rate_limit(on_drop=logger.warning, period=DAY)(
    logger.error
)
def filter_cleanable(hits: Iterable[MalwareHit]) -> Iterable:
return (hit for hit in hits if hit.status == MalwareHitStatus.FOUND)
class Cleanup(MessageSink, MessageSource):
    """Back up and clean the files reported by ``MalwareCleanupTask``.

    1. :meth:`process_cleanup_task` re-reads the hits, keeps FOUND file
       hits, applies retry limits unless the cleanup is manual, and
       schedules :meth:`_store_original`.
    2. :meth:`_store_original` marks the hits CLEANUP_STARTED, copies the
       originals into :class:`CleanupStorage` and queues the stored hits in
       the :class:`MalwareCleanupProxy`.
    3. :meth:`cleanup` (``recurring_check(1)``) drains the proxy and runs
       :class:`MalwareCleaner` once per hit owner. Every run is reported
       with a ``MalwareCleanup`` message.
    """

    # Hits whose ``type`` matches are handed to the cleaner as ``blacklist``
    # instead of as regular files (presumably hash/cloud-based detections;
    # confirm against the cleaner).
    _BLACKLIST_HIT_TYPE_RE = re.compile(r"\w+-BLKH-|cloudhash\.|cld-")

    def __init__(self):
        self._cleanup_task = None
        # Most recently scheduled _store_original() task (kept for
        # backward compatibility).
        self._store_original_task = None
        # Strong references to every in-flight _store_original() task. The
        # event loop only keeps weak references to tasks, so a task whose
        # sole reference got overwritten could be garbage-collected
        # before it finishes.
        self._store_original_tasks = set()
        self._running = False
        self._loop = None
        self._sink = None
        self._proxy = None
        self._cleaner = None

    async def create_source(self, loop, sink):
        self._loop = loop
        self._sink = sink
        self._proxy = MalwareCleanupProxy()
        self._cleaner = MalwareCleaner(loop=loop, sink=sink)
        self._cleanup_task = loop.create_task(self.cleanup())

    async def create_sink(self, loop):
        pass

    async def shutdown(self):
        # NOTE(review): pending _store_original() tasks are not cancelled.
        # Interrupting one half-way could leave its hits CLEANUP_STARTED.
        if self._cleanup_task:
            self._cleanup_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._cleanup_task

    @expect(MessageType.MalwareCleanupTask)
    async def process_cleanup_task(self, message: Dict):
        """Filter the hits of a cleanup task and schedule their backup.

        A task without ``cause`` is a manual cleanup, and no retry limits
        are applied to it.
        """
        cause = message.get("cause")
        initiator = message.get("initiator")
        post_action = message.get("post_action")
        scan_id = message.get("scan_id")
        standard_only = message.get("standard_only")
        manual_cleanup = cause is None
        # In case another scan already found some of the hits
        # and the cleanup for them has started.
        origin_hits_num = len(message["hits"])
        hits = MalwareHit.refresh_hits(
            message["hits"], include_scan_info=not manual_cleanup
        )
        hits = [
            hit
            for hit in filter_cleanable(hits)
            if hit.resource_type == MalwareScanResourceType.FILE.value
        ]
        if not manual_cleanup:
            # don't use any limits when run cleanup manually
            rescan_hits, hits = self._split_hits_by_scan_type(
                hits, [MalwareScanType.RESCAN, MalwareScanType.RESCAN_OUTDATED]
            )
            rescan_hits = await self._filter_rescan_hits(rescan_hits)
            hits = rescan_hits + await self._filter_failed_to_cleanup_hits(
                hits
            )
        if filtered := origin_hits_num - len(hits):
            logger.info(
                "%s/%s hits filtered before cleanup",
                filtered,
                origin_hits_num,
            )
        task = self._loop.create_task(
            self._store_original(
                hits, cause, initiator, post_action, scan_id, standard_only
            )
        )
        self._store_original_task = task
        self._store_original_tasks.add(task)
        task.add_done_callback(self._store_original_tasks.discard)

    @staticmethod
    def _split_hits_by_scan_type(
        hits: list, scan_types: List[MalwareScanType]
    ) -> Tuple[list, list]:
        """Split *hits* into (hits of *scan_types*, all other hits)."""
        target_hits, other_hits = [], []
        for hit in hits:
            if hit.scanid.type in scan_types:
                target_hits.append(hit)
            else:
                other_hits.append(hit)
        return target_hits, other_hits

    @staticmethod
    async def _filter_failed_to_cleanup(
        hits: list, *, time_range: float, allowed_attempts: int
    ) -> list:
        """Drop hits that failed to clean too often recently.

        A hit is dropped, with a throttled error log, when its file has at
        least *allowed_attempts* failed-cleanup history events within the
        last *time_range* seconds.
        """
        if not hits:
            return []
        since = time.time() - time_range
        failed_cleanup_count = {}
        # Query history in chunks and yield to the event loop in between.
        for hits_chunk in split_for_chunk(hits, chunk_size=200):
            failed_cleanup_count.update(
                dict(
                    MalwareHistory.get_failed_cleanup_events_count(
                        [hit.orig_file for hit in hits_chunk],
                        since=since,
                    )
                )
            )
            await asyncio.sleep(0)
        hits_to_clean = []
        for hit in hits:
            failures = failed_cleanup_count.get(hit.orig_file, 0)
            if failures >= allowed_attempts:
                throttled_log_error(
                    "Skip cleanup file '%s', since there are too many "
                    "attempts to cleanup it in %s sec [%s]",
                    hit.orig_file,
                    time_range,
                    failures,
                )
                continue
            hits_to_clean.append(hit)
        return hits_to_clean

    async def _filter_rescan_hits(self, hits: list) -> list:
        """Allow rescan hits at most 2 failed attempts per 5 minutes."""
        return await self._filter_failed_to_cleanup(
            hits, time_range=5 * MINUTE, allowed_attempts=2
        )

    async def _filter_failed_to_cleanup_hits(self, hits: list) -> list:
        """
        Don't try to cleanup the same hit more than
        *COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY*
        """
        return await self._filter_failed_to_cleanup(
            hits,
            time_range=DAY,
            allowed_attempts=COUNT_OF_ATTEMPTS_TO_CLEANUP_PER_DAY,
        )

    async def _store_original(
        self, hits, cause, initiator, post_action, scan_id, standard_only
    ):
        """Back up the originals of *hits* and queue them for cleanup.

        Hits whose original could not be stored are reported with
        ``CleanupFailed`` and put back to their previous status. Hits whose
        file no longer exists are deleted and recorded as failed cleanups.
        """
        # Capture the statuses *before* marking the hits CLEANUP_STARTED, so
        # the revert below does not depend on whether set_status() also
        # updates the in-memory objects.
        original_status = _group_by_status(hits)
        MalwareHit.set_status(hits, MalwareHitStatus.CLEANUP_STARTED)
        with inactivity.track.task("cleanup_storage"):
            succeeded, failed, not_exist = await CleanupStorage.store_all(hits)
        for hit in failed:
            await self._sink.process_message(
                MessageType.CleanupFailed(
                    message=(
                        "Failed to store the original from {} to {}".format(
                            hit.orig_file, CleanupStorage.path
                        )
                    ),
                    timestamp=int(time.time()),
                )
            )
        self._add_to_proxy(
            succeeded, cause, initiator, post_action, scan_id, standard_only
        )
        for status, hit_list in original_status.items():
            MalwareHit.set_status([h for h in failed if h in hit_list], status)
        MalwareHit.delete_instances(not_exist)
        await MalwareAction.cleanup_failed(
            not_exist, cause=cause, initiator=initiator
        )

    def _add_to_proxy(
        self, hits, cause, initiator, post_action, scan_id, standard_only
    ):
        """Queue *hits* in the proxy as standard-only or advanced hits.

        Both proxy entries are always added; one of them may be empty.
        """
        hits = list(hits)
        standard_only_hits, advanced_hits = [], []
        # The decision depends only on (initiator, standard_only), not on the
        # hit, so it is made once and all hits go to the same list. It is
        # skipped for an empty batch, as before.
        # NOTE(review): the initiator, not the file owner, decides here.
        # Confirm that this is intended.
        if hits:
            if decide_if_standard_signatures_only(initiator, standard_only):
                standard_only_hits = hits
            else:
                advanced_hits = hits
        self._proxy.add(
            cause,
            initiator,
            post_action,
            scan_id,
            True,
            standard_only_hits,
        )
        self._proxy.add(
            cause,
            initiator,
            post_action,
            scan_id,
            standard_only,  # the value requested by the task, passed through
            advanced_hits,
        )

    @staticmethod
    def _user_hits(hits):
        """Group *hits* by their ``owner``."""
        return _group_by_user(hits)

    def _cloud_assisted_hits(self):
        """Drain the proxy and yield one tuple per queued batch.

        Each tuple is ``(regular_hits, blacklist, cause, initiator,
        post_action, scan_id, standard_only)``. ``blacklist`` holds the hits
        whose type matches ``_BLACKLIST_HIT_TYPE_RE``.
        """
        for (
            cause,
            initiator,
            post_action,
            scan_id,
            standard_only,
            all_hits,
        ) in self._proxy.flush():
            regular_hits, blacklist = [], []
            # Single pass partition (was an O(n^2) `not in blacklist` scan).
            for hit in all_hits:
                if self._BLACKLIST_HIT_TYPE_RE.match(hit.type):
                    blacklist.append(hit)
                else:
                    regular_hits.append(hit)
            yield (
                regular_hits,
                blacklist,
                cause,
                initiator,
                post_action,
                scan_id,
                standard_only,
            )

    async def _start_hook(self, cleanup_id, started, hits):
        """Emit the ``MalwareCleanupStarted`` hook event for *hits*."""
        dump = [hit.as_dict() for hit in hits]
        cleanup_started = HookEvent.MalwareCleanupStarted(
            cleanup_id=cleanup_id,
            started=started,
            total_files=len(hits),
            DUMP=dump,
        )
        await self._sink.process_message(cleanup_started)

    async def _report_skipped_cleanup(
        self,
        hits,
        *,
        cleanup_id,
        started,
        error,
        cause,
        initiator,
        post_action,
        scan_id,
    ):
        """Emit ``MalwareCleanup`` for *hits* the cleaner was not run on.

        The message carries an empty result and *error*. Downstream
        processing therefore still receives these hits instead of them
        being dropped while marked CLEANUP_STARTED.
        """
        await self._sink.process_message(
            MessageType.MalwareCleanup(
                hits=hits,
                result={},
                cleanup_id=cleanup_id,
                started=started,
                error=error,
                cause=cause,
                initiator=initiator,
                post_action=post_action,
                scan_id=scan_id,
                args=[],
            )
        )

    async def _clean_files(
        self,
        hits,
        blacklist=None,
        cause=None,
        initiator=None,
        post_action=None,
        scan_id=None,
        standard_only=None,
    ):
        """Run the cleaner once per owner of *hits* / *blacklist*.

        Each run, or each skipped run, is reported with ``MalwareCleanup``.
        Owners given as a uid (non-panel users) need an unlimited license
        and a resolvable username.
        """
        user_hits = self._user_hits(hits)
        user_hits_black = self._user_hits(blacklist or [])
        for user in {*user_hits, *user_hits_black}:
            hits_regular = user_hits.get(user, [])
            hits_black = user_hits_black.get(user, [])
            user_hits_all = hits_regular + hits_black
            files = [hit.orig_file for hit in hits_regular]
            black = [hit.orig_file for hit in hits_black]
            logger.debug("Cleaning files: %s", files + black)
            cleanup_id = uuid.uuid4().hex
            started = time.time()
            report_skipped = partial(
                self._report_skipped_cleanup,
                user_hits_all,
                cleanup_id=cleanup_id,
                started=started,
                cause=cause,
                initiator=initiator,
                post_action=post_action,
                scan_id=scan_id,
            )
            if is_uid(user):  # non panel user
                uid = user
                if not LicenseCLN.is_unlimited():
                    logger.error(
                        f"Can't clean files for non panel user {uid=}, "
                        "since license is limited"
                    )
                    await report_skipped(
                        error="Cleanup failed. License restriction"
                    )
                    continue
                if not (username := await get_username_by_uid(uid)):
                    logger.error(
                        f"Can't find username for {uid=}. Skip cleanup"
                    )
                    # Report the skipped hits as well, otherwise nothing
                    # ever moves them out of CLEANUP_STARTED.
                    await report_skipped(
                        error="Cleanup failed. Unknown user"
                    )
                    continue
                user = username
            await self._start_hook(cleanup_id, started, user_hits_all)
            result, error, cmd = await self._cleaner.start(
                user,
                files,
                soft=Config.CLEANUP_TRIM,
                blacklist=black,
                standard_only=standard_only,
            )
            await self._sink.process_message(
                MessageType.MalwareCleanup(
                    hits=user_hits_all,
                    result=result,
                    cleanup_id=cleanup_id,
                    started=started,
                    error=error,
                    cause=cause,
                    initiator=initiator,
                    post_action=post_action,
                    scan_id=scan_id,
                    args=cmd,
                )
            )

    async def _cleanup(self):
        """Process everything queued in the proxy; never runs re-entrantly."""
        if self._running:
            return
        if not self._proxy.hits:
            self._proxy.reset()
            return
        self._running = True
        with inactivity.track.task("cleanup"):
            try:
                for (
                    all_hits,
                    blacklist,
                    cause,
                    initiator,
                    post_action,
                    scan_id,
                    standard_only,
                ) in self._cloud_assisted_hits():
                    await self._clean_files(
                        all_hits,
                        blacklist=blacklist,
                        cause=cause,
                        initiator=initiator,
                        post_action=post_action,
                        scan_id=scan_id,
                        standard_only=standard_only,
                    )
            finally:
                self._running = False

    @recurring_check(1)
    async def cleanup(self):
        await self._cleanup()
class ResultProcessor(MessageSink, MessageSource):
    """Record the outcome of every ``MalwareCleanup`` run.

    For each hit it writes the matching ``MalwareAction`` history event,
    updates the hit status, deletes hits whose file is gone and uploads the
    files that failed to clean (current file and stored original) to MRS.
    """

    SCOPE = Scope.AV

    async def create_sink(self, loop):
        pass

    async def create_source(self, loop, sink):
        self._sink = sink

    @staticmethod
    def _set_hit_status(hits: List[MalwareHit], status: str, cleaned_at=None):
        """Persist *status*/*cleaned_at* and mirror them on the objects."""
        MalwareHit.set_status(hits, status, cleaned_at)
        for each in hits:
            each.status = status
            each.cleaned_at = cleaned_at

    @expect(MessageType.MalwareCleanup)
    async def store_result(self, message):
        """Persist the result of one cleanup run and return *message*."""
        hits: List[MalwareHit] = message["hits"]
        result: CleanupResult = message["result"]
        who = dict(
            cause=message.get("cause"), initiator=message.get("initiator")
        )
        now = time.time()

        processed, unprocessed = [], []
        for hit in hits:
            (processed if hit in result else unprocessed).append(hit)

        def pick(check):
            # Processed hits whose per-hit cleanup state satisfies *check*.
            return [hit for hit in processed if check(result[hit])]

        missing = []
        async for hit in nice_iterator(processed, chunk_size=100):
            # procu2.php reports a user file it cannot access (e.g. inside a
            # root dir, 'Permission denied') as non-existent. That confuses
            # users, so a file that still exists counts as "unable to clean".
            if not result[hit].not_exist():  # pragma: no cover
                continue
            (unprocessed if hit.orig_file_path.exists() else missing).append(
                hit
            )
        await MalwareAction.cleanup_unable(unprocessed, **who)

        needs_protection = pick(
            lambda state: state.requires_myimunify_protection()
        )
        await MalwareAction.cleanup_requires_myimunify_protection(
            needs_protection, **who
        )
        self._set_hit_status(
            needs_protection,
            MalwareHitStatus.CLEANUP_REQUIRES_MYIMUNIFY_PROTECTION,
            now,
        )

        failed = pick(lambda state: state.is_failed())
        await MalwareAction.cleanup_failed(failed, **who)

        cleaned = pick(lambda state: state.is_cleaned())
        await MalwareAction.cleanup_done(cleaned, **who)
        self._set_hit_status(cleaned, MalwareHitStatus.CLEANUP_DONE, now)

        removed = pick(lambda state: state.is_removed())
        await MalwareAction.cleanup_removed(removed, **who)
        self._set_hit_status(removed, MalwareHitStatus.CLEANUP_REMOVED, now)

        MalwareHit.delete_instances(missing)
        # Unprocessed and failed hits get the status held on the objects
        # written back.
        for status, group in _group_by_status(unprocessed, failed).items():
            self._set_hit_status(group, status)
        await self.send_failed_to_cleanup_hits_to_mrs(failed)
        return message

    async def send_failed_to_cleanup_hits_to_mrs(self, failed_to_cleanup_hits):
        """Upload the current file and the stored original of each hit."""
        if not failed_to_cleanup_hits:
            return
        uploads = (
            ("cleanup_failure_current", lambda hit: hit.orig_file),
            (
                "cleanup_failure_original",
                lambda hit: str(CleanupStorage.get_hit_store_path(hit)),
            ),
        )
        for reason, path_of in uploads:
            await self._sink.process_message(
                MessageType.MalwareMRSUpload(
                    hits=[
                        malware_response.HitInfo(path_of(hit), hit.hash)
                        for hit in failed_to_cleanup_hits
                    ],
                    upload_reason=reason,
                )
            )
class StorageController(MessageSink):
    """Remove old backed up files from storage"""

    def __init__(self):
        self._clear_task = None
        # Retention period as a number of DAY periods (see _clear()).
        self._keep = Config.CLEANUP_KEEP

    async def create_sink(self, loop):
        self._clear_task = loop.create_task(self.daily_clear())

    async def shutdown(self):
        task = self._clear_task
        if not task:
            return
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task

    async def _clear(self):
        """Delete old cleaned hits and expired originals from storage."""
        now = time.time()
        hits_cutoff = now - self._keep * DAY
        # Originals outlive their hits by one extra day.
        originals_cutoff = now - (self._keep + 1) * DAY
        MalwareHit.delete().where(MalwareHit.cleaned_at < hits_cutoff).execute()
        removed = await CleanupStorage.clear(originals_cutoff)
        if removed:
            logger.info(
                "Cleanup storage have cleaned. Files removed: %s", removed
            )

    @expect(MessageType.ConfigUpdate)
    @utils.log_error_and_ignore()
    async def config_updated(self, _):
        """Re-run the clear-up right away when the retention changes."""
        if self._keep == Config.CLEANUP_KEEP:
            return
        self._keep = Config.CLEANUP_KEEP
        await self._clear()

    @recurring_check(DAY)
    async def daily_clear(self):
        await self._clear()
def decide_if_standard_signatures_only(user, standard_only):
    """Decide whether *user* is limited to standard signatures.

    Root user or user with MyImunify can use advanced signatures:

    * MyImunify disabled -> ``False``;
    * ``None``, ``"root"`` or a MyImunify-protected user -> *standard_only*
      unchanged;
    * anyone else -> ``True``.
    """
    if MyImunifyConfig.ENABLED:
        privileged = (
            user is None
            or user == "root"
            or myimunify_protection_enabled(user)
        )
        return standard_only if privileged else True
    return False
class ResultProcessorIm360(ResultProcessor):
    """Imunify360 specialization of ResultProcessor, which removes all
    cleaned and removed files from HackerTrap
    """

    SCOPE = Scope.IM360

    @expect(MessageType.MalwareCleanup)
    async def store_result(self, message):
        """Store the result like the base class, then update HackerTrap.

        Every file whose state is cleaned or removed is passed as the second
        argument of ``HackerTrapHitsSaver.update_sa_hits`` (presumably the
        "remove" list; confirm). Returns *message*, matching
        :meth:`ResultProcessor.store_result`.
        """
        message = await super().store_result(message)
        # Result keys are converted with Path(), so they are assumed to be
        # path-like.
        to_remove = [
            Path(hit)
            for hit, state in message["result"].items()
            if state.is_cleaned() or state.is_removed()
        ]
        await HackerTrapHitsSaver.update_sa_hits([], to_remove)
        return message
class CleanupDb(MessageSink):
    """Clean database (MDS) hits one at a time.

    Hits are queued as CLEANUP_PENDING. :meth:`_cleanup_next` starts the
    oldest pending hit only while no DB hit is CLEANUP_STARTED.
    """

    SCOPE = Scope.IM360

    def __init__(self):
        self._loop = None

    @staticmethod
    async def _start_cleaner(path, app_name):
        cleanup_id = uuid.uuid4().hex
        await MalwareDatabaseCleaner(cleanup_id, path, app_name).start()

    async def _cleanup_next(self):
        """Start cleaning the oldest pending DB hit, if none is running."""
        if MalwareHit.db_hits_under_cleanup().exists():
            return
        next_hit = (
            MalwareHit.db_hits_pending_cleanup()
            .order_by(MalwareHit.timestamp.asc())
            .first()
        )
        if next_hit is None:
            return
        logger.info(
            "Cleaning hit: (%s::%s)", next_hit.orig_file, next_hit.app_name
        )
        MalwareHit.set_status([next_hit], MalwareHitStatus.CLEANUP_STARTED)
        try:
            await self._start_cleaner(next_hit.orig_file, next_hit.app_name)
        except Exception:
            # A hit left CLEANUP_STARTED would block the whole queue (see the
            # exists() check above). Put it back to FOUND, as
            # update_failed_hits_status() does, and let the error propagate.
            MalwareHit.set_status([next_hit], MalwareHitStatus.FOUND)
            raise

    async def create_sink(self, loop):
        self._loop = loop
        await self._cleanup_next()

    @expect(MessageType.MalwareCleanupTask)
    async def process_cleanup_task(self, message):
        """Queue the FOUND DB hits of the task and kick the queue."""
        hits = MalwareHit.refresh_hits(message["hits"])
        db_hits = [
            hit
            for hit in filter_cleanable(hits)
            if hit.resource_type == MalwareScanResourceType.DB.value
        ]
        if not db_hits:
            return
        MalwareHit.set_status(db_hits, MalwareHitStatus.CLEANUP_PENDING)
        await self._cleanup_next()

    @expect(MessageType.MalwareCleanComplete)
    async def parse_cleanup_results(self, message):
        """Collect a finished detached cleanup and forward its outcome."""
        clean_id = message["scan_id"]
        detached_cleanup = MDSDetachedCleanup(clean_id)
        try:
            cleanup_outcome = await detached_cleanup.complete()
        except ScanAlreadyCompleteError:
            # This happens when AV is woken up by AiBolit. See DEF-11078.
            logger.warning(
                "Cannot complete cleanup %s, assuming it is already complete",
                clean_id,
            )
            return
        finally:
            shutil.rmtree(
                str(detached_cleanup.detached_dir), ignore_errors=True
            )
        await g.sink.process_message(cleanup_outcome)

    @expect(MessageType.MalwareDatabaseCleanup)
    async def update_cleaned_hits_status(
        self, message: MessageType.MalwareDatabaseCleanup
    ):
        """Mark succeeded hits CLEANUP_DONE and failed ones FOUND."""
        cleaned_hits = MalwareHit.db_hits_under_cleanup_in(message.succeeded)
        failed_hits = MalwareHit.db_hits_under_cleanup_in(message.failed)
        MalwareHit.set_status(
            cleaned_hits, MalwareHitStatus.CLEANUP_DONE, time.time()
        )
        MalwareHit.set_status(failed_hits, MalwareHitStatus.FOUND)
        await self._cleanup_next()

    @expect(MessageType.MalwareDatabaseCleanupFailed)
    async def update_failed_hits_status(self, message):
        """
        Clear the queue when the cleanup fails,
        set hits' status back to infected
        """
        # We assume here that all CLEANUP_STARTED hits are part of the
        # same cleanup operation
        hits = MalwareHit.db_hits_under_cleanup()
        MalwareHit.set_status(hits, MalwareHitStatus.FOUND)
        await self._cleanup_next()

    @expect(MessageType.MalwareDatabaseCleanup)
    async def save_cleanup_events_in_history(
        self, message: MessageType.MalwareDatabaseCleanup
    ):
        """Record done/failed history events (no cause or initiator)."""
        cause = None
        initiator = None
        cleaned_hits = MalwareHit.get_db_hits(message.succeeded)
        await MalwareAction.cleanup_done(
            cleaned_hits, cause=cause, initiator=initiator
        )
        failed_hits = MalwareHit.get_db_hits(message.failed)
        await MalwareAction.cleanup_failed(
            failed_hits, cause=cause, initiator=initiator
        )
class RestoreOriginalDb(MessageSink):
    """Restore cleaned database (MDS) hits from their originals, one at a
    time.

    Hits are queued as CLEANUP_RESTORE_PENDING. :meth:`_restore_next` starts
    the oldest pending hit only while none is under restore.
    """

    SCOPE = Scope.IM360

    def __init__(self):
        self.loop = None

    @staticmethod
    async def _restore_next():
        """Start restoring the oldest pending DB hit, if none is running."""
        if MalwareHit.db_hits_under_cleanup_restore().exists():
            return
        hit_to_restore = (
            MalwareHit.db_hits_pending_cleanup_restore()
            .order_by(MalwareHit.timestamp.asc())
            .first()
        )
        if hit_to_restore is None:
            return
        logger.info(
            "Restoring from cleanup hit: (%s::%s)",
            hit_to_restore.orig_file,
            hit_to_restore.app_name,
        )
        # Mark the hit as started *before* awaiting the restore. While it is
        # awaited, other handlers may call _restore_next(), and a hit that is
        # still pending would pass the guard above and be restored twice.
        MalwareHit.set_status(
            [hit_to_restore], MalwareHitStatus.CLEANUP_RESTORE_STARTED
        )
        try:
            await MalwareDatabaseRestore(
                path=hit_to_restore.orig_file, app_name=hit_to_restore.app_name
            ).restore()
        except Exception:
            # Do not leave the queue blocked. Revert to CLEANUP_DONE, as
            # update_failed_hits_status() does, and let the error propagate.
            MalwareHit.set_status(
                [hit_to_restore], MalwareHitStatus.CLEANUP_DONE
            )
            raise

    async def create_sink(self, loop):
        self.loop = loop
        await self._restore_next()

    @staticmethod
    def _filter_under_restore(
        hits: Iterable[MalwareHit],
    ) -> Iterable[MalwareHit]:
        """Lazily keep hits whose status is CLEANUP_RESTORE_STARTED."""
        return (
            hit
            for hit in hits
            if hit.status == MalwareHitStatus.CLEANUP_RESTORE_STARTED
        )

    @expect(MessageType.MalwareDatabaseRestoreTask)
    async def queue_db_restore(self, message):
        """Queue the DB hits matching the message's path and app_name."""
        MalwareHit.set_status(
            MalwareHit.db_hits()
            .where(MalwareHit.orig_file == message.path)
            .where(MalwareHit.app_name == message.app_name),
            MalwareHitStatus.CLEANUP_RESTORE_PENDING,
        )
        await self._restore_next()

    @expect(MessageType.MalwareRestoreComplete)
    async def parse_restore_results(self, message):
        """Collect a finished detached restore and forward its outcome."""
        restore_id = message["scan_id"]
        detached_restore = MDSDetachedRestore(restore_id)
        try:
            restore_message = await detached_restore.complete()
        except ScanAlreadyCompleteError:
            # This happens when AV is woken up by AiBolit. See DEF-11078.
            logger.warning(
                "Cannot complete restore %s, assuming it is already complete",
                restore_id,
            )
            return
        finally:
            shutil.rmtree(
                str(detached_restore.detached_dir), ignore_errors=True
            )
        await g.sink.process_message(restore_message)

    @expect(MessageType.MalwareDatabaseRestore)
    async def update_restored_hits_status(self, message):
        """Set restored hits that were under restore back to FOUND."""
        restored_hits = MalwareHit.get_db_hits(message.succeeded)
        MalwareHit.set_status(
            self._filter_under_restore(restored_hits), MalwareHitStatus.FOUND
        )
        await self._restore_next()

    @expect(MessageType.MalwareDatabaseRestore)
    async def save_restore_events_in_history(self, message):
        """Record one history event per restored / failed-to-restore hit."""
        cause = message.get("cause")
        initiator = message.get("initiator")

        def event_kwargs(hit):
            # Keyword arguments shared by both history calls below.
            return dict(
                path=hit.orig_file,
                app_name=hit.app_name,
                resource_type=MalwareScanResourceType.DB.value,
                file_owner=hit.owner,
                file_user=hit.user,
                initiator=initiator,
                cause=cause,
                db_host=hit.db_host,
                db_port=hit.db_port,
                db_name=hit.db_name,
            )

        restored_hits = MalwareHit.get_db_hits(message.succeeded)
        for hit in restored_hits:
            # FIXME: change cleanup_restored_original to accept multiple
            # values
            await MalwareAction.cleanup_restored_original(**event_kwargs(hit))
        failed_hits = MalwareHit.get_db_hits(message.failed)
        for hit in failed_hits:
            await MalwareAction.cleanup_failed_restore(**event_kwargs(hit))

    @expect(MessageType.MalwareDatabaseRestoreFailed)
    async def update_failed_hits_status(self, message):
        """
        Clear the queue when the restore fails,
        set hits' status back to cleanup_done
        """
        hits = MalwareHit.db_hits_under_restoration()
        MalwareHit.set_status(hits, MalwareHitStatus.CLEANUP_DONE)
        await self._restore_next()