import json
import urllib.error
import urllib.request
import urllib.parse
import asyncio
from typing import List
import time
import logging

from defence360agent.utils import retry_on, split_for_chunk
from defence360agent.api.server import API, APIError

logger = logging.getLogger(__name__)


class ReputationAPI(API):
    """Client for the server-side domain-reputation API.

    Domains are submitted in size-limited chunks via POST, after which the
    result endpoint is polled (with retries) until each chunk's result is
    ready.
    """

    REQUEST_URL = "/api/reputation/check"
    RESULT_URL = "/api/reputation/result"
    # during stress tests 'Request Entity Too Large' error has been caught,
    # in request size somewhere between 800000 and 900000 bytes
    # max domain length - 255, 800000 / 255 = 3137
    # 3000 is the nearest 'round' number
    CHUNK_SIZE = 3000
    # Seconds to sleep before re-polling a result that is not ready yet.
    WAIT_BEFORE_RETRY = 5
    # Overall retry budget (seconds) handed to the retry_on decorator.
    WAIT_FOR_RESULT = 1200
    _SOCKET_TIMEOUT = 60

    @classmethod
    async def check(cls, domains: List[str]) -> List[dict]:
        """Check reputation of *domains*.

        The blocking HTTP work is delegated to the default thread-pool
        executor so the event loop is not blocked.
        """
        logger.info("DomainListRequest domains: %s", domains)
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, cls._check, domains)

    @classmethod
    def _check(cls, domains: List[str]) -> List[dict]:
        """Split *domains* into API-sized chunks, submit each chunk and
        collect all per-domain results into a single list."""
        result_list: List[dict] = []
        for chunk in split_for_chunk(domains, cls.CHUNK_SIZE):
            result = cls._check_chunk(chunk)
            # Blocks (with retries) until this chunk's result is available.
            result_list.extend(cls._get_result(result["result_id"]))
        return result_list

    @classmethod
    @retry_on(APIError, timeout=WAIT_FOR_RESULT)
    def _check_chunk(cls, chunk) -> dict:
        """POST one chunk of domains to the check endpoint.

        Returns the decoded server response, expected to contain a
        'result_id' key used to poll for the outcome.
        """
        check_request = urllib.request.Request(
            cls._BASE_URL + cls.REQUEST_URL,
            method="POST",
            headers={"Content-Type": "application/json"},
            data=json.dumps(dict(domains=chunk)).encode(),
        )
        return cls.request(check_request)

    @classmethod
    @retry_on(APIError, timeout=WAIT_FOR_RESULT)
    def _get_result(cls, result_id: str):
        """Poll the result endpoint for *result_id*.

        Raises APIError while the server reports the result as not ready,
        which makes the retry_on decorator retry until WAIT_FOR_RESULT
        seconds have elapsed.
        """
        data = dict(result_id=result_id)
        url = "{}?{}".format(
            cls._BASE_URL + cls.RESULT_URL, urllib.parse.urlencode(data)
        )
        request = urllib.request.Request(url)
        response = cls.request(request)
        result = response["result"]
        if result is None:
            # Sleeping is safe here: this code runs in the sync executor
            # thread, not on the event loop.
            time.sleep(cls.WAIT_BEFORE_RETRY)
            raise APIError("Response not ready yet")
        return result