"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import binascii
import functools
import os
import pwd
import shutil
import time
from collections import defaultdict
from logging import getLogger
from pathlib import Path
from typing import (
Callable,
Collection,
Dict,
Iterable,
List,
TYPE_CHECKING,
Tuple,
TypeVar,
Union,
cast,
)
from peewee import IntegrityError
from defence360agent.contracts.config import (
Core,
HackerTrap,
MyImunifyConfig,
UserType,
choose_use_backups_start_from_date,
choose_value_from_config,
should_try_autorestore_malicious,
)
from defence360agent.contracts.permissions import (
MS_CONFIG_DEFAULT_ACTION_EDIT,
has_permission,
myimunify_protection_enabled,
)
from defence360agent.internals.global_scope import g
from defence360agent.model.simplification import run_in_executor
from defence360agent.subsys import web_server
from defence360agent.subsys.panels import hosting_panel
from defence360agent.subsys.panels.base import (
ModsecVendorsError,
PanelException,
)
from defence360agent.utils import (
COPY_TO_MODSEC_MAXTRIES,
LazyLock,
atomic_rewrite,
base64_decode_filename,
base64_encode_filename,
log_failed_to_copy_to_modsec,
retry_on,
safe_sequence,
)
from imav.contracts.messages import (
MalwareCleanupRevert,
MalwareCleanupTask,
)
from imav.malwarelib.config import (
ADDED_TO_IGNORE,
CLEANUP,
CLEANUP_DONE,
CLEANUP_ON_SCHEDULE,
CLEANUP_REMOVED,
DELETED_FROM_IGNORE,
FAILED_TO_CLEANUP,
FAILED_TO_DELETE_FROM_IGNORE,
FAILED_TO_IGNORE,
FAILED_TO_RESTORE_FROM_BACKUP,
FAILED_TO_RESTORE_ORIGINAL,
FAILED_TO_STORE_ORIGINAL,
FOUND,
MalwareEvent,
MalwareEventPostponed,
MalwareHitStatus,
MalwareScanResourceType,
MalwareScanType,
NOTIFY,
REQUIRES_MYIMUNIFY_PROTECTION,
RESTORED_FROM_BACKUP,
RESTORED_ORIGINAL,
SUBMITTED_FOR_ANALYSIS,
UNABLE_TO_CLEANUP,
)
from imav.malwarelib.model import (
MalwareHistory,
MalwareHit,
MalwareHitAlternate,
MalwareIgnorePath,
MalwareScan,
)
from imav.malwarelib.scan.mds.report import MalwareDatabaseHitInfo
from imav.malwarelib.subsys.restore_from_backup import restore_files
from imav.malwarelib.utils import hash_path
from imav.malwarelib.utils.submit import submit_in_background
from imav.plugins.event_hook_executor import detected_hook
if TYPE_CHECKING:
from imav.malwarelib.cleanup.storage import RestoreReport
# Module-level logger, named after this module.
logger = getLogger(__name__)
#: Anything accepted as a filesystem path by the helpers below:
#: text, bytes or an ``os.PathLike`` object.
PathLike = Union[str, bytes, os.PathLike]
#: Unconstrained type variable used by generic helpers
#: (e.g. ``HackerTrapHitsSaver._extend``).
T = TypeVar("T")
#: Type variable for the generic ``apply_default_action``: constrained to
#: either ``MalwareHitAlternate`` or ``MalwareDatabaseHitInfo``.
HitInfoType = TypeVar(
"HitInfoType", MalwareHitAlternate, MalwareDatabaseHitInfo
)
def update_malware_history(coro):
    """Decorate a malware action handler so its outcome is logged into DB.

    The decorated handler (a coroutine function or an ordinary function) is
    called with the keyword arguments it is meant to receive; afterwards the
    ``title`` of the returned event is stored, together with the call
    arguments, through ``MalwareHistory.save_event``.

    For coroutine handlers the DB write is delegated to ``run_in_executor``;
    for ordinary functions it is performed inline.
    """
    # Keyword arguments handed over to the handler, in call order.
    # ``cause`` and the ``table_*`` values are only stored in the history.
    forwarded_names = (
        "path",
        "file_owner",
        "file_user",
        "initiator",
        "app_name",
        "resource_type",
        "db_host",
        "db_port",
        "db_name",
        "scan_id",
    )

    def as_fields(
        path,
        file_owner,
        file_user,
        initiator,
        cause,
        resource_type,
        app_name,
        db_host,
        db_port,
        db_name,
        table_name,
        table_field,
        table_row_inf,
        scan_id,
    ):
        """Collect the named call arguments in the order they are saved."""
        return {
            "path": path,
            "app_name": app_name,
            "resource_type": resource_type,
            "file_owner": file_owner,
            "file_user": file_user,
            "initiator": initiator,
            "cause": cause,
            "db_host": db_host,
            "db_port": db_port,
            "db_name": db_name,
            "table_name": table_name,
            "table_field": table_field,
            "table_row_inf": table_row_inf,
            "scan_id": scan_id,
        }

    def handler_kwargs(fields, extra):
        """Build the keyword arguments for the decorated handler."""
        call_kw = {name: fields[name] for name in forwarded_names}
        # the handler always gets an initiator, ROOT being the fallback
        call_kw["initiator"] = fields["initiator"] or UserType.ROOT
        call_kw.update(extra)
        return call_kw

    def store(result, fields):
        """Write the history record for *result*."""
        MalwareHistory.save_event(event=result.title, **fields)

    @functools.wraps(coro)
    async def async_wrapper(
        cls,
        path,
        file_owner,
        file_user,
        initiator=None,
        cause=None,
        resource_type=None,
        app_name=None,
        db_host=None,
        db_port=None,
        db_name=None,
        table_name=None,
        table_field=None,
        table_row_inf=None,
        scan_id=None,
        **kwargs,
    ):
        fields = as_fields(
            path,
            file_owner,
            file_user,
            initiator,
            cause,
            resource_type,
            app_name,
            db_host,
            db_port,
            db_name,
            table_name,
            table_field,
            table_row_inf,
            scan_id,
        )
        result = await coro(cls, **handler_kwargs(fields, kwargs))
        await run_in_executor(
            asyncio.get_event_loop(),
            lambda: store(result, fields),
        )
        return result

    @functools.wraps(coro)
    def wrapper(
        cls,
        path,
        file_owner,
        file_user,
        initiator=None,
        cause=None,
        resource_type=None,
        app_name=None,
        db_host=None,
        db_port=None,
        db_name=None,
        table_name=None,
        table_field=None,
        table_row_inf=None,
        scan_id=None,
        **kwargs,
    ):
        fields = as_fields(
            path,
            file_owner,
            file_user,
            initiator,
            cause,
            resource_type,
            app_name,
            db_host,
            db_port,
            db_name,
            table_name,
            table_field,
            table_row_inf,
            scan_id,
        )
        # coro is an ordinary function here
        result = coro(cls, **handler_kwargs(fields, kwargs))
        store(result, fields)
        return result

    if asyncio.iscoroutinefunction(coro):
        return async_wrapper
    return wrapper
