"""Helpers for talking to the CLN REST API.

Covers server registration / check-in / unregistration and the Acronis
backup endpoints, plus small helpers that probe the local imunify-email
installation.
"""
import asyncio
import json
import logging
import os
import socket
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse

import psutil

from defence360agent.contracts.config import ANTIVIRUS_MODE
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import CheckRunError, async_lru_cache, check_run
from defence360agent.utils.common import get_hostname

_TIMEOUT = 300  # timeout for network operations
_IMUNIFY_EMAIL_CONFIG_EXECUTABLE = Path("/usr/sbin/ie-config")

logger = logging.getLogger(__name__)

IE_SUPPORTED_CMD = (
    "wget -qq -O -"
    " https://repo.imunify360.cloudlinux.com/defence360/imunifyemail-deploy.sh"
    " | bash -s 'is-supported'"
)


@async_lru_cache(maxsize=1)
async def is_imunify_email_supported() -> bool:
    """Run the imunify-email deploy script's ``is-supported`` check.

    :return: True when the command exits successfully, False when it
        raises CheckRunError.
    """
    try:
        await check_run(IE_SUPPORTED_CMD, shell=True)
    except CheckRunError as e:
        # Exit code 100 is not logged as an error; presumably it is the
        # script's regular "not supported" answer -- TODO confirm.
        if e.returncode != 100:
            logger.error("imunify-email check failed %s", e)
        return False
    return True


async def get_imunify_email_status():
    """Try to get imunify-email status

    :return: True only when ``ie-config status`` output reports the
        spamfilter exim configuration as enabled; False in antivirus
        mode, when the executable is missing, or when the command fails.
    """
    if ANTIVIRUS_MODE:
        return False
    if not _IMUNIFY_EMAIL_CONFIG_EXECUTABLE.exists():
        return False
    try:
        output = await check_run(
            [str(_IMUNIFY_EMAIL_CONFIG_EXECUTABLE), "status"]
        )
    except CheckRunError:
        return False
    return "spamfilter exim configuration: enabled" in output.decode()


class CLNError(Exception):
    """Error raised for a failed or unexpected CLN response.

    :param status: HTTP status code of the response, if known
    :param message: response body text, if any
    """

    def __init__(self, status=None, message=None):
        self.message = message
        self.status = status

    def __str__(self):
        if self.message:
            return self.message
        return "Unexpected status code from CLN: {}".format(self.status)


class InvalidLicenseError(Exception):
    """Raised when a key or license can not be used on this server."""


class BackupNotFound(CLNError):
    """CLN reported that no backup exists.

    :param url: purchase URL for backups, or None when there is none
    """

    GB = 1024 * 1024 * 1024

    def __init__(self, url):
        # Initialise the CLNError state so that ``status`` / ``message``
        # exist (as None) for code that handles a generic CLNError.
        super().__init__()
        self.url = url

    def __str__(self):
        return "Backup not found in CLN"

    def add_used_space(self):
        """Return ``self.url`` with a ``used_space`` query parameter added.

        The value is the result of :meth:`_disk_usage`. An existing
        ``used_space`` parameter is overwritten.

        :return: the rebuilt URL, or None when ``self.url`` is None
        """
        if self.url is None:
            return
        pu = urlparse(self.url)
        query = dict(parse_qsl(pu.query))
        query["used_space"] = self._disk_usage()
        return urlunparse(
            (
                pu.scheme,
                pu.netloc,
                pu.path,
                pu.params,
                urlencode(query),
                pu.fragment,
            )
        )

    def _disk_usage(self):
        """Sum used space over mounted partitions, rounded to whole GB.

        Each device is counted once; partitions mounted with ``noauto``
        and ``/dev/loop*`` devices are skipped.
        """
        total_used = 0
        partitions = psutil.disk_partitions()
        processed = set()
        for p in partitions:
            if (
                (p.device not in processed)
                and ("noauto" not in p.opts)
                and (not p.device.startswith("/dev/loop"))
            ):
                total_used += psutil.disk_usage(p.mountpoint).used
                processed.add(p.device)
        return round(total_used / self.GB)


