import functools
import pwd
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Set

from imav.contracts.config import MalwareTune
from defence360agent.utils import get_external_ip

# TCP ports shared by the inbound and outbound lists of
# AbstractPanel.OPEN_PORTS (kept as strings, like the rest of that table).
TCP_PORTS_COMMON = [
    "20",
    "21",
    "22",
    "25",
    "53",
    "80",
    "110",
    "443",
    "587",
    "993",
    "995",
]


class PanelException(Exception):
    """Default exception type exposed as ``AbstractPanel.exception``."""

    pass


class InvalidTokenException(Exception):
    """Exception type for invalid-token errors."""

    pass


class AbstractPanel(ABC):
    """Abstract class that provides only basic hosting panel integration
    functionality."""

    NAME = "MINIMAL"
    # Default open ports, keyed by protocol and then by direction.
    OPEN_PORTS = {
        "tcp": {
            "in": ["465"] + TCP_PORTS_COMMON,
            "out": ["113"] + TCP_PORTS_COMMON,
        },
        "udp": {
            "in": ["20", "21", "53", "443"],
            "out": ["20", "21", "53", "113", "123"],
        },
    }
    # Exception class raised by ensure_valid_panel() for this panel.
    exception = PanelException
    smtp_allow_users = []  # type: List[str]

    @classmethod
    @abstractmethod
    def is_installed(cls):
        """
        Checks if hosting panel installed on the known path
        :return: bool:
        """
        pass

    @classmethod
    def get_server_ip(cls):
        """
        Stub that returns the external IP, as currently only an
        implementation for cPanel is needed
        """
        return get_external_ip()

    @classmethod
    async def version(cls):
        """
        Returns the panel version; this base implementation returns None
        """
        return None

    @abstractmethod
    async def enable_imunify360_plugin(self, name=None):
        """
        Registers and enables Imunify360 UI plugin in hosting panel
        """
        pass

    @abstractmethod
    async def disable_imunify360_plugin(self, name=None):
        """
        Unregisters Imunify360 UI plugin in hosting panel
        """
        pass

    @abstractmethod
    async def get_user_domains(self):
        """
        Returns domains hosted via control panel
        :return: list
        """
        pass

    @abstractmethod
    async def get_users(self) -> List[str]:
        """
        Returns system users from hosting panel
        :return: list
        """
        pass

    @abstractmethod
    async def get_domain_to_owner(self) -> Dict[str, List[str]]:
        """
        Returns dict with domain to list of users pairs
        """
        pass

    @abstractmethod
    async def get_domains_per_user(self) -> Dict[str, List[str]]:
        """
        Returns dict with user to list of domains pairs
        """
        pass

    async def get_user_details(self) -> Dict[str, Dict[str, str]]:
        """
        Returns dict mapping each user from get_users() to a details dict
        with "email" and "locale" keys (both empty strings in this base
        implementation)
        """
        return {
            user: {"email": "", "locale": ""}
            for user in await self.get_users()
        }

    async def users_count(self) -> int:
        """
        Returns the number of users reported by get_users()
        """
        return len(list(await self.get_users()))

    def authenticate(self, protocol, data: dict):
        """
        Performs actions to distinguish endusers from admins

        :param protocol: _RpcServerProtocol
        :param data: parsed params
        :returns (user_type, user_name); user_name is None when the
            caller's uid is 0
        """
        name = None
        if protocol._uid != 0:
            # we can get here if a non-root web panel user visits i360 UI
            # To emulate it:
            # su -s /bin/bash -c
            #   $'echo \'{"command":["config", "show"],"params":{}}\'
            #   | nc -U -w1 \
            #   /var/run/defence360agent/non_root_simple_rpc.sock'
            #   fakeuser
            pw = pwd.getpwuid(protocol._uid)
            name = pw.pw_name
        return protocol.user, name

    @classmethod
    def get_modsec_config_path(cls):
        """
        Not supported by the base panel
        :raise NotImplementedError:
        """
        raise NotImplementedError

    def get_SMTP_conflict_status(self) -> bool:
        """
        Return Conflict status
        """
        return False

    @abstractmethod
    def basedirs(self) -> Set[str]:
        pass

    @classmethod
    def base_home_dir(cls, home_dir: str) -> Path:
        """
        Returns the parent directory of the resolved *home_dir*
        """
        base_dir = Path(home_dir).resolve().parent
        return base_dir

    @classmethod
    def get_rapid_scan_db_dir(cls, home_dir: str) -> Optional[str]:
        """
        Returns "<base>/.rapid-scan-db/<tail>" for *home_dir*, where <tail>
        is the resolved home directory relative to base_home_dir() and
        <base> is base_home_dir() unless
        MalwareTune.RAPID_SCAN_BASEDIR_OVERRIDE is set to a truthy value.

        :return: path as str, or None if the home directory cannot be
            resolved or is not located under its base directory
        """
        try:
            base_dir = cls.base_home_dir(home_dir)
            resolved_home = Path(home_dir).resolve()
            tail = resolved_home.relative_to(base_dir)
        # Symbolic link loop could cause runtime error
        except (ValueError, RuntimeError):
            return None
        if rapid_scan_basedir_override := getattr(
            MalwareTune, "RAPID_SCAN_BASEDIR_OVERRIDE", None
        ):
            # Coerce to Path: if the override is a plain string, joining it
            # with "/" below would raise TypeError (str / str). Path() is a
            # no-op for values that already are Path objects.
            base_dir = Path(rapid_scan_basedir_override)
        return str(base_dir / ".rapid-scan-db" / tail)

    @classmethod
    async def retrieve_key(cls) -> str:
        """
        Returns registration key from panel, if possible, raise
        PanelException if not successful (or wrong panel key provided),
        or NotImplementedError if method not supported by the panel.
        """
        raise NotImplementedError

    @classmethod
    async def notify(cls, *, message_type, params, user=None):
        """
        Notify a customer using the panel internal tooling
        (this base implementation does nothing and returns None)
        """
        return None

    @abstractmethod
    async def list_docroots(self) -> Dict[str, str]:
        """
        :return dict with docroot to domain
        """
        pass


def ensure_valid_panel(**dec_kwargs):
    """
    Run function only if hosting panel is installed, otherwise raise
    the panel's ``exception`` class (PanelException by default).

    This method is intended to be used as a decorator on AbstractPanel
    instance methods.

    :raise PanelException:
    :param dec_kwargs: kwargs passed to is_installed function
    :return: decorator for a coroutine method
    """

    def real_decorator(fn):
        """
        :param fn: coroutine
        """

        # Preserve the wrapped coroutine's name, docstring and __wrapped__.
        @functools.wraps(fn)
        async def wrapper(self, *args, **kwargs):
            if not self.is_installed(**dec_kwargs):
                raise self.exception(
                    "%s is not valid!" % self.__class__.__name__
                )
            return await fn(self, *args, **kwargs)

        return wrapper

    return real_decorator


class ModsecVendorsError(Exception):
    """
    Raised when it's impossible to get modsec vendor
    """

    pass