def multiple_update_malware_history(coro):
    """
    Decorator responsible for logging multiple malware events into DB at once.

    The decorated coroutine function is awaited once per hit (concurrently,
    via ``asyncio.gather``) with ``path``, ``file_owner`` and ``file_user``
    keyword arguments taken from the hit. The resulting events are then
    stored with a single ``MalwareHistory.save_events`` call.

    The produced wrapper accepts an iterable of `MalwareHit`s plus optional
    ``initiator`` and ``cause``; when those are falsy, ``UserType.ROOT`` and
    ``MalwareScanType.MANUAL`` are stored instead.

    :return: list of handler results, in the order of *hits*
    """

    @functools.wraps(coro)
    async def wrapper(
        cls, hits: Iterable["MalwareHit"], initiator=None, cause=None
    ):
        # `hits` is walked twice (to run the handlers and to pair every hit
        # with its result), so a one-shot iterator must be materialized
        # first, otherwise no history rows would be produced for it.
        hits = list(hits)
        results = await asyncio.gather(
            *(
                coro(
                    cls,
                    path=hit.orig_file,
                    file_owner=hit.owner,
                    file_user=hit.user,
                )
                for hit in hits
            )
        )
        if not results:
            # nothing was processed -- nothing to log
            return results
        MalwareHistory.save_events(
            [
                {
                    "event": result.title,
                    "path": hit.orig_file,
                    "resource_type": hit.resource_type,
                    "app_name": hit.app_name,
                    "file_owner": hit.owner,
                    "file_user": hit.user,
                    "cause": cause or MalwareScanType.MANUAL,
                    "initiator": initiator or UserType.ROOT,
                    "db_host": hit.db_host,
                    "db_port": hit.db_port,
                    "db_name": hit.db_name,
                    "scan_id": hit.scanid,
                }
                for hit, result in zip(hits, results)
            ]
        )
        return results

    return wrapper
def bulk_update_malware_history(coro):
    """
    Decorator responsible for logging multiple malware events into DB at once.

    Unlike `multiple_update_malware_history`, the decorated coroutine
    function is awaited a single time with the whole iterable of
    `MalwareHit`s (plus any extra keyword arguments) and is expected to
    return a mapping of hit -> event. Every pair of that mapping is stored
    with one ``MalwareHistory.save_events`` call.

    ``cause`` and ``initiator`` are consumed by the wrapper (they are not
    forwarded to the decorated function); when falsy,
    ``MalwareScanType.MANUAL`` and ``UserType.ROOT`` are stored instead.

    :return: whatever the decorated function returned
    """

    @functools.wraps(coro)
    async def wrapper(
        cls, hits: Iterable["MalwareHit"], cause=None, initiator=None, **kwargs
    ):
        hit_results = await coro(cls, hits, **kwargs)
        if not hit_results:
            # nothing was processed -- nothing to log
            return hit_results
        MalwareHistory.save_events(
            [
                {
                    "event": result.title,
                    "path": hit.orig_file,
                    "file_owner": hit.owner,
                    "file_user": hit.user,
                    "cause": cause or MalwareScanType.MANUAL,
                    "initiator": initiator or UserType.ROOT,
                }
                for hit, result in hit_results.items()
            ]
        )
        return hit_results

    return wrapper
