import itertools import logging import time from peewee import ( CharField, FloatField, IntegerField, TextField, ) from defence360agent.model import instance, Model logger = logging.getLogger(__name__) class InfectedDomainList(Model): """Domains with bad reputation, used for Reputation Management feature.""" id = IntegerField(primary_key=True) #: Username associated with the domain in hosting panel. username = CharField(null=True) #: Domain name. name = CharField(null=False) #: The kind of threat reported by reputation engine, #: e.g. "SOCIAL_ENGINEERING". threat_type = CharField(null=False) #: The time when Imunify first detected that the domain has bad reputation. timestamp = FloatField() #: The name of the reputation engine, e.g. "google-safe-browsing". vendor = TextField(null=True) class Meta: database = instance.db db_table = "infected_domain_list" @classmethod def get_by_user(cls, existing_users, offset=0, limit=50): # to be able to filter query results using existing_users, # limit/offset os applied on python side query = cls.select().order_by( cls.username, cls.name, cls.timestamp.desc() ) filtered_by_user = ( row for row in query.dicts() if row["username"] in existing_users ) grouped = itertools.groupby( filtered_by_user, key=lambda row: (row["username"], row["name"]) ) max_count = 0 result = [] for i, value in enumerate(grouped): max_count += 1 if (len(result) < limit) and (i >= offset): group, threats = value username, name = group result.append( { "username": username, "domain": name, "threats": [ { "type": t["threat_type"], "vendor": t["vendor"], "timestamp": t["timestamp"], } for t in threats ], } ) return result, max_count @classmethod def refresh_domains(cls, domains, domains_to_users): """ Update domain reputatuion info. If threat info already exists, do not update timestamp :param domains: reputation data from server :param domains_to_users: domain -> users mapping from hosting panel :return: """ existing = { (r["name"], r["threat_type"], r["vendor"]): r["timestamp"] for r in cls.select().dicts() } with instance.db.atomic(): cls.delete().execute() now = time.time() for domain_info in domains: domain = domain_info["query"] if domain not in domains_to_users: logger.warning("Users for domain %s not found.", domain) continue for user in domains_to_users[domain]: vendor = domain_info["vendor"] if vendor in ( "google-safe-browsing", "yandex-safe-browsing", ): threat_type = domain_info["details"]["threat_type"] elif vendor == "spamhaus": threat_type = domain_info["details"] elif vendor in ("phishtank", "openphish"): # https://cloudlinux.atlassian.net/wiki/spaces/IPT/pages/929759302/4.1+Multiple+vendors+in+Reputation+Management+ver.4.2 # noqa: E501 threat_type = "spam domain" else: threat_type = "THREAT_TYPE_UNSPECIFIED" timestamp = existing.get( (domain, threat_type, vendor), now ) cls.create( username=user, name=domain, threat_type=threat_type, vendor=vendor, timestamp=timestamp, )