def _post_request(url, data=None, headers=None, timeout=None):
    """To be used by RestCLN._request().

    Blocking HTTP request to *url*. When *data* is given and no *headers*
    were passed, a Content-type matching the data type is used: bytes are
    sent as is, str is UTF-8 encoded, anything else is url-encoded as a
    form.

    :param url: target URL
    :param data: optional request body (bytes, str or a mapping)
    :param headers: optional request headers
    :param timeout: socket timeout passed to ``urlopen``
    :return: ``(status, None)`` for 204, ``(status, parsed_json)`` for
        200 and 244
    :raises TimeoutError: on a socket timeout while connecting or reading
    :raises CLNError: on an HTTP error, an unexpected status code or a
        non-JSON body
    :raises OSError: any other network error is logged and re-raised
    """
    kwargs = {}
    if headers is not None:
        kwargs["headers"] = headers
    if data is not None:
        if isinstance(data, bytes):
            kwargs.setdefault(
                "headers", {"Content-type": "application/octet-stream"}
            )
        elif isinstance(data, str):
            data = data.encode("utf-8")
            kwargs.setdefault(
                "headers", {"Content-type": "text/plain; charset=utf-8"}
            )
        else:  # dict
            data = urllib.parse.urlencode(data).encode("ascii")
            kwargs.setdefault(
                "headers",
                {"Content-type": "application/x-www-form-urlencoded"},
            )
        kwargs["data"] = data
    try:
        resp = urllib.request.urlopen(
            urllib.request.Request(url, **kwargs), timeout=timeout
        )
    except socket.timeout:
        raise TimeoutError("Timed out receiving response")
    except OSError as e:
        if hasattr(e, "code"):  # HTTPError
            if e.code < 400:
                raise CLNError(e.code) from e
            # e.code >= 400
            message = None
            if e.fp is not None:
                logger.warning(
                    "CLN.post(url=%r, data=%r, headers=%r): %d %s",
                    url,
                    data,
                    headers,
                    e.code,
                    e.reason,
                )
                try:
                    resp_data = e.read()
                except socket.timeout:
                    raise TimeoutError("Timed out reading error message")
                finally:
                    # The error object doubles as the response; release
                    # it once the body has been consumed (or has failed).
                    e.close()
                # the response may be non-json
                message = resp_data.decode(errors="replace")
            raise CLNError(message=message, status=e.code) from e
        else:
            logger.warning(
                "CLN.post(url=%r, data=%r, headers=%r, timeout=%r): %s",
                url,
                data,
                headers,
                timeout,
                e,
            )
            raise
    else:
        with resp:
            if resp.code == 204:
                return resp.code, None
            elif resp.code in (200, 244):
                # 244 - /im/ab/check returns link for backup buy page
                try:
                    content = resp.read()
                except socket.timeout:
                    raise TimeoutError("Timed out reading response")
                else:
                    try:
                        return resp.code, json.loads(content.decode())
                    except json.JSONDecodeError as e:
                        raise CLNError(
                            message=(
                                f"Non-json data from CLN: {content} for"
                                f" code={resp.code}"
                            ),
                            status=resp.code,
                        ) from e
            else:
                raise CLNError(resp.code)


class RestCLN:
    """Thin async wrappers around the CLN REST endpoints.

    The base URL can be overridden with the ``IM360_CLN_API_BASE_URL``
    environment variable.
    """

    _BASE_URL = os.environ.get(
        "IM360_CLN_API_BASE_URL", "https://cln.cloudlinux.com/api/im/"
    )
    _REGISTER_URL = urljoin(_BASE_URL, "register")
    _UNREGISTER_URL = urljoin(_BASE_URL, "unregister")
    _CHECKIN_URL = urljoin(_BASE_URL, "checkin")
    _ACRONIS_CREDENTIALS_URL = urljoin(_BASE_URL, "ab/credentials")
    _ACRONIS_REMOVE_URL = urljoin(_BASE_URL, "ab/remove")
    _ACRONIS_CHECK_URL = urljoin(_BASE_URL, "ab/check")

    STATUS_OK_PAID_LICENSE = "ok"
    STATUS_OK_TRIAL_LICENSE = "ok-trial"

    @classmethod
    async def _request(cls, url, *, data=None, headers=None, timeout=_TIMEOUT):
        """Run the blocking ``_post_request`` in the default executor.

        :return: the ``(status, payload)`` tuple of ``_post_request``
        """
        return await asyncio.get_event_loop().run_in_executor(
            None, _post_request, url, data, headers, timeout
        )

    @classmethod
    async def register(cls, key: str) -> dict:
        """
        Register server with key
        :param key: registration key
        :return: license token in case of success
        """
        _, token = await cls._request(cls._REGISTER_URL, data={"key": key})
        return token

    @classmethod
    async def checkin(
        cls,
        server_id: str,
        users_count: int,
        hostname: Optional[str] = None,
    ):
        """
        Update license token
        :param str server_id: server id
        :param int users_count: users count
        :param str hostname: current server hostname; looked up with
            get_hostname() when empty or omitted
        :return: dict new license token
        """
        hostname = hostname or get_hostname()
        imunify_email_status = await get_imunify_email_status()
        req = {
            "id": server_id,
            "hostname": hostname,
            "im": {
                "users": users_count,
                "imunifyEmail": imunify_email_status,
                "supported_features": {
                    "IM_EMAIL": await is_imunify_email_supported(),
                },
            },
        }
        data = json.dumps(req)
        logger.info("CLN checkin: %s", data)
        _, token = await cls._request(
            cls._CHECKIN_URL,
            data=data,
            headers={"Content-type": "application/json"},
        )
        return token

    @classmethod
    async def acronis_credentials(cls, server_id: str) -> dict:
        """
        Creates Acronis Backup account and get user & password
        :param server_id: server id
        """
        _, creds = await cls._request(
            cls._ACRONIS_CREDENTIALS_URL, data={"id": server_id}
        )
        return creds

    @classmethod
    async def acronis_remove(cls, server_id: str):
        """
        Removes Acronis Backup account
        :param server_id: server id
        """
        await cls._request(cls._ACRONIS_REMOVE_URL, data={"id": server_id})

    @classmethod
    async def acronis_check(cls, server_id: str) -> dict:
        """
        If Acronis account exists return backup size in GB
        :param server_id: server id
        :raises BackupNotFound: when CLN answers with status 244; the
            purchase URL is deliberately not passed on
        """
        status, response = await cls._request(
            cls._ACRONIS_CHECK_URL, data={"id": server_id}
        )
        if status == 244:
            # Backup not found
            raise BackupNotFound(url=None)  # Prohibit purchasing a new backup
        return response

    @classmethod
    async def unregister(cls, server_id=None):
        """
        Unregister server id
        :param server_id: id to release; defaults to
            LicenseCLN.get_server_id()
        :return: None
        """
        server_id = server_id or LicenseCLN.get_server_id()
        await cls._request(cls._UNREGISTER_URL, data={"id": server_id})


