"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import functools
import logging
import os
import warnings
from collections import namedtuple
from functools import partial
from typing import List, Sequence
from defence360agent.contracts.config import Malware, UserType
from defence360agent.contracts.license import LicenseError
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.permissions import (
MS_IGNORE_LIST_EDIT,
check_permission,
)
from defence360agent.feature_management.constants import AV, AV_REPORT, FULL
from defence360agent.feature_management.lookup import feature
from defence360agent.model.instance import db
from defence360agent.model.simplification import run_in_executor
from defence360agent.rpc_tools.lookup import (
CommonEndpoints,
RootEndpoints,
bind,
)
from defence360agent.rpc_tools.utils import run_in_executor_decorator
from defence360agent.rpc_tools.validate import ValidationError
from defence360agent.utils import (
Scope,
does_path_belong_to_user,
get_path_owner,
get_results_iterable_expression,
is_cloudways,
safe_fileops,
)
from imav.malwarelib.config import (
MalwareHitStatus,
MalwareScanResourceType,
MalwareScanType,
)
from imav.malwarelib.model import (
MalwareHistory,
MalwareHit,
MalwareIgnorePath,
)
from imav.malwarelib.rpc.endpoints.ondemand import split_args
from imav.malwarelib.scan.crontab import get_crontab
from imav.malwarelib.scan.queue_supervisor_sync import (
QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.subsys.malware import MalwareAction
from imav.malwarelib.utils import user_list
from imav.malwarelib.utils.endpoints import MaliciousEndpointStatus
from imav.malwarelib.utils.submit import (
FALSE_NEGATIVE,
FALSE_POSITIVE,
submit_malware,
)
logger = logging.getLogger(__name__)
IgnoreParameters = namedtuple(
"IgnoreParameters",
["path", "app_name", "db_host", "db_port", "db_name"],
defaults=(None, None, None, None, None),
)
class SubmitEndpoints(RootEndpoints):
SCOPE = Scope.AV
_SEND_FILES_DISABLED_BANNER = """\
Warning: This server’s security can be enhanced \
by enabling the MALWARE_SCANNING.sends_file_for_analysis option. \
This may minimize the number of undetected malware, \
making your system more resistant to new threats.
The command below can be used to enable the option:
imunify-antivirus config update \
'{"MALWARE_SCANNING": {"sends_file_for_analysis": true}}'
- or -
imunify360-agent config update \
'{"MALWARE_SCANNING": {"sends_file_for_analysis": true}}'
"""
@bind("submit", "false-positive")
async def submit_fp(self, filename, reason, scanner=None):
# WARNING: scanner parameter is deprecated
try:
await submit_malware(filename, FALSE_POSITIVE, reason=reason)
except LicenseError as e:
raise ValidationError(e)
except FileNotFoundError:
raise ValidationError("File {} doesn't exist.".format(filename))
@bind("submit", "false-negative")
async def submit_fn(self, filename):
try:
await submit_malware(filename, FALSE_NEGATIVE)
except LicenseError as e:
raise ValidationError(e)
except FileNotFoundError:
raise ValidationError("File {} doesn't exist.".format(filename))
else:
if not Malware.SEND_FILES:
return warnings.warn(self._SEND_FILES_DISABLED_BANNER)
class MaliciousEndpoints(CommonEndpoints):
SCOPE = Scope.AV
def __init__(self, sink):
super().__init__(sink)
self.queue = ScanQueue(sink=sink)
@feature(AV, [FULL, AV_REPORT])
@bind("malware", "malicious", "list")
@run_in_executor_decorator
def malicious_list(self, user=None, **kwargs):
return MalwareHit.malicious_list(user=user, **kwargs)
@classmethod
@feature(AV, [FULL, AV_REPORT])
@bind("malware", "read")
async def read_file(cls, path, offset, limit, user=None, **_):
mode = "rb"
if not os.path.exists(path):
raise FileNotFoundError("notifications.fileNotFound")
if user:
open_fun = functools.partial(
safe_fileops.safe_open_file,
path,
mode,
user,
respect_homedir=False,
)
else:
open_fun = functools.partial(open, path, mode)
try:
with open_fun() as f:
f.seek(offset)
chunk = f.read(limit)
eof = False if chunk else True
text = chunk.decode("utf-8", errors="ignore")
return {
"data": {
"chunk": text,
"eof": eof,
"limit": limit,
"offset": offset,
"size": os.fstat(f.fileno()).st_size,
},
}
except asyncio.CancelledError:
raise
except Exception as e:
raise PermissionError("notifications.permissionError") from e
@staticmethod
@feature(AV, [FULL, AV_REPORT])
@bind("malware", "malicious", "remove-from-list")
async def malicious_remove_from_list(ids, user=None):
hits_to_remove = MalwareHit.malicious_select(ids, user=user)
MalwareHit.delete_instances(hits_to_remove)
return MaliciousEndpointStatus(hits_to_remove, [])
@feature(AV, [FULL, AV_REPORT])
@bind("malware", "malicious", "move-to-ignore")
async def malicious_move_to_ignore(self, ids, user=None):
ignored = await self._malicious_move_to_ignore(ids, user)
return len(ignored)
async def _malicious_move_to_ignore(self, ids, user=None):
check_permission(MS_IGNORE_LIST_EDIT, user)
hits = await run_in_executor(
asyncio.get_event_loop(),
partial(MalwareHit.malicious_select, ids, user=user),
)
# flush found
malicious_found = (
h for h in hits if h.status == MalwareHitStatus.FOUND
)
await run_in_executor(
asyncio.get_event_loop(),
partial(MalwareHit.delete_instances, malicious_found),
)
file_hits = [
hit
for hit in hits
if hit.resource_type == MalwareScanResourceType.FILE.value
]
file_items = [
IgnoreParameters(path=hit.orig_file) for hit in file_hits
]
db_items = [
IgnoreParameters(
hit.orig_file,
hit.app_name,
hit.db_host,
hit.db_port,
hit.db_name,
)
for hit in hits
if hit.resource_type == MalwareScanResourceType.DB.value
]
ignored = await IgnoreEndpoints(self._sink).try_add_to_ignore(
file_items,
resource_type=MalwareScanResourceType.FILE.value,
) + await IgnoreEndpoints(self._sink).try_add_to_ignore(
db_items,
resource_type=MalwareScanResourceType.DB.value,
)
if Malware.SEND_FILES:
for hit in file_hits:
await MalwareAction.submit_for_analysis(
type=FALSE_POSITIVE,
reason=hit.type,
path=hit.orig_file,
file_owner=hit.owner,
file_user=hit.user,
initiator=user,
)
return ignored
@feature(AV, [FULL, AV_REPORT])
@bind("malware", "history", "list")
@run_in_executor_decorator
def get_history(self, user=None, **kwargs):
return MalwareHistory.get_history(user=user, **kwargs)
@bind("malware", "user", "list")
async def user_list(
self, offset, limit, search=None, order_by=None, user=None, ids=None
):
if user:
# user endpoint
_, users = await user_list.fetch_user_list(
self.queue.get_scans_from_paths, match={user}
)
max_count = len(users)
elif search:
# search
_, users = await user_list.fetch_user_list(
self.queue.get_scans_from_paths, match=search
)
max_count = len(users)
elif ids:
# filter by ids
max_count, users = await user_list.fetch_user_list(
self.queue.get_scans_from_paths, match=ids
)
else:
# all users
max_count, users = await user_list.fetch_user_list(
self.queue.get_scans_from_paths
)
# sort
users = user_list.sort(users)
for order in reversed(order_by or []):
users = user_list.sort(users, order.column_name, desc=order.desc)
# limit and offset
start = offset
end = offset + limit
return max_count, users[start:end]
@bind("malware", "user", "scan")
async def user_scan(
self, scan_file, scan_db, background=False, **scan_args
):
if not scan_file and not scan_db:
raise ValidationError(
"Either --scan-file or --scan-db should be specified"
)
if background and self.queue.status().get("background"):
raise ValidationError("Background scan pending")
if background:
scan_type = MalwareScanType.BACKGROUND
else:
scan_type = MalwareScanType.ON_DEMAND
users = await user_list.panel_users()
paths = [user["home"] for user in users]
if scan_db:
await self.queue.put(
paths=paths,
resource_type=MalwareScanResourceType.DB,
scan_type=scan_type,
**split_args(scan_args)
)
if scan_file:
await self.queue.put(
paths=paths,
resource_type=MalwareScanResourceType.FILE,
scan_type=scan_type,
**split_args(scan_args)
)
if background and Malware.CRONTABS_SCAN_ENABLED:
crontab_paths = [get_crontab(user["user"]) for user in users]
await self.queue.put(
paths=crontab_paths,
resource_type=MalwareScanResourceType.FILE,
scan_type=scan_type,
**split_args(scan_args)
)
@bind("malware", "suspicious", "list")
@run_in_executor_decorator
def suspicious_list(self, user=None, **kwargs):
return MalwareHit.suspicious_list(user=user, **kwargs)
@bind("malware", "suspicious", "move-to-ignore")
async def suspicious_move_to_ignore(self, ids):
total = 0
def expression(ids):
return MalwareHit.select().where(MalwareHit.id.in_(ids))
hits = await run_in_executor(
asyncio.get_event_loop(),
lambda: list(get_results_iterable_expression(expression, ids)),
)
for hit in hits:
MalwareIgnorePath.get_or_create(
path=hit.orig_file,
resource_type=MalwareScanResourceType.FILE.value,
)
hit.delete_instance()
total += 1
return total
def get_file_ownership(path, user) -> tuple[str, str]:
if is_cloudways():
# return a file owner and a user who keeps a file for Cloudways
owner = get_path_owner(path)
user = user or UserType.ROOT
else:
# use the same value for owner and user in other cases
owner = user = user or get_path_owner(path)
return owner, user
@feature(AV, [FULL, AV_REPORT])
class IgnoreEndpoints(CommonEndpoints):
@bind("malware", "ignore", "list")
@run_in_executor_decorator
def ignore_list(self, user=None, **kwargs):
check_permission(MS_IGNORE_LIST_EDIT, user)
if user is not None:
kwargs["user"] = user
return MalwareIgnorePath.paths_count_and_list(**kwargs)
@bind("malware", "ignore", "delete-ui")
async def ignore_delete_ui(self, ids, user=None):
return await self.ignore_delete(
ids=ids,
user=user,
skip_rescan=False,
)
@bind("malware", "ignore", "delete")
async def ignore_delete(self, ids, user=None, skip_rescan=False):
check_permission(MS_IGNORE_LIST_EDIT, user)
ignore_paths: List[MalwareIgnorePath] = list(
MalwareIgnorePath.select().where(MalwareIgnorePath.id.in_(ids))
)
if user is not None:
user_crontab_path = get_crontab(user)
ignore_paths = [
ignore_path
for ignore_path in ignore_paths
if does_path_belong_to_user(ignore_path.path, user)
or ignore_path.path == user_crontab_path
]
paths = [
ignore_path.path
for ignore_path in ignore_paths
if ignore_path.resource_type == MalwareScanResourceType.FILE.value
]
with db.atomic():
for ignore_path in ignore_paths:
file_owner, file_user = get_file_ownership(
ignore_path.path, user
)
MalwareAction.delete_from_ignore_sync(
path=ignore_path.path,
file_owner=file_owner,
file_user=file_user,
initiator=user or UserType.ROOT,
resource_type=ignore_path.resource_type,
)
if paths:
if not skip_rescan:
await self._sink.process_message(
# FIXME: this spawns a scan with the `realtime` type
MessageType.MalwareScanTask(filelist=paths)
)
await self._sink.process_message(
MessageType.MalwareIgnorePathUpdated()
)
return len(paths)
@bind("malware", "ignore", "add")
async def ignore_add(self, resource_type, paths, user=None):
items = [IgnoreParameters(path) for path in paths]
added = await self.try_add_to_ignore(items, resource_type, user)
return len(added)
async def try_add_to_ignore(
self,
items: Sequence[IgnoreParameters],
resource_type: str,
user: str = None,
) -> List[str]:
check_permission(MS_IGNORE_LIST_EDIT, user)
added = []
items = [item for item in items if os.path.isabs(item.path)]
if user is not None:
user_crontab_path = get_crontab(user)
items = (
i
for i in items
if does_path_belong_to_user(i.path, user)
or i.path == user_crontab_path
)
already_ignored = MalwareIgnorePath.path_list(
resource_type=resource_type
)
items = (items for items in items if items.path not in already_ignored)
for item in items:
file_owner, file_user = get_file_ownership(item.path, user)
result = await MalwareAction.ignore(
path=item.path,
resource_type=resource_type,
file_owner=file_owner,
file_user=file_user,
initiator=user or UserType.ROOT,
app_name=item.app_name,
db_host=item.db_host,
db_port=item.db_port,
db_name=item.db_name,
)
if result.successful:
added.append(item.path)
if added:
await self._sink.process_message(
MessageType.MalwareIgnorePathUpdated()
)
return added
async def is_path_ignored(self, check_path, user=None):
assert os.path.isabs(check_path)
if user is not None and not does_path_belong_to_user(check_path, user):
return False
return await MalwareIgnorePath.is_path_ignored(check_path)
class MalwareRebuildPatterns(RootEndpoints):
@bind("malware", "rebuild", "patterns")
async def rebuild_patterns(self):
await self._sink.process_message(
MessageType.MalwareIgnorePathUpdated()
)
return {}
class MalwareRescanEdnpoints(RootEndpoints):
@bind("malware", "rescan")
async def malware_rescan_files(self, files: List[str]):
await self._sink.process_message(
MessageType.MalwareRescanFiles(files=files)
)
return {}
class MalwareSendFiles(RootEndpoints):
@bind("malware", "send", "files")
async def malware_send_files(self, reason: str, files: List[str]):
await self._sink.process_message(
MessageType.MalwareSendFiles(reason=reason, files=files)
)
return {}