def choose_action_for_malicious(username: str) -> Tuple[str, str]:
    """Pick the default action for malware that belongs to *username*.

    * With MyImunify enabled and protection switched off for the user the
      answer is always ``(NOTIFY, username)``.
    * Otherwise the ``MALWARE_SCANNING.default_action`` config value is
      used: the user-level one when the user is permitted to edit it, the
      global one in any other case.

    :return: a 2-tuple; callers unpack it as ``(action, config_owner)``
    """
    protection_missing = (
        MyImunifyConfig.ENABLED
        and not myimunify_protection_enabled(username)
    )
    if protection_missing:
        return NOTIFY, username

    option = ("MALWARE_SCANNING", "default_action")
    if not has_permission(MS_CONFIG_DEFAULT_ACTION_EDIT, username):
        return choose_value_from_config(*option)
    return choose_value_from_config(*option, username)
class MalwareAction:
    """
    Responsible for manipulations with malware files.

    As long as each handler function is wrapped in `update_malware_history`,
    arguments should be passed in kwargs form.
    """

    #: registered callbacks (coroutine functions), keyed by action name
    _CALLBACK = defaultdict(set)

    @classmethod
    async def run_callbacks_for(cls, method_name, path, title):
        """Execute callbacks registered for a specific action.

        Every callback is awaited with *path* and a ``MalwareEvent`` built
        from *title*. An exception raised by a callback is logged and does
        not prevent the remaining callbacks from running; cancellation is
        re-raised.
        """
        for callback in cls._CALLBACK[method_name]:
            try:
                await callback(path, MalwareEvent(title))
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.exception(
                    "Error '%r' happened when run callback %s for "
                    "MalwareAction %s method",
                    e,
                    callback,
                    method_name,
                )

    @classmethod
    def add_callback(cls, method_name, coro):
        """Register *coro* to be run by ``run_callbacks_for(method_name)``."""
        cls._CALLBACK[method_name].add(coro)

    @classmethod
    @update_malware_history
    async def submit_for_analysis(
        cls, path, type, reason=None, **_
    ) -> MalwareEvent:
        """Pass *path* to ``submit_in_background`` and report the submission.

        :return: SUBMITTED_FOR_ANALYSIS event
        """
        submit_in_background(path, type, reason)
        return MalwareEvent(SUBMITTED_FOR_ANALYSIS)

    @classmethod
    @update_malware_history
    async def ignore(cls, path, resource_type, **_) -> MalwareEvent:
        """Create a ``MalwareIgnorePath`` record for *path*.

        The DB call is executed through ``run_in_executor``.

        :return: ADDED_TO_IGNORE event, or FAILED_TO_IGNORE if creating the
            record raised ``IntegrityError``
        """
        try:
            await run_in_executor(
                asyncio.get_event_loop(),
                lambda: MalwareIgnorePath.create(
                    path=path, resource_type=resource_type
                ),
            )
        except IntegrityError:
            title = FAILED_TO_IGNORE
        else:
            title = ADDED_TO_IGNORE
        return MalwareEvent(title)

    @classmethod
    @update_malware_history
    def delete_from_ignore_sync(cls, path, **_) -> MalwareEvent:
        """Synchronously delete ``MalwareIgnorePath`` records for *path*.

        :return: DELETED_FROM_IGNORE event if the delete query reported
            affected rows, FAILED_TO_DELETE_FROM_IGNORE otherwise
        """
        deleted = (
            MalwareIgnorePath.delete()
            .where(MalwareIgnorePath.path == path)
            .execute()
        )
        return MalwareEvent(
            DELETED_FROM_IGNORE if deleted else FAILED_TO_DELETE_FROM_IGNORE
        )

    @classmethod
    @update_malware_history
    async def notify(cls, *_, **__):
        """Report a found malware; only produces the FOUND event."""
        # TODO: should be sending email here, but not implemented yet
        return MalwareEvent(FOUND)

    @classmethod
    @update_malware_history
    async def cleanup_failed_restore(cls, *_, **__):
        """Produce the FAILED_TO_RESTORE_ORIGINAL event."""
        return MalwareEvent(FAILED_TO_RESTORE_ORIGINAL)

    @classmethod
    @update_malware_history
    async def cleanup_failed_store(cls, *_, **__):
        """Produce the FAILED_TO_STORE_ORIGINAL event."""
        return MalwareEvent(FAILED_TO_STORE_ORIGINAL)

    @classmethod
    @update_malware_history
    async def cleanup_restored_original(
        cls, *_, initiator: str, report: "RestoreReport" = None, **__
    ):
        """Produce the RESTORED_ORIGINAL event.

        When a *report* is given and a ``sink`` is available in the global
        scope, the report (stamped with *initiator*) is sent to the sink as
        a ``MalwareCleanupRevert`` message first.
        """
        if report and (sink := g.get("sink")):
            report.initiator = initiator
            await sink.process_message(MalwareCleanupRevert(report.to_dict()))
        return MalwareEvent(RESTORED_ORIGINAL)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_unable(cls, *_, **__):
        """Produce the UNABLE_TO_CLEANUP event."""
        return MalwareEvent(UNABLE_TO_CLEANUP)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_done(cls, path, *_, **__):
        """Run the "cleanup" callbacks for *path*, produce CLEANUP_DONE."""
        await cls.run_callbacks_for("cleanup", path, CLEANUP_DONE)
        return MalwareEvent(CLEANUP_DONE)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_removed(cls, *_, **__):
        """Produce the CLEANUP_REMOVED event."""
        return MalwareEvent(CLEANUP_REMOVED)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_failed(cls, *_, **__):
        """Produce the FAILED_TO_CLEANUP event."""
        return MalwareEvent(FAILED_TO_CLEANUP)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_requires_myimunify_protection(cls, *_, **__):
        """Produce the REQUIRES_MYIMUNIFY_PROTECTION event."""
        return MalwareEvent(REQUIRES_MYIMUNIFY_PROTECTION)

    @classmethod
    async def apply_default_action(
        cls,
        hits: List[MalwareHitAlternate],
        initiator=None,
        cause=None,
        sink=None,
        **kwargs,
    ) -> List[Tuple[MalwareHitAlternate, MalwareEvent, str, bool]]:
        """Perform action with malware which user set in the config

        This implementation always notifies, whatever action is configured;
        the config is consulted only to find out the config owner, which is
        used as the initiator when none is given.

        :return: list of ``(hit, event, NOTIFY, False)`` tuples
        """
        results = []
        for h in hits:
            _, config_owner = choose_action_for_malicious(username=h.user)
            event = await cls.notify(
                file_owner=h.owner,
                file_user=h.user,
                path=h.orig_file,
                initiator=initiator or config_owner,
                cause=cause,
                **kwargs,
            )
            results.append((h, event, NOTIFY, False))
        return results

    @classmethod
    async def multiple(cls, action, hits):
        """
        Apply the action to multiple hits, one after another

        :param action: the action to apply, awaited as
            ``action(hit.orig_file, hit.user)``
        :param hits: list of hits
        """
        for hit in hits:
            await action(hit.orig_file, hit.user)

    @classmethod
    def _get_tmp_dir(cls, file_owner):
        """Choose a temporary directory for restoring files of *file_owner*.

        :return: ``base_home_dir`` of the user's home (as reported by the
            hosting panel) converted to ``str``; ``Core.TMPDIR`` when the
            user is unknown or the panel cannot resolve the directory
        """
        hp = hosting_panel.HostingPanel()
        try:
            user = pwd.getpwnam(file_owner)
        except (KeyError, TypeError):
            return Core.TMPDIR
        try:
            tmp_dir = str(hp.base_home_dir(user.pw_dir))
        except (RuntimeError, FileNotFoundError):
            return Core.TMPDIR
        return tmp_dir

    @classmethod
    def _split_hits_on_restore(cls, hits):
        """Split *hits* into those worth restoring and those to skip.

        A hit is skipped when its file exists and the history already holds
        a FAILED_TO_RESTORE_FROM_BACKUP event for the same path that is not
        older than the file's ctime. A missing file is always restored.

        :return: ``(to_restore, not_restore)`` lists
        """
        to_restore = []
        not_restore = []
        for hit in hits:
            path = hit.orig_file
            file_ctime = None
            try:
                file_ctime = int(os.path.getctime(path))
            except FileNotFoundError:
                logger.warning(
                    "File %s not found during restore from backup process",
                    safe_sequence.path(path),
                )
            if (
                file_ctime is None
                or MalwareHistory.select()
                .where(
                    MalwareHistory.path == path,
                    MalwareHistory.event == FAILED_TO_RESTORE_FROM_BACKUP,
                    MalwareHistory.ctime >= file_ctime,
                )
                .first()
                is None
            ):
                to_restore.append(hit)
            else:
                not_restore.append(hit)
        return to_restore, not_restore

    @classmethod
    @bulk_update_malware_history
    async def restore_from_backup(
        cls, hits, **kwargs
    ) -> Dict[MalwareHit, MalwareEvent]:
        """Try to restore the files of *hits* from backup, user by user.

        Hits rejected by `_split_hits_on_restore` are not attempted and are
        reported as FAILED_TO_RESTORE_FROM_BACKUP.

        :return: mapping of hit -> resulting event
        """
        to_restore, not_restore = cls._split_hits_on_restore(hits)
        for f in not_restore:
            logger.warning(
                "File %s wasn't restored from backup"
                ", because last restore attempt failed",
                safe_sequence.path(f.orig_file),
            )
        user_hits = {}  # type: Dict[str, List[MalwareHit]]
        for hit in to_restore:
            user_hits.setdefault(hit.user, []).append(hit)
        res = {}  # type: Dict[MalwareHit, MalwareEvent]
        for user, _hits in user_hits.items():
            res.update(
                await cls._restore_from_backup(
                    _hits, file_owner=user, **kwargs
                )
            )
        res.update(
            (hit, MalwareEvent(FAILED_TO_RESTORE_FROM_BACKUP))
            for hit in not_restore
        )
        return res

    @classmethod
    async def _restore_from_backup(
        cls, hits, file_owner, sink=None, **_
    ) -> List[Tuple[MalwareHit, MalwareEvent]]:
        """Restore the files of *hits* (all of one user) via `restore_files`.

        :return: ``(hit, event)`` pairs, restored hits first: one pair per
            hit whose path `restore_files` reported as restored or failed
        """
        paths = [h.orig_file for h in hits]
        tmp_dir = cls._get_tmp_dir(file_owner)
        restored, failed = await restore_files(
            files=paths,
            until=choose_use_backups_start_from_date(file_owner),
            sink=sink,
            tmp_dir=tmp_dir,
        )
        restored_hits = [h for h in hits if h.orig_file in restored]
        failed_hits = [h for h in hits if h.orig_file in failed]
        for p in restored:
            logger.info(
                "File %s was restored from backup", safe_sequence.path(p)
            )
        for p in failed:
            logger.warning(
                "File %s wasn't restored from backup", safe_sequence.path(p)
            )
        # Every hit is reported exactly once (the pairs used to be repeated
        # once per logged path).
        res = [(rh, MalwareEvent(RESTORED_FROM_BACKUP)) for rh in restored_hits]
        res.extend(
            (fh, MalwareEvent(FAILED_TO_RESTORE_FROM_BACKUP))
            for fh in failed_hits
        )
        return res
