"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see
"""
from logging import getLogger
import asyncio

from defence360agent.contracts.plugins import MessageSink, Scope
from defence360agent.internals.cln import LicenseCLN
from defence360agent.subsys.panels import hosting_panel, cpanel
from defence360agent.internals.cln import CLN, CLNError
from defence360agent.utils import RecurringCheckStop, recurring_check

logger = getLogger(__name__)


class Imunify360Register(MessageSink):
    """Register the server in CLN by its IP address.

    This plugin is used to register the server by IP and should work only
    for cPanel and ImunifyAV cases.
    """

    SCOPE = Scope.AV

    # Class-level default so that shutdown() is safe even when create_sink()
    # has not run yet (or failed before assigning the task).
    _task = None

    async def create_sink(self, loop):
        """Schedule the recurring registration task when the panel is cPanel.

        On any other hosting panel no task is created and ``self._task`` is
        set to ``None``.

        :param loop: event loop on which the background task is created.
        """
        self._loop = loop
        if hosting_panel.HostingPanel().NAME == cpanel.cPanel.NAME:
            self._task = self._loop.create_task(self._register())
        else:
            self._task = None

    async def shutdown(self):
        """Cancel the registration task, if one was started, and await it."""
        if self._task is not None:
            self._task.cancel()
            # NOTE(review): awaiting a cancelled task re-raises CancelledError
            # unless the coroutine absorbs it; presumably the recurring_check
            # wrapper does so -- confirm against defence360agent.utils.
            await self._task

    # NOTE(review): 24 * 60 * 60 is presumably the check period in seconds
    # (once a day) -- confirm against recurring_check's signature.
    @recurring_check(24 * 60 * 60)
    async def _register(self):
        """Attempt one registration by IP license ("IPL") through CLN.

        Raises ``RecurringCheckStop`` when the current license type is
        already "imunify360", so no registration is attempted in that case.
        ``asyncio.CancelledError`` is propagated; every other failure is
        logged and swallowed, except a ``CLNError`` whose message contains
        "IP license not found", which is ignored silently.
        """
        if LicenseCLN.license_info().get("license_type") == "imunify360":
            raise RecurringCheckStop()

        try:
            await CLN.register("IPL")
        except asyncio.CancelledError:
            # Cancellation must never be swallowed by the handlers below.
            raise
        except CLNError as e:
            # A missing IP license is not reported; anything else from CLN
            # is logged as an error.
            if "IP license not found" not in str(e.message):
                logger.error(
                    "CLN error on attempt to register by ip: %s", e
                )
        except Exception as e:
            # Best effort: log and return without re-raising.
            logger.error("Error on attempt to register by ip: %r", e)