"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see .

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license, see
"""
import asyncio
import logging
import time
from contextlib import suppress
from random import randint
from subprocess import TimeoutExpired

from defence360agent.contracts.config import ANTIVIRUS_MODE, CustomBilling
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.contracts.license import AV_DEFAULT_ID, LicenseCLN
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.cln import CLN, CLNError
from defence360agent.internals.iaid import APIError, IndependentAgentIDAPI
from defence360agent.subsys.panels import hosting_panel
from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import await_for, recurring_check, retry_on
from defence360agent.utils.common import DAY, HOUR
from imav.patchman.license import License as PatchmanLicense

logger = logging.getLogger(__name__)


class CheckLicense(MessageSource):
    """Message source that periodically checks and refreshes the license.

    Three background tasks are managed:

    * ``check_license_task`` runs :meth:`_recurring_check` in a loop;
    * ``check_hooks_task`` runs :meth:`check_hooks` to emit the
      ``LicenseExpiring`` / ``LicenseExpired`` hook events;
    * ``check_iaid_token_task`` is (re)created on every :meth:`_check` and
      runs :meth:`_iaid_token_check`.
    """

    # Base period used for token refresh scheduling.
    TOKEN_UPDATE_PERIOD = DAY
    # Delay before the next attempt after a failure.
    RETRY_TIMEOUT = HOUR
    # Period passed to the ``recurring_check`` decorator of ``check_hooks``.
    HOOK_CHECK_TIMEOUT = DAY
    # "Expiring" hook fires when the license expires within this window.
    HOOK_EXPIRING_TIME_DELTA = 3 * DAY

    def __init__(self):
        self.loop = None
        self.sink = None
        self.check_hooks_task = None
        self.check_license_task = None
        self.check_iaid_token_task = None
        # Each hook is sent at most once per instance lifetime.
        self.expiring_called = False
        self.expired_called = False

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and start the background tasks."""
        self.loop = loop
        self.sink = sink
        self.check_hooks_task = self.loop.create_task(self.check_hooks())
        self.check_license_task = self.loop.create_task(
            self._recurring_check()
        )

    async def shutdown(self):
        """Cancel all background tasks and wait for each of them to finish.

        Tasks that were never created (still ``None``) are skipped, so this
        is safe to call before :meth:`create_source` or before the first
        :meth:`_check` has created the IAID token task.
        """
        tasks = [
            task
            for task in (
                self.check_license_task,
                self.check_hooks_task,
                self.check_iaid_token_task,
            )
            if task is not None
        ]
        for task in tasks:
            task.cancel()
        for task in tasks:
            # Suppress per task: a single ``suppress`` around all awaits
            # would stop at the first CancelledError and leave the remaining
            # tasks un-awaited.
            with suppress(asyncio.CancelledError):
                await task

    async def _recurring_check(self):
        """Run :meth:`_check` forever, sleeping for the delay it returns.

        On any failure the error is logged and the next attempt is made
        after ``RETRY_TIMEOUT``. Cancellation while checking or during the
        regular sleep ends the loop quietly; cancellation during a retry
        sleep (inside an ``except`` handler) propagates to the awaiter.
        """
        while True:
            try:
                await asyncio.sleep(await self._check())
            except asyncio.CancelledError:
                break
            except TimeoutExpired:
                logger.error("Token signatures verification timeout expired")
                await asyncio.sleep(self.RETRY_TIMEOUT)
            except Exception:  # NOSONAR pylint:W0703
                logger.exception("An exception occurred during license check")
                await asyncio.sleep(self.RETRY_TIMEOUT)

    async def _register_by_ip(self) -> "tuple[bool, float]":
        """
        Try to register an IP-based license.

        In antivirus mode without ``CustomBilling.IP_LICENSE``, registration
        is skipped when a custom upgrade URL is configured.

        :return: tuple of (bool, float): (success, timeout)
        """
        if ANTIVIRUS_MODE and not CustomBilling.IP_LICENSE:
            if CustomBilling.UPGRADE_URL or CustomBilling.UPGRADE_URL_360:
                return False, self.TOKEN_UPDATE_PERIOD
        return await self._register_by_key(key="IPL")

    async def _register_by_key(self, key: str) -> "tuple[bool, float]":
        """
        Try to register imunify key in CLN.

        :param str key: key to register
        :return: tuple of (bool, float): (success, timeout)
        """
        try:
            await CLN.register(key)
            # Random extra delay of up to half a period on top of the base.
            return True, self.TOKEN_UPDATE_PERIOD + randint(
                0, self.TOKEN_UPDATE_PERIOD // 2
            )
        except CLNError as e:
            logger.warning("Failed to register: %s", e)
            return False, self.TOKEN_UPDATE_PERIOD
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error("Failed to register: %s", e)
            return False, self.RETRY_TIMEOUT

    async def _register_linked_license(self) -> float:
        """
        Try to register any available license for the current customer.
        IPL license has the highest priority.

        Returns the timeout value.
        """
        registered, timeout = await self._register_by_ip()
        if not registered and PatchmanLicense.is_active():
            if key := await PatchmanLicense.get_imunify_key():
                _, timeout = await self._register_by_key(key)
        return timeout

    @staticmethod
    def _needs_linked_registration() -> bool:
        """Return True when registering a linked license should be tried.

        That is: the server is not registered, or it has a free license
        while a Patchman license is active.
        """
        return not LicenseCLN.is_registered() or (
            LicenseCLN.is_free() and PatchmanLicense.is_active()
        )

    # NOTE(review): exact retry semantics come from ``retry_on`` /
    # ``await_for``, which are defined outside this file.
    @retry_on(APIError, on_error=await_for(seconds=HOUR), timeout=DAY - HOUR)
    async def _iaid_token_check(self):
        """Ensure the independent agent ID token is activated and valid."""
        await IndependentAgentIDAPI.ensure_is_activated_and_valid()

    async def _check(self):
        """Perform one license check.

        :return: number of seconds to wait before the next check
        """
        # Instead of checking users count every time license is checked
        # (and trying to update license if user limit exceeded)
        # we only detect number of users during checkin.
        # This way, if we exceeded user limit, we will get extended license
        # from cln immediately
        logger.info("Checkin IAID token")
        if (
            self.check_iaid_token_task
            and not self.check_iaid_token_task.done()
        ):
            # Restart the IAID check: stop the previous run first.
            self.check_iaid_token_task.cancel()
            with suppress(asyncio.CancelledError):
                await self.check_iaid_token_task
        if self.loop:  # for unit-tests where loop is not initialized
            self.check_iaid_token_task = self.loop.create_task(
                self._iaid_token_check()
            )

        logger.info("Checking token")
        panel = hosting_panel.HostingPanel()
        try:
            LicenseCLN.users_count = await panel.users_count()
        except PanelException as e:
            logger.error("Failed to get users count: %s", e)
            return self.RETRY_TIMEOUT

        LicenseCLN.get_token.cache_clear()
        if self._needs_linked_registration():
            logger.info("Server is not registered, skipping checkin")
            # Trying to get ip-based license
            return await self._register_linked_license()

        now = time.time()
        token = LicenseCLN.get_token()
        # For paid license if less then 2 days or user limit exceeded than
        # refreshing token
        logger.info("Checking token expiration %r", token)
        token_will_be_expired = token["token_expire_utc"] - now
        # NOTE(review): this evaluates as ``(A and B) or C``, i.e. the user
        # limit check applies to any license id — confirm this is intended.
        if (
            token["id"] != AV_DEFAULT_ID
            and (token_will_be_expired < self.TOKEN_UPDATE_PERIOD)
            or (LicenseCLN.users_count > token["limit"])
        ):
            try:
                if (await CLN.refresh_token(token)) is None:
                    # license is invalid
                    return self.TOKEN_UPDATE_PERIOD
            except CLNError as e:
                logger.warning("CLN API error: %s", e)
                if self._needs_linked_registration():
                    # if we have an error, we will try to register by ip
                    return await self._register_linked_license()
                return self.RETRY_TIMEOUT
            else:
                # check token again not earlier than half of the token
                # expiration or half of the day
                # and no later than the token expiration (3/4 exp_time)
                # or a day
                now = time.time()
                token_will_be_expired = (
                    LicenseCLN()
                    .get_token()
                    .get(
                        "token_expire_utc", now + self.TOKEN_UPDATE_PERIOD
                    )
                    - now
                )
                if token_will_be_expired <= 0:
                    # Try another time in a day
                    return self.TOKEN_UPDATE_PERIOD
                # ``now`` is a float, so the difference is a float as well;
                # randint() needs integer bounds (non-integer arguments are
                # rejected since Python 3.12), hence the int() truncation.
                remaining = int(
                    min(token_will_be_expired, self.TOKEN_UPDATE_PERIOD)
                )
                return remaining // 2 + randint(0, remaining // 4)
        else:
            # more then a day, sleeping
            return self.TOKEN_UPDATE_PERIOD

    @recurring_check(HOOK_CHECK_TIMEOUT)
    async def check_hooks(self):
        """Send the license expiring / expired hook event when due.

        Nothing is sent when the token has no ``license_expire_utc``. Each
        of the two events is sent at most once per instance.
        """
        time_now_utc = int(time.time())
        exp_time = LicenseCLN().get_token().get("license_expire_utc")
        if exp_time is None:
            return
        if exp_time <= time_now_utc:
            if not self.expired_called:
                hook = HookEvent.LicenseExpired(exp_time=exp_time)
                await self.sink.process_message(hook)
                self.expired_called = True
        elif (
            exp_time - self.HOOK_EXPIRING_TIME_DELTA < time_now_utc < exp_time
        ):
            if not self.expiring_called:
                hook = HookEvent.LicenseExpiring(exp_time=exp_time)
                await self.sink.process_message(hook)
                self.expiring_called = True