def subscribe_to_malware_action(action, coro):
    """Register *coro* as a `MalwareAction` callback for *action*.

    Thin module-level shortcut for ``MalwareAction.add_callback``.
    """
    register = MalwareAction.add_callback
    register(action, coro)
class HackerTrapHitsSaver:
    """Keeps the HackerTrap path lists on disk.

    Two lists of base64-encoded paths live in ``HackerTrap.DIR``: the main
    one (``HackerTrap.NAME``) and the standalone one
    (``HackerTrap.SA_NAME``). After a change a list is copied into the
    modsec vendor directory reported by the hosting panel. For every path
    of the standalone list an empty marker file named ``hash_path(path)``
    is additionally kept in ``HackerTrap.DIR_PD``.
    """

    BASE_DIR = HackerTrap.DIR
    BASE_PD_DIR = HackerTrap.DIR_PD
    NAME = HackerTrap.NAME
    MAX_HITS_COUNT = 1000  # lets do 1000 files for now, see how it works
    SECONDS_BEFORE_CLEAN = 24 * 60 * 60  # 24 hours between cleanups
    STANDALONE_MARK = "-SA-"
    LOCK = LazyLock()

    @classmethod
    def _filepath(cls, filename=None) -> Path:
        """Path of list *filename* (the main list by default) in BASE_DIR."""
        name = filename or cls.NAME
        return Path(cls.BASE_DIR, name)

    @classmethod
    def _clean_filepath(cls) -> Path:
        """Path of the marker file whose mtime records the last cleanup."""
        return Path(cls.BASE_DIR, cls.NAME + ".clean")

    @classmethod
    def _write(cls, file_list: List[Path], filename=None):
        """Atomically rewrite list *filename* with base64-encoded names.

        ``OSError`` is logged and not propagated.
        """
        try:
            atomic_rewrite(
                cls._filepath(filename),
                b"\n".join(base64_encode_filename(name) for name in file_list),
                backup=False,
                allow_empty_content=True,
                permissions=0o644,
            )
        except OSError as oe:
            logger.error("Unable to write HackerTrap file: %r", oe)

    @classmethod
    def _extend(cls, file_list: List[T], files_to_add: List[T]) -> List[T]:
        """
        Build a new list out of file_list with files_to_add appended.

        Neither argument is modified. A file that is already listed
        (including one repeated inside files_to_add) is moved to the
        bottom instead of being listed twice, so it doesn't rotate out
        too fast.

        :param file_list: existing files
        :param files_to_add: files to add
        :return: joined list, limited to the last MAX_HITS_COUNT entries
        """
        file_set = set(file_list)  # we will use it to speed up lookups
        _file_list = file_list.copy()
        for file in files_to_add:
            if file in file_set:
                _file_list.remove(file)
            else:
                # remember it, so a repeated entry of files_to_add
                # is moved rather than appended once more
                file_set.add(file)
            _file_list.append(file)
        return _file_list[-cls.MAX_HITS_COUNT :]

    @staticmethod
    def _clean_list(file_list: Iterable[PathLike]) -> List[PathLike]:
        """
        Drop the entries that are present on the filesystem.

        :param file_list: list of files
        :return: new list of files, in the same order, with files that exist
            skipped
        """
        return [file for file in file_list if not os.path.exists(file)]

    @classmethod
    def _should_clean(cls, file_mtime, current_time):
        """Tell whether more than SECONDS_BEFORE_CLEAN passed since mtime."""
        return current_time - file_mtime > cls.SECONDS_BEFORE_CLEAN

    @classmethod
    def _clean_file(cls, file_list: Iterable[PathLike]):
        """
        Clean the list if the previous cleanup happened long enough ago.

        An extra (empty) file is used to track the last cleanup time:
        its mtime is that time. The file is created when missing, in which
        case the list is returned untouched.

        :param file_list: list to clean
        :return: cleaned list, or file_list itself when it is not the time
        """
        p = cls._clean_filepath()
        if p.exists():
            if cls._should_clean(p.stat().st_mtime, time.time()):
                # rewriting the marker refreshes its mtime
                p.write_bytes(b"")
                file_list = cls._clean_list(file_list)
        else:
            p.write_bytes(b"")
        return file_list

    @classmethod
    def _read(cls, filename=None, skip_exists=True) -> List[Path]:
        """Read and decode list *filename* (the main list by default).

        Entries that fail base64 decoding are logged and skipped.

        :param skip_exists: pass the result through `_clean_file`
        :return: decoded names; an empty list if a file is not found
        """
        try:
            file_list: List[bytes] = (
                cls._filepath(filename).read_bytes().split()
            )
            decoded_file_list: List[Path] = []
            for file in file_list:
                try:
                    decoded_file_list.append(base64_decode_filename(file))
                except binascii.Error as e:
                    logger.error(
                        "Can't decode filepath [%r] with error [%r]", file, e
                    )
            return (
                cls._clean_file(decoded_file_list)
                if skip_exists
                else decoded_file_list
            )
        except FileNotFoundError:
            return []

    @classmethod
    async def add_hits(cls, files_to_add: List[Path], *args, **kwargs):
        """Same behavior as for separate hit."""
        await cls._add_hits(files_to_add, *args, **kwargs)
        await cls.update_sa_hits(files_to_add=[], files_to_remove=files_to_add)

    @classmethod
    async def _add_hits(cls, files_to_add: List[Path], *args, **kwargs):
        """Add files to the main list and copy it to the modsec rules.

        ``OSError`` is logged and not propagated.
        """
        try:
            file_list: List[Path] = cls._read()
            result: List[Path] = cls._extend(file_list, files_to_add)
            cls._write(result)
            await cls._copy_to_modsec_rules(cls.NAME)
        except OSError as oe:
            logger.error("Unable to read HackerTrap file %r", oe)

    @classmethod
    async def add_hit(cls, file_to_add: Path, *args, **kwargs):
        """When storing separate hit it needs to be added to
        malware_found_b64.list
        and excluded from malware_sa_found_b64.list as well from
        proactive/dangerous/[hash]"""
        return await cls.add_hits([file_to_add])

    @classmethod
    async def init(cls):
        """Run `add_hits` with no files (rewrites and copies the main list)."""
        await cls.add_hits([])

    @classmethod
    @retry_on(
        FileNotFoundError,
        max_tries=COPY_TO_MODSEC_MAXTRIES,
        on_error=log_failed_to_copy_to_modsec,
        silent=True,
    )
    async def _copy_to_modsec_rules(cls, malware_list_name):
        """Copy list *malware_list_name* into the modsec vendor directory.

        The copy goes to a ``.tmp`` sibling first and is then renamed over
        the target. ``FileNotFoundError`` is left to the ``retry_on``
        decorator.

        :return: True if the target file was replaced; False when there was
            nothing to update or the copy was not possible
        """
        hp = hosting_panel.HostingPanel()
        try:
            vendor = await hp.get_i360_vendor_name()
        except (ModsecVendorsError, PanelException) as e:
            logger.warning(str(e))
            return False
        try:
            target = await hp.build_vendor_file_path(vendor, malware_list_name)
        except ModsecVendorsError as e:
            logger.exception("Can't get malware found list file: %s", e)
            return False
        found_list = Path(HackerTrap.DIR, malware_list_name)
        target_tmp = target.with_suffix(target.suffix + ".tmp")
        if (
            target.exists()
            # cheap size check before comparing the content
            and target.stat().st_size == found_list.stat().st_size
            and target.read_bytes() == found_list.read_bytes()
        ):
            logger.info("Nothing to update")
            return False
        try:
            shutil.copy(str(found_list), str(target_tmp))
            target_tmp.rename(target)
            return True
        except FileNotFoundError:
            # must not be swallowed by the OSError clause below:
            # it is the exception `retry_on` is waiting for
            raise
        except OSError as e:
            logger.error("Failed to copy malware found list: %s", e)
            return False

    @classmethod
    def _get_exists_hash_files(cls):
        """Names of the regular files currently present in BASE_PD_DIR."""
        with os.scandir(cls.BASE_PD_DIR) as it:
            return [entry.name for entry in it if entry.is_file()]

    @classmethod
    def _create_hash_files(cls, files):
        """Create (touch, mode 0o644) the named files in BASE_PD_DIR."""
        for fname in files:
            (Path(cls.BASE_PD_DIR) / Path(fname)).touch(0o644)

    @classmethod
    def _remove_hash_files(cls, files):
        """Remove the named files from BASE_PD_DIR."""
        for fname in files:
            (Path(cls.BASE_PD_DIR) / Path(fname)).unlink()

    @classmethod
    def _update_sa_hash_files(cls):
        """
        Synchronize marker files in HackerTrap.DIR_PD with the SA list.

        SA hits are stored for PD as files named ``hash_path(path)``
        (presumably a sha256 of the full path -- see `hash_path`).
        Files missing for listed paths are created, files of paths that
        are not listed anymore are removed. ``OSError`` is logged and not
        propagated.
        """
        try:
            saved_files_list = cls._read(
                filename=HackerTrap.SA_NAME, skip_exists=False
            )
            wanted = {hash_path(path) for path in saved_files_list if path}
            existing = set(cls._get_exists_hash_files())
            cls._create_hash_files(wanted - existing)
            cls._remove_hash_files(existing - wanted)
        except OSError as e:
            logger.warning(
                "HackerTrap error: %r%s",
                e,
                f" ({e.filename!r})" if e.filename else "",
            )

    @classmethod
    def _update_sa_hit_list(
        cls, files_to_add: List[Path], files_to_remove: List[Path]
    ) -> bool:
        """
        Update file of malware standalone list.

        Return True if malware standalone list was changed otherwise False
        (also False when an ``OSError`` was logged).
        """
        try:
            saved_list: List[Path] = cls._read(
                filename=HackerTrap.SA_NAME, skip_exists=False
            )
            extended_list: List[Path] = cls._extend(saved_list, files_to_add)
            # a set keeps the membership test cheap for long lists
            removed = set(files_to_remove)
            updated_list = [
                path for path in extended_list if path not in removed
            ]
            if updated_list != saved_list:
                cls._write(updated_list, filename=HackerTrap.SA_NAME)
                return True
        except OSError as e:
            logger.error("HackerTrap error: %s", e)
        return False

    @classmethod
    async def update_sa_hits(
        cls, files_to_add: List[Path], files_to_remove: List[Path]
    ):
        """Add/remove files to/from the standalone list, under LOCK.

        When the list changed: it is copied to the modsec rules (the web
        server is gracefully restarted if the copy replaced the target)
        and the hash files are synchronized. A call with two empty lists
        does nothing.
        """
        if files_to_add or files_to_remove:
            async with cls.LOCK:
                if cls._update_sa_hit_list(files_to_add, files_to_remove):
                    if await cls._copy_to_modsec_rules(HackerTrap.SA_NAME):
                        await web_server.graceful_restart()
                    cls._update_sa_hash_files()

    @classmethod
    async def reset_sa_hits(cls):
        """
        Re-populate HackerTrap records using data from database
        """
        # WARN: It is critically important to check the 'resource_type'!
        # In some cases when scanning DB for malware the results contain
        # the '-SA-' mark in the 'type' column. For instance:
        #     SMW-SA-20634-php.bkdr.wp.fakeplugin-0
        # What happens next:
        # 1) New 'MalwareHit' records appear, with 'resource_type'=="DB" and
        #    'orig_file'=="path to a root directory".
        # 2) The config 'malware_standalone_b64.list' gets these paths to
        #    root directories, instead of paths to scripts.
        # 3) The action 'pmFromFile' in the modsec rule 77316817 (and some
        #    others) matches the 'SCRIPT_FILENAME' variable against lines
        #    in the config.
        # 4) The matching in the modsec module is not a strict comparison,
        #    but the occurrence of a string within a string.
        #    For instance, when the config contains the line:
        #        /home/domain/public_html
        #    then all of these paths match it:
        #        /home/domain/public_html/admin.php
        #        /home/domain/public_html/cms/main.php
        # As the result of all of the above, the modsec rule makes
        # a false-positive conclusion and blocks the request.
        # To prevent that, the 'resource_type' must be equal to 'FILE'.
        resource_type = MalwareScanResourceType.FILE.value
        async with cls.LOCK:
            files = (
                MalwareHit.select(MalwareHit.orig_file)
                .where(
                    # Only standalone malicious files that were found,
                    # but not yet cleared/restored
                    MalwareHit.status.in_(
                        [
                            MalwareHitStatus.FOUND,
                            MalwareHitStatus.CLEANUP_STARTED,
                            MalwareHitStatus.RESTORE_FROM_BACKUP_STARTED,
                        ]
                    ),
                    MalwareHit.malicious,
                    MalwareHit.type.contains(cls.STANDALONE_MARK),
                    MalwareHit.resource_type == resource_type,
                )
                .order_by(MalwareHit.timestamp.desc())
                .limit(cls.MAX_HITS_COUNT)
                .tuples()
            )
            cls._write(
                [os.fsencode(f) for [f] in files], filename=HackerTrap.SA_NAME
            )
            if await cls._copy_to_modsec_rules(HackerTrap.SA_NAME):
                await web_server.graceful_restart()
            cls._update_sa_hash_files()
