import asyncio
from contextlib import suppress
import logging
from subprocess import TimeoutExpired
import time
from random import randint

from defence360agent.contracts.config import ANTIVIRUS_MODE, CustomBilling
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.contracts.license import LicenseCLN, AV_DEFAULT_ID
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.cln import CLN, CLNError
from defence360agent.internals.iaid import APIError, IndependentAgentIDAPI
from defence360agent.subsys.panels import hosting_panel
from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import await_for, recurring_check, retry_on
from defence360agent.utils.common import DAY, HOUR

logger = logging.getLogger(__name__)


class CheckLicense(MessageSource):
    """Background license maintenance.

    Runs up to three asyncio tasks:

    * a loop around :meth:`_check` that refreshes the CLN license token
      (or tries an IP-based registration when the server is not
      registered) and sleeps for the delay :meth:`_check` returns;
    * :meth:`_iaid_token_check`, restarted on every :meth:`_check` call;
    * :meth:`check_hooks`, which sends ``LicenseExpiring`` /
      ``LicenseExpired`` hook events to the sink.
    """

    # Delays below are passed to ``asyncio.sleep`` (i.e. seconds).
    # Regular interval between token checks; also the refresh threshold.
    TOKEN_UPDATE_PERIOD = DAY
    # Delay before retrying after a failed check.
    RETRY_TIMEOUT = HOUR
    # Argument for ``recurring_check`` on ``check_hooks``
    # (presumably the interval between runs -- confirm in the decorator).
    HOOK_CHECK_TIMEOUT = DAY
    # The "expiring" hook fires when the license expires within this delta.
    HOOK_EXPIRING_TIME_DELTA = 3 * DAY

    def __init__(self):
        self.loop = None
        self.sink = None
        self.check_hooks_task = None
        self.check_license_task = None
        self.check_iaid_token_task = None
        # Each hook is sent at most once per instance.
        self.expiring_called = False
        self.expired_called = False

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and start the background tasks."""
        self.loop = loop
        self.sink = sink
        self.check_hooks_task = self.loop.create_task(self.check_hooks())
        self.check_license_task = self.loop.create_task(
            self._recurring_check()
        )

    async def shutdown(self):
        """Cancel every started background task and wait for it to finish.

        Tasks that were never created (still ``None``) are skipped, and
        each task is awaited under its own ``suppress`` so that one task
        finishing with ``CancelledError`` does not prevent the remaining
        tasks from being awaited.
        """
        tasks = [
            task
            for task in (
                self.check_license_task,
                self.check_hooks_task,
                self.check_iaid_token_task,
            )
            if task is not None
        ]
        for task in tasks:
            task.cancel()
        for task in tasks:
            with suppress(asyncio.CancelledError):
                await task

    async def _recurring_check(self):
        """Call :meth:`_check` forever, sleeping for the delay it returns.

        On failure the error is logged and the loop sleeps for
        ``RETRY_TIMEOUT``. Cancellation while checking or sleeping on the
        normal path ends the loop.
        """
        while True:
            try:
                await asyncio.sleep(await self._check())
            except asyncio.CancelledError:
                break
            except TimeoutExpired:
                logger.error("Token signatures verification timeout expired")
                await asyncio.sleep(self.RETRY_TIMEOUT)
            except Exception:  # NOSONAR pylint:W0703
                logger.exception("An exception occurred during license check")
                await asyncio.sleep(self.RETRY_TIMEOUT)

    async def _register_by_ip(self):
        """Try to obtain an IP-based license from CLN.

        Returns the delay before the next license check.
        """
        if ANTIVIRUS_MODE and not CustomBilling.IP_LICENSE:
            if CustomBilling.UPGRADE_URL or CustomBilling.UPGRADE_URL_360:
                # No registration attempt in this configuration.
                return self.TOKEN_UPDATE_PERIOD
        try:
            await CLN.register("IPL")
            # Random extra delay of up to half a period on success.
            return self.TOKEN_UPDATE_PERIOD + randint(
                0, self.TOKEN_UPDATE_PERIOD // 2
            )
        except CLNError as e:
            logger.warning("Failed to register by ip: %s", e)
            return self.TOKEN_UPDATE_PERIOD
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("Failed to register by ip: %s", e)
            return self.RETRY_TIMEOUT

    @retry_on(APIError, on_error=await_for(seconds=HOUR), timeout=DAY - HOUR)
    async def _iaid_token_check(self):
        """Ensure the IAID token is activated and valid."""
        await IndependentAgentIDAPI.ensure_is_activated_and_valid()

    async def _check(self):
        """Run one license check and return the delay until the next one.

        Restarts the IAID token check task, updates
        ``LicenseCLN.users_count`` from the hosting panel, then either
        tries an IP-based registration (unregistered server) or refreshes
        the token when the refresh condition below holds.
        """
        # Instead of checking users count every time license is checked
        # (and trying to update license if user limit exceeded)
        # we only detect number of users during checkin.
        # This way, if we exceeded user limit, we will get extended license
        # from cln immediately
        logger.info("Checkin IAID token")
        if (
            self.check_iaid_token_task
            and not self.check_iaid_token_task.done()
        ):
            # Stop the previous check before starting a new one.
            self.check_iaid_token_task.cancel()
            with suppress(asyncio.CancelledError):
                await self.check_iaid_token_task
        if self.loop:  # for unit-tests where loop is not initialized
            self.check_iaid_token_task = self.loop.create_task(
                self._iaid_token_check()
            )

        logger.info("Checking token")
        panel = hosting_panel.HostingPanel()
        try:
            LicenseCLN.users_count = await panel.users_count()
        except PanelException as e:
            logger.error("Failed to get users count: %s", e)
            return self.RETRY_TIMEOUT

        # Drop the cached token so the calls below see the current one.
        LicenseCLN.get_token.cache_clear()

        if not LicenseCLN.is_registered():
            logger.info("Server is not registered, skipping checkin")
            # Trying to get ip-based license
            return await self._register_by_ip()

        now = time.time()
        token = LicenseCLN.get_token()
        logger.info("Checking token expiration %r", token)
        expires_in = token["token_expire_utc"] - now

        # Refresh when a non-default license expires in less than
        # TOKEN_UPDATE_PERIOD, or when the user limit is exceeded.
        # NOTE(review): ``and`` binds tighter than ``or``, so the user
        # limit check also applies to AV_DEFAULT_ID tokens; the grouping
        # is spelled out here without changing it -- confirm the intent.
        needs_refresh = (
            token["id"] != AV_DEFAULT_ID
            and expires_in < self.TOKEN_UPDATE_PERIOD
        ) or (LicenseCLN.users_count > token["limit"])
        if not needs_refresh:
            # more than a day left, sleeping
            return self.TOKEN_UPDATE_PERIOD

        try:
            if (await CLN.refresh_token(token)) is None:
                # license is invalid
                return self.TOKEN_UPDATE_PERIOD
        except CLNError as e:
            logger.warning("CLN API error: %s", e)
            if not LicenseCLN.is_registered():
                # if we have an error, we will try to register by ip
                return await self._register_by_ip()
            return self.RETRY_TIMEOUT

        # check token again not earlier than half of the token
        # expiration or half of the day
        # and no later than the token expiration (3/4 exp_time)
        # or a day
        now = time.time()
        expires_in = (
            LicenseCLN()
            .get_token()
            .get("token_expire_utc", now + self.TOKEN_UPDATE_PERIOD)
            - now
        )
        if expires_in <= 0:
            # Try another time in a day
            return self.TOKEN_UPDATE_PERIOD
        # ``time.time()`` makes this a float; ``randint`` needs integer
        # bounds (float arguments raise TypeError on Python 3.12+), so cap
        # at one period and truncate to int before the arithmetic.
        expires_in = int(min(expires_in, self.TOKEN_UPDATE_PERIOD))
        return expires_in // 2 + randint(0, expires_in // 4)

    @recurring_check(HOOK_CHECK_TIMEOUT)
    async def check_hooks(self):
        """Send the license expiring/expired hook event when due.

        Reads ``license_expire_utc`` from the current token; does nothing
        when it is absent. Each of the two events is sent at most once
        per instance.
        """
        time_now_utc = int(time.time())
        exp_time = LicenseCLN().get_token().get("license_expire_utc")
        if exp_time is None:
            return

        if exp_time <= time_now_utc:
            if not self.expired_called:
                hook = HookEvent.LicenseExpired(exp_time=exp_time)
                await self.sink.process_message(hook)
                self.expired_called = True
        elif (
            exp_time - self.HOOK_EXPIRING_TIME_DELTA < time_now_utc < exp_time
        ):
            if not self.expiring_called:
                hook = HookEvent.LicenseExpiring(exp_time=exp_time)
                await self.sink.process_message(hook)
                self.expiring_called = True