import logging import hashlib from clcommon.clwpos_lib import find_wp_paths from defence360agent.api.server.events import EventsAPI from defence360agent.contracts import config from defence360agent.contracts.myimunify_id import get_myimunify_users from defence360agent.utils.whmcs import get_upgrade_url_link from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.myimunify.advice.dataclass import MyImunifyWPAdvice from defence360agent.myimunify.model import MyImunify logger = logging.getLogger(__name__) ADV_TYPE = "IMUNIFY_PROTECTION" def get_myimunify_protection_status(username): item = [username] response = MyImunify.select().where(MyImunify.user.in_(item)).dicts() if response and response[0].get("protection", False): return "active" else: return "no" async def _make_advice(imunify_advice): """ imunify advice item: {"id": 123, "server_id": null, "type": "malware_found_myimun_2", "date": 123, "severity": 1, "translation_id": "1", "parameters": {}, "description": null, "link_text": null, "link": null, "dashboard": false, "popup": false, "snoozed_until": 0, "popup_title": null, "popup_description": null, "config_action": {}, "ignore": {}, "notification": false, "smartadvice": true, "smartadvice_title": "Web hosting user account is infected", "smartadvice_description": "\nImunify detected live malware on the user account hosting this website:\n\n* inf1\n\n* inf2\n", "smartadvice_user": "isuser", "smartadvice_domain": "isuser.com", "smartadvice_docroot": "/", "ts": 123, "first_generated": 123, "iaid": "agent-iaid-123", "notification_body_html": null, "notification_period_limit": 0, "notification_subject": null, "notification_user": null} -> { "created_at": "2024-10-02T01:22:11.918688+00:00", "updated_at": "2024-10-02T01:22:11.918688+00:00", "metadata": { "app": "imunify" "username": "tkcpanel", "domain": "tk-cpanel.com", "website": "/", "panel_url": "https://10.193.176.2:2083/cpsess0000000000/frontend/paper_lantern/lveversion/wpos.live.pl", }, "advice": { "id": "287718", "type": "CPCSS", "status": "review", "description": "Turn on Critical Path CSS", "is_premium": true, "module_name": "critical_css", "license_status": "NOT_REQUIRED", "subscription": { "status": "active", "upgrade_url": "https://whmcs.dev.cloudlinux.com?username=tkcpanel&domain=tk-cpanel.com&server_ip=10.193.176.2&m=cloudlinux_advantage&action=provisioning&suite=accelerate_wp_premium" }, "total_stages": 0, "completed_stages": 0, "detailed_description": "Critical Path CSS eliminates render-blocking CSS on your website and improves browser page render performance. Your website will load much faster for your visitors.\nNote: Applying the current advice will also enable the AccelerateWP feature." } } """ advices_by_docroot = [] username = imunify_advice["smartadvice_user"] protection_status = get_myimunify_protection_status(username) domain = imunify_advice["smartadvice_domain"] infected_docroot = imunify_advice["smartadvice_docroot"] upgrade_url = imunify_advice["upgrade_url"] description = imunify_advice["smartadvice_title"] detailed_description = imunify_advice["smartadvice_description"] iaid = imunify_advice["iaid"] panel_url = await HostingPanel().panel_user_link(username) for site in find_wp_paths(infected_docroot): website = f"/{site}" # for analytics: iaid-hash(username-domain-website) hashed_udw = hashlib.md5( f"{username}-{domain}-{website}".encode("utf-8") ).hexdigest() adv_id = f"{iaid}_{hashed_udw}" im360_protection_advice = MyImunifyWPAdvice( username=username, domain=domain, website=website, panel_url=panel_url + f"?show_cleanup_dialog=true", # Flag tells UI to display cleanup dialog id=adv_id, type=ADV_TYPE, status="review", description=description, detailed_description=detailed_description, is_premium=False, module_name="imunify", license_status="NOT_REQUIRED", subscription_status=protection_status, upgrade_url=upgrade_url, total_stages=0, completed_stages=0, ) advices_by_docroot.append(im360_protection_advice.to_advice()) return advices_by_docroot async def make_advice() -> list: advices = [] advice_list = await get_advice_notifications() logger.info("IM360 advice list: %s", str(advice_list)) if advice_list: for item in advice_list: try: advice_item = await _make_advice(item) except KeyError as e: logger.error( "Unable to make advice based on item: %s, malformed" " error: %s", str(item), str(e), ) continue advices.extend(advice_item) return advices async def get_advice_notifications() -> [dict]: users_to_report = set( [ item["username"] for item in await get_myimunify_users() if not item["protection"] ] ) users_to_pop = set() for user in users_to_report: conf = config.ConfigFile(user) if not conf.get("CONTROL_PANEL", "smart_advice_allowed"): users_to_pop.add(user) users_to_report = users_to_report - users_to_pop response = await EventsAPI.smart_advices() logger.info( "Smart Advice events API response " "with notifications: %s, users to report: %s", str(response), str(users_to_report), ) data = [ event for event in response if event.get("smartadvice_user") in users_to_report ] for item in data: item["upgrade_url"] = get_upgrade_url_link( item.get("smartadvice_user"), item.get("smartadvice_domain") ) return data