import urllib.parse from typing import List, Optional import defence360agent.subsys.panels.hosting_panel as hp from defence360agent.contracts.config import ( MyImunifyConfig, is_mi_freemium_license, ) from defence360agent.myimunify.model import ( MyImunify, set_protection_status_for_all_users, update_users_protection, ) from defence360agent.rpc_tools import lookup from defence360agent.rpc_tools.utils import run_in_executor_decorator from defence360agent.utils import Scope class MyImunifyEndpoints(lookup.RootEndpoints): SCOPE = Scope.IM360 @lookup.bind("myimunify", "update") async def update(self, items: List[str], protection: str): await update_users_protection( self._sink, items, protection == "enabled" ) return {} @lookup.bind("myimunify", "enable-all") async def enable_all(self): await set_protection_status_for_all_users(self._sink, True) @lookup.bind("myimunify", "disable-all") async def disable_all(self): await set_protection_status_for_all_users(self._sink, False) class MyImunifyCommonEndpoints(lookup.CommonEndpoints): SCOPE = Scope.IM360 @lookup.bind("myimunify", "status") async def status(self, items: List[str], user: Optional[str] = None): purchase_url = MyImunifyConfig.PURCHASE_PAGE_URL panel_manager = hp.HostingPanel() if user is not None: items = [user] # if MY_IMNUNIFY is disabled, we don't need to generate purchase # url with domain and ip [because it will not been shown to user] if MyImunifyConfig.ENABLED: user_domains = ( await panel_manager.get_domains_per_user() ).get(user, []) domain = next(iter(user_domains), None) purchase_url = ( MyImunifyConfig.PURCHASE_PAGE_URL + "/?" + urllib.parse.urlencode( { "m": "cloudlinux_advantage", "action": "provisioning", "suite": "my_imunify_account_protection", "username": user, "domain": domain, "server_ip": panel_manager.get_server_ip(), } ) ) response = MyImunify.select().where(MyImunify.user.in_(items)).dicts() return { "myimunify_enabled": MyImunifyConfig.ENABLED, "purchase_page_url": purchase_url, "is_freemium": is_mi_freemium_license(), "items": [ {"username": item["user"], "protection": item["protection"]} for item in response ], }