class MalwareActionIm360(MalwareAction):
    """`MalwareAction` that honours the configured default action.

    On top of the base class it tries to restore files from backup first
    and can postpone a cleanup instead of only notifying.

    ``restore_from_backup``, ``_split_hits_on_restore``,
    ``_restore_from_backup`` and ``_get_tmp_dir`` are inherited from
    `MalwareAction`: being classmethods, they resolve every ``cls.``
    lookup against this class, so no copies are needed here.
    """

    @classmethod
    def _get_handler(cls, action) -> Callable:
        """Pick the coroutine function implementing *action*.

        An unknown action is logged as an error and falls back to the
        NOTIFY handler.
        """
        possible_actions = {
            NOTIFY: cls.notify,
            CLEANUP: cls.postpone(
                MalwareCleanupTask,
                post_action=cls.detect,
                action=CLEANUP,
            ),
            CLEANUP_ON_SCHEDULE: cls.postpone(
                MalwareCleanupTask,
                post_action=cls.detect,
                action=CLEANUP_ON_SCHEDULE,
            ),
        }
        try:
            result = possible_actions[action]
        except KeyError:
            result = possible_actions[NOTIFY]
            logger.error(
                "There is no such action '%s'. Config is invalid", action
            )
        return result

    @staticmethod
    def postpone(message, **kwargs):
        """Make a handler that only wraps the request in a postponed event.

        The returned coroutine function requires ``initiator`` and
        ``cause`` keyword arguments, ignores everything else and returns
        ``MalwareEventPostponed(message, initiator=..., cause=...,
        **kwargs)``.
        """

        async def wrapper(*_, initiator, cause, **__):
            return MalwareEventPostponed(
                message, initiator=initiator, cause=cause, **kwargs
            )

        return wrapper

    @classmethod
    async def detect(cls, scan_id, sink, **_):
        """Fire `detected_hook` with the data of the scan *scan_id*."""
        scan = MalwareScan.get(scanid=scan_id)
        await detected_hook(
            sink,
            scan_id,
            scan.type,
            scan.started,
            scan.path,
            scan.total_resources,
        )

    @classmethod
    async def apply_default_action(
        cls,
        hits: Collection[HitInfoType],
        initiator=None,
        cause=None,
        sink=None,
        resource_type=None,
        **kwargs,
    ) -> List[Tuple[HitInfoType, MalwareEvent, str, bool]]:
        """Perform action with malware which user set in the config

        File hits of users with autorestore enabled are first tried to be
        restored from backup; a hit that was restored successfully gets no
        further handling. Every other hit is passed to the handler of the
        action configured for its user.

        :return: list of ``(hit, event, action, restored)`` tuples, where
            ``restored`` tells whether the event comes from a successful
            restore from backup
        """
        to_restore = [
            hit
            for hit in hits
            if should_try_autorestore_malicious(hit.user)
            # restore from backup does not apply to db scans
            and not isinstance(hit, MalwareDatabaseHitInfo)
        ]
        restore_events = await cls.restore_from_backup(
            to_restore, initiator=initiator, sink=sink, cause=cause, **kwargs
        )
        # FIXME: remove this mapping
        # when we start to store UID instead of username in the db
        panel_users = set(await hosting_panel.HostingPanel().get_users())
        uid_to_name = {
            pw.pw_uid: pw.pw_name
            for pw in pwd.getpwall()
            if pw.pw_name in panel_users
        }
        res = []
        for hit in hits:
            if isinstance(hit, MalwareDatabaseHitInfo):
                # db hits carry UIDs: translate them to panel user names,
                # falling back to the stringified UID
                owner = uid_to_name.get(hit.owner, str(hit.owner))
                user = uid_to_name.get(hit.user, str(hit.user))
                path = cast(MalwareDatabaseHitInfo, hit).path
            else:
                owner = hit.owner
                user = hit.user
                path = cast(MalwareHitAlternate, hit).orig_file
            action, config_owner = choose_action_for_malicious(user)
            if hit in restore_events and restore_events[hit].successful:
                res.append((hit, restore_events[hit], action, True))
                continue
            handler_kw_args = kwargs.copy()
            if isinstance(hit, MalwareDatabaseHitInfo):
                handler_kw_args["db_name"] = hit.db_name
                handler_kw_args["db_host"] = hit.db_host
                handler_kw_args["db_port"] = hit.db_port
                handler_kw_args["table_name"] = hit.table_name
                handler_kw_args["table_field"] = hit.table_field
                handler_kw_args["table_row_inf"] = hit.table_row_inf
                handler_kw_args["scan_id"] = hit.scan_id
            handler = cls._get_handler(action)
            event = await handler(
                path=path,
                file_owner=owner,
                file_user=user,
                cause=cause,
                initiator=initiator or config_owner,
                sink=sink,
                app_name=hit.app_name,
                resource_type=resource_type,
                **handler_kw_args,
            )
            res.append((hit, event, action, False))
        return res