import asyncio import logging import os from abc import ABC, abstractmethod from collections import defaultdict from heapq import heappop, heappush from typing import Dict from defence360agent.contracts.config import ConfigFile, Core from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import BaseMessageProcessor, expect logger = logging.getLogger() class EventProcessorBase(BaseMessageProcessor, ABC): def __init__(self, loop): # note: empty list is a heap (no need for heapify here) self._msg_buf = defaultdict(list) self._loop = loop def add_message(self, message): heappush( self._msg_buf[message["username"]], (message["timestamp"], message) ) async def process_messages(self): await asyncio.gather( *( self.process_user_messages(user_messages) for user_messages in self._msg_buf.values() ) ) @expect(MessageType.cPanelEvent) async def process_event(self, message): if not self._message_is_relatable(message): # pragma: no cover return if message.hook == "Modify": await self._process_modify(message) elif message.hook == "Create": await self._process_create(message) elif message.hook == "change_package": await self._process_change_package(message) elif message.hook == "Remove": await self._process_account_removed(message) async def process_user_messages(self, messages): for _ in range(len(messages)): await self.process_message(heappop(messages)[1]) @abstractmethod async def _process_modify(self, message): """Modify hook""" @abstractmethod async def _process_create(self, message): """Create hook""" @abstractmethod async def _process_change_package(self, message): """change_package hook""" @abstractmethod async def _process_account_removed(self, message): """Remove hook""" @abstractmethod def _message_is_relatable(self, message): """Whether the message should be processed""" @abstractmethod async def is_enabled(self): """Whether messages should be processed""" class SettingsChangeBase(EventProcessorBase, ABC): """Process hook event messages from cPanel""" async def _process_modify(self, message): package_field = "plan" if "plan" in message.data else "exclude" await self._get_settings_and_update(message, package_field) async def _process_create(self, message): await self._get_settings_and_update(message, "plan", True) async def _process_change_package(self, message): await self._get_settings_and_update(message, "new_pkg", True) async def _process_account_removed(self, message): pass async def _get_settings_and_update( self, message, package_field: str, add_to_package: bool = False, ) -> None: logger.info("Get settings from %s", message) settings = await self._get_settings_from_message(message) await self._apply_settings( message, package_field, add_to_package, settings ) async def _apply_settings( self, message, package_field, add_to_package, settings ): logger.info("Step 1 %s ", settings) # Do nothing if there are no values for Imunify360 features # in the message for Modify hook if ( message.get("plan") is None and message["hook"] == "Modify" and all(value is None for value in settings.values()) ): return if not all(settings.values()): try: package_name = message.data[package_field] except KeyError: logger.warning("No information about package in message") fallback_settings = self._default_settings() else: fallback_settings = await self._get_package_settings( package_name, add_to_package ) for feature, value in settings.items(): if value is None: settings[feature] = fallback_settings[feature] logger.info( "Settings specified in hook message %s for %s", settings, message["username"], ) for feature, value in settings.items(): await self.on_settings_change(message["username"], feature, value) @abstractmethod def _message_is_relatable(self, message): """Whether the message should be processed""" @abstractmethod async def on_settings_change(self, user, feature, value): """What to do after settings were changed (e.g. sync the DB)""" @staticmethod @abstractmethod def _default_settings() -> Dict[str, str]: """Get default package settings""" @abstractmethod async def _get_settings_from_message(self, message): """Retrieve settings from the message""" @classmethod @abstractmethod async def _get_package_settings( cls, package_name: str, add_to_package: bool ) -> Dict[str, str]: """Get current package settings""" @abstractmethod async def is_enabled(self): """Whether messages should be processed""" class UserConfigProcessor(EventProcessorBase): def _message_is_relatable(self, message): return True async def is_enabled(self): return True async def _process_account_removed(self, message): user = message.get("user") or message.get("username") if user: try: os.unlink(ConfigFile(user).path) except FileNotFoundError: pass except OSError: # pragma: no cover logger.warning( "Cannot delete Imunify360 config file for user %s", user ) async def _process_modify(self, message): if old_username := message.data.get("old_username"): # User renamed os.rename( os.path.join(Core.USER_CONFDIR, old_username), os.path.join(Core.USER_CONFDIR, message.username), ) async def _process_create(self, message): """Create hook""" async def _process_change_package(self, message): """change_package hook"""