"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import logging
import time
from contextlib import suppress
from random import randint
from subprocess import TimeoutExpired
from defence360agent.contracts.config import ANTIVIRUS_MODE, CustomBilling
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.contracts.license import AV_DEFAULT_ID, LicenseCLN
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.cln import CLN, CLNError
from defence360agent.internals.iaid import APIError, IndependentAgentIDAPI
from defence360agent.subsys.panels import hosting_panel
from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import await_for, recurring_check, retry_on
from defence360agent.utils.common import DAY, HOUR
from imav.patchman.license import License as PatchmanLicense
logger = logging.getLogger(__name__)
class CheckLicense(MessageSource):
TOKEN_UPDATE_PERIOD = DAY
RETRY_TIMEOUT = HOUR
HOOK_CHECK_TIMEOUT = DAY
HOOK_EXPIRING_TIME_DELTA = 3 * DAY
def __init__(self):
self.loop = None
self.sink = None
self.check_hooks_task = None
self.check_license_task = None
self.check_iaid_token_task = None
self.expiring_called = False
self.expired_called = False
async def create_source(self, loop, sink):
self.loop = loop
self.sink = sink
self.check_hooks_task = self.loop.create_task(self.check_hooks())
self.check_license_task = self.loop.create_task(
self._recurring_check()
)
async def shutdown(self):
self.check_hooks_task.cancel()
self.check_license_task.cancel()
if self.check_iaid_token_task:
self.check_iaid_token_task.cancel()
with suppress(asyncio.CancelledError):
await self.check_license_task
await self.check_hooks_task
await self.check_iaid_token_task
async def _recurring_check(self):
while True:
try:
await asyncio.sleep(await self._check())
except asyncio.CancelledError:
break
except TimeoutExpired:
logger.error("Token signatures verification timeout expired")
await asyncio.sleep(self.RETRY_TIMEOUT)
except Exception: # NOSONAR pylint:W0703
logger.exception("An exception occurred during license check")
await asyncio.sleep(self.RETRY_TIMEOUT)
async def _register_by_ip(self) -> [bool, float]:
if ANTIVIRUS_MODE and not CustomBilling.IP_LICENSE:
if CustomBilling.UPGRADE_URL or CustomBilling.UPGRADE_URL_360:
return False, self.TOKEN_UPDATE_PERIOD
return await self._register_by_key(key="IPL")
async def _register_by_key(self, key: str) -> [bool, float]:
"""
Try to register imunify key in CLN.
:param str key: key to register
:return: tuple of (bool, float): (success, timeout)
"""
try:
await CLN.register(key)
return True, self.TOKEN_UPDATE_PERIOD + randint(
0, self.TOKEN_UPDATE_PERIOD // 2
)
except CLNError as e:
logger.warning("Failed to register: %s", e)
return False, self.TOKEN_UPDATE_PERIOD
except asyncio.CancelledError:
raise
except Exception as e:
logger.error("Failed to register: %s", e)
return False, self.RETRY_TIMEOUT
async def _register_linked_license(self) -> float:
"""
Try to register any available license for the current customer.
IPL license has the highest priority.
Returns the timeout value.
"""
registered, timeout = await self._register_by_ip()
if not registered and PatchmanLicense.is_active():
if key := await PatchmanLicense.get_imunify_key():
_, timeout = await self._register_by_key(key)
return timeout
@retry_on(APIError, on_error=await_for(seconds=HOUR), timeout=DAY - HOUR)
async def _iaid_token_check(self):
await IndependentAgentIDAPI.ensure_is_activated_and_valid()
async def _check(self):
# Instead of checking users count every time license is checked
# (and trying to update license if user limit exceeded)
# we only detect number of users during checkin.
# This way, if we exceeded user limit, we will get extended license
# from cln immediately
logger.info("Checkin IAID token")
if (
self.check_iaid_token_task
and not self.check_iaid_token_task.done()
):
self.check_iaid_token_task.cancel()
with suppress(asyncio.CancelledError):
await self.check_iaid_token_task
if self.loop:
# for unit-tests where loop is not initialized
self.check_iaid_token_task = self.loop.create_task(
self._iaid_token_check()
)
logger.info("Checking token")
panel = hosting_panel.HostingPanel()
try:
LicenseCLN.users_count = await panel.users_count()
except PanelException as e:
logger.error("Failed to get users count: %s", e)
return self.RETRY_TIMEOUT
LicenseCLN.get_token.cache_clear()
if (
not LicenseCLN.is_registered()
or LicenseCLN.is_free()
and PatchmanLicense.is_active()
):
logger.info("Server is not registered, skipping checkin")
# Trying to get ip-based license
return await self._register_linked_license()
else:
now = time.time()
token = LicenseCLN.get_token()
# For paid license if less then 2 days or user limit exceeded than
# refreshing token
logger.info("Checking token expiration %r", token)
token_will_be_expired = token["token_expire_utc"] - now
if (
token["id"] != AV_DEFAULT_ID
and (token_will_be_expired < self.TOKEN_UPDATE_PERIOD)
or (LicenseCLN.users_count > token["limit"])
):
try:
if (await CLN.refresh_token(token)) is None:
# license is invalid
return self.TOKEN_UPDATE_PERIOD
except CLNError as e:
logger.warning("CLN API error: %s", e)
if (
not LicenseCLN.is_registered()
or LicenseCLN.is_free()
and PatchmanLicense.is_active()
):
# if we have an error, we will try to register by ip
return await self._register_linked_license()
else:
return self.RETRY_TIMEOUT
else:
# check token again not earlier than half of the token
# expiration or half of the day
# and no later than the token expiration (3/4 exp_time)
# or a day
now = time.time()
token_will_be_expired = (
LicenseCLN()
.get_token()
.get(
"token_expire_utc", now + self.TOKEN_UPDATE_PERIOD
)
- now
)
if token_will_be_expired <= 0:
# Try another time in a day
return self.TOKEN_UPDATE_PERIOD
if token_will_be_expired > self.TOKEN_UPDATE_PERIOD:
token_will_be_expired = int(self.TOKEN_UPDATE_PERIOD)
return token_will_be_expired // 2 + randint(
0, token_will_be_expired // 4
)
else:
# more then a day, sleeping
return self.TOKEN_UPDATE_PERIOD
@recurring_check(HOOK_CHECK_TIMEOUT)
async def check_hooks(self):
time_now_utc = int(time.time())
exp_time = LicenseCLN().get_token().get("license_expire_utc")
if exp_time is None:
return
if exp_time <= time_now_utc:
if not self.expired_called:
hook = HookEvent.LicenseExpired(exp_time=exp_time)
await self.sink.process_message(hook)
self.expired_called = True
elif (
exp_time - self.HOOK_EXPIRING_TIME_DELTA < time_now_utc < exp_time
):
if not self.expiring_called:
hook = HookEvent.LicenseExpiring(exp_time=exp_time)
await self.sink.process_message(hook)
self.expiring_called = True