class CLN:
    """License operations that also notify subscribed callbacks."""

    # method name -> set of coroutine functions to await after that method
    _CALLBACKS = defaultdict(set)

    @classmethod
    def add_callback_for(cls, method_name, coro_callback):
        """Register *coro_callback* to be awaited after *method_name*."""
        cls._CALLBACKS[method_name].add(coro_callback)

    @classmethod
    async def run_callbacks_for(cls, method_name):
        """Await every callback registered for *method_name*.

        A failing callback is logged and does not stop the remaining
        ones; cancellation is propagated.
        """
        # Iterate over a snapshot: a callback registered while one of the
        # awaits below is pending would otherwise resize the set during
        # iteration and raise RuntimeError.
        for callback in tuple(cls._CALLBACKS.get(method_name, ())):
            try:
                await callback()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.exception(
                    "Error '%r' happened when run callback %s for"
                    " CLN %s method",
                    e,
                    callback,
                    method_name,
                )

    @classmethod
    def is_avp_key(cls, key):
        """Return True when *key* starts with the ``IMAVP`` prefix."""
        return key.startswith("IMAVP")

    @classmethod
    async def register(cls, key):
        """Register the server with *key* and store the received license.

        :raises InvalidLicenseError: for an ImunifyAV+ key outside
            antivirus mode, or when the received license is not valid
            for this server (the server id is released first)
        """
        if cls.is_avp_key(key) and not ANTIVIRUS_MODE:
            raise InvalidLicenseError(
                "Imunify360 can not be registered with ImunifyAV+ key"
            )
        license_token = await RestCLN.register(key)
        # in case of IP license, we have to register to know if license is
        # valid for server (i.e. Imunify360 license is used for Imunify360)
        if not LicenseCLN.is_valid(license_token):
            # release registered server id
            await RestCLN.unregister(license_token["id"])
            raise InvalidLicenseError("License is invalid for this server")
        LicenseCLN.update(license_token)
        await cls.run_callbacks_for("register")

    @classmethod
    async def unregister(cls):
        """Unregister the server in CLN and delete the local license."""
        await RestCLN.unregister()
        LicenseCLN.delete()
        await cls.run_callbacks_for("unregister")

    @classmethod
    async def refresh_token(cls, token):
        """Refresh the license token through a CLN check-in.

        Free licenses and tokens flagged ``is_alternative`` are left
        untouched. When CLN returns no token the server is unregistered;
        otherwise the new token is stored and the ``refresh_token``
        callbacks are run.

        :param token: current token; its ``id`` is sent to CLN
        :return: the value of LicenseCLN.get_token() after the attempt
        """
        if LicenseCLN.is_free():
            # noop: free license can not be refreshed
            return LicenseCLN.get_token()
        if LicenseCLN.get_token().get("is_alternative"):
            # self-signed licenses are refreshed by customer
            return LicenseCLN.get_token()
        new_token = await RestCLN.checkin(token["id"], LicenseCLN.users_count)
        logger.info("Got new token from CLN: %s", new_token)
        if new_token is None:
            await CLN.unregister()
        else:
            LicenseCLN.update(new_token)
            await cls.run_callbacks_for("refresh_token")
        return LicenseCLN.get_token()


def subscribe_to_license_changes(coro):
    """Register *coro* for register, unregister and refresh_token."""
    for method_name in ["register", "unregister", "refresh_token"]:
        CLN.add_callback_for(method_name, coro_callback=coro)