import logging

from defence360agent.rpc_tools import lookup
from defence360agent.rpc_tools.validate import (
    ValidationError,
    validate_av_plus_license,
)
from defence360agent.subsys.panels.base import PanelException
from defence360agent.model.infected_domain import InfectedDomainList
from defence360agent.subsys.panels import hosting_panel
from defence360agent.api.server.reputation import ReputationAPI
from defence360agent.model.simplification import run_in_executor

logger = logging.getLogger(__name__)


class ReputationManagementEndpoints(lookup.RootEndpoints):
    """RPC endpoints for listing and re-checking infected domains."""

    @lookup.bind("infected-domains")
    @validate_av_plus_license
    async def list_domains(self, limit, offset):
        """Return one page of infected-domain records.

        The lookup is restricted to the users currently reported by the
        hosting panel.

        :param limit: forwarded as ``limit`` to
            ``InfectedDomainList.get_by_user``
        :param offset: forwarded as ``offset`` to
            ``InfectedDomainList.get_by_user``
        :return: dict with ``"items"`` and ``"max_count"`` keys, taken from
            the pair returned by ``InfectedDomainList.get_by_user``
            (presumably the page and the total number of matches --
            confirm against the model)
        """
        # De-duplicate the user names reported by the panel.
        existing_users = set(await hosting_panel.HostingPanel().get_users())
        items, max_count = InfectedDomainList.get_by_user(
            existing_users, offset=offset, limit=limit
        )
        return {
            "items": items,
            "max_count": max_count,
        }

    @lookup.bind("check-domains")
    @validate_av_plus_license
    async def check_domains(self):
        """Check the reputation of all user domains and store the result.

        Fetches the user domains from the hosting panel, passes them to
        ``ReputationAPI.check`` and hands the result, together with the
        domain-to-owner mapping, to ``InfectedDomainList.refresh_domains``.

        :raises ValidationError: if no control panel is installed, if the
            panel raises ``PanelException`` while listing domains, or if
            the panel reports no domains
        """
        hp = hosting_panel.HostingPanel()
        # TODO: strange behaviour has been detected here.
        # This is probably the normal case for cPanel DNS only:
        # we should not process domains if the panel is not found
        # or not available.
        if not hp.is_installed():
            raise ValidationError("No avaliable control panel found!")

        try:
            domains = await hp.get_user_domains()
        except PanelException as e:
            # Chain the panel error so the original cause stays visible.
            raise ValidationError(str(e)) from e

        if not domains:
            raise ValidationError("Domains not found")

        reputation_data = await ReputationAPI.check(domains)
        # Reuse the panel object obtained above instead of creating another.
        domain_to_user = await hp.get_domain_to_owner()
        # refresh_domains is a synchronous call; it is dispatched through
        # run_in_executor (None is passed as the first argument --
        # presumably selecting the default executor; confirm).
        await run_in_executor(
            None,
            lambda: InfectedDomainList.refresh_domains(
                reputation_data, domain_to_user
            ),
        )