"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
from typing import List, Optional, Set
from defence360agent.contracts.messages import (
Accumulatable,
Ack,
FilesUpdated,
Lockable,
Message,
Noop,
Received,
Reportable,
ShortenReprListMixin,
Splittable,
)
from imav.malwarelib.config import MalwareScanType, QueuedScanState
from imav.malwarelib.scan.mds.report import (
MalwareDatabaseHitInfo,
MalwareDatabaseScanReport,
)
class MalwareScanTask(Message):
"""
Creates task to scan file for malware
"""
DEFAULT_METHOD = "MALWARE_SCAN_TASK"
class MalwareScanComplete(Message):
DEFAULT_METHOD = "MALWARE_SCAN_COMPLETE"
class MalwareCleanComplete(Message):
DEFAULT_METHOD = "MALWARE_CLEAN_COMPLETE"
class MalwareRestoreComplete(Message):
DEFAULT_METHOD = "MALWARE_RESTORE_COMPLETE"
class MalwareScanSummary(ShortenReprListMixin, Message, Reportable):
DEFAULT_METHOD = "MALWARE_SCAN_SUMMARY"
def __init__(self, *args, items, **kwargs):
# keep the summary only without results
items = [{"summary": item["summary"]} for item in items]
super().__init__(*args, items=items, **kwargs)
class MalwareScanResult(ShortenReprListMixin, Message, Reportable, Splittable):
DEFAULT_METHOD = "MALWARE_SCAN_RESULT"
LIST_SIZE = 5
BATCH_SIZE = 1000
BATCH_FIELD = "results"
def __init__(self, *args, items, **kwargs):
items = [
{
# keep summary fields necessary for storing results only
"summary": {
key: value
for key, value in item["summary"].items()
if key in {"scanid", "type", "args"}
},
# keep results
"results": item["results"],
}
for item in items
]
super().__init__(*args, items=items, **kwargs)
class MalwareScan(Accumulatable, Lockable):
"""
Represents results of single scan
"""
DEFAULT_METHOD = "MALWARE_SCAN"
LIST_CLASS = (MalwareScanSummary, MalwareScanResult)
def do_accumulate(self) -> bool:
results = self.get("results")
if self.get("summary", {}).get("type") == MalwareScanType.BACKGROUND:
return results is not None
return bool(results)
class MalwareMRSUpload(Message):
"""
Used to isolate a possibly long running uploading to MRS from the
other MalwareScan handlers.
"""
DEFAULT_METHOD = "MALWARE_MRS_UPLOAD"
class MalwareScanQueue(Message):
DEFAULT_METHOD = "MALWARE_SCAN_QUEUE"
class MalwareScanQueuePut(MalwareScanQueue):
def __init__(self, *, paths, scan_args):
super().__init__(paths=paths, scan_args=scan_args)
class MalwareScanQueueRemove(MalwareScanQueue):
def __init__(self, *, scan_ids):
super().__init__(scan_ids=scan_ids)
class MalwareScanQueueRecheck(MalwareScanQueue):
def __init__(self):
super().__init__()
class MalwareScanQueueStopBackground(MalwareScanQueue):
def __init__(self):
super().__init__()
class MalwareScanQueueUpdateStatus(MalwareScanQueue):
def __init__(
self, *, scan_ids: List[str] = None, status: QueuedScanState = None
):
super().__init__(scan_ids=scan_ids, status=status)
class MalwareResponse(Message, Received):
TYPES = BLACK, WHITE, KNOWN, UNKNOWN = "BLACK", "WHITE", "KNOWN", "UNKNOWN"
DEFAULT_METHOD = "MALWARE_RESPONSE"
class MalwareSendFiles(Message, Received):
DEFAULT_METHOD = "MALWARE_SEND_FILES"
class MalwareRescanFiles(Message, Received):
DEFAULT_METHOD = "MALWARE_RESCAN_FILES"
class MalwareCleanupTask(Message):
"""
Creates task to cleanup files
"""
DEFAULT_METHOD = "MALWARE_CLEANUP_TASK"
def __init__(
self,
*,
hits,
standard_only=None,
cause=None,
initiator=None,
scan_id=None,
post_action=None,
):
super().__init__(
hits=hits,
standard_only=standard_only,
cause=cause,
initiator=initiator,
scan_id=scan_id,
post_action=post_action,
)
class MalwareCleanupList(ShortenReprListMixin, Message, Reportable):
DEFAULT_METHOD = "MALWARE_CLEANUP_LIST"
class MalwareCleanup(Accumulatable):
"""
Represents results of single cleanup
"""
DEFAULT_METHOD = "MALWARE_CLEANUP"
LIST_CLASS = MalwareCleanupList
class MalwareIgnorePathUpdated(Message):
"""Signal through a message bus that MalwareIgnorePath has been updated."""
pass
class MalwareDatabaseScan(Message):
"""
Represents results of a single MDS scan
"""
args: Optional[List[str]]
path: str
scan_id: str
type: str
error: Optional[str]
started: int
completed: int
total_resources: int
total_malicious: int
hits: Set[MalwareDatabaseHitInfo]
initiator: Optional[str]
def __init__(
self,
*,
args: Optional[List[str]],
path: Optional[str],
scan_id: str,
type: str,
started: int = 0,
completed: int = 0,
total_resources: int = 0,
total_malicious: int = 0,
hits: Set[MalwareDatabaseHitInfo] = None,
error: Optional[str] = None,
initiator: Optional[str] = None,
):
super().__init__(
args=args,
path=path,
scan_id=scan_id,
type=type,
started=started,
completed=completed,
total_resources=total_resources,
total_malicious=total_malicious,
hits=hits,
error=error,
initiator=initiator,
)
def update_with_report(self, report: MalwareDatabaseScanReport):
self.update(
started=report.started,
completed=report.completed,
total_resources=report.total_resources,
total_malicious=report.total_malicious,
hits=report.hits,
)
def update_with_error(self, message):
return self.update(error=message)
def __setitem__(self, key, value):
raise NotImplementedError
def __delitem__(self, key):
raise NotImplementedError
class MalwareDatabaseRestore(Message):
"""
Represents results of a single MDS restore
"""
class MalwareDatabaseRestoreTask(Message):
"""
Represents a single MDS restore task
"""
path: str
app_name: str
def __init__(self, path: str, app_name: str):
super().__init__(path=path, app_name=app_name)
class MalwareDatabaseCleanup(Message):
"""
Represents results of a single mds cleanup
"""
succeeded: Set[MalwareDatabaseHitInfo]
failed: Set[MalwareDatabaseHitInfo]
def __init__(
self,
succeeded: Set[MalwareDatabaseHitInfo],
failed: Set[MalwareDatabaseHitInfo],
):
super().__init__(
succeeded=succeeded,
failed=failed,
)
class MalwareDatabaseCleanupFailed(Message):
"""
Signifies an MDS cleanup failure
"""
error: str
def __init__(self, *, error: str):
super().__init__(error=error)
class MalwareDatabaseRestoreFailed(Message):
"""
Signifies an MDS restore failure
"""
error: str
def __init__(self, *, error: str):
super().__init__(error=error)
class MalwareCleanupRevertList(ShortenReprListMixin, Message, Reportable):
DEFAULT_METHOD = "MALWARE_CLEANUP_REVERT_LIST"
class MalwareCleanupRevert(Accumulatable):
LIST_CLASS = MalwareCleanupRevertList
class CheckDetachedScans(Message):
DEFAULT_METHOD = "MALWARE_CHECK_DETACHED_SCANS"
MSGS_WITHOUT_IP = [
msg.DEFAULT_METHOD
for msg in (
Ack,
CheckDetachedScans,
MalwareScan,
MalwareScanSummary,
MalwareScanResult,
MalwareScanComplete,
MalwareCleanComplete,
MalwareRestoreComplete,
MalwareSendFiles,
Noop,
FilesUpdated,
)
]