import asyncio import logging import time from pathlib import Path from defence360agent.internals.iaid import IAIDTokenError from defence360agent.api.server import APIError from defence360agent.api.server.events import EventsAPI from defence360agent.contracts.config import ( Core, IContactMessageType, ) from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, ) from defence360agent.internals.the_sink import TheSink from defence360agent.model.icontact import IContactThrottle from defence360agent.subsys.panels.cpanel import cPanel from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.utils import ( await_for, create_task_and_log_exceptions, recurring_check, retry_on, Scope, ) from defence360agent.utils.common import DAY logger = logging.getLogger(__name__) async def async_log_on_error(e, i): logger.warning( "Can't get recommendations for the dashboard due to " "iaid token error, reason: %s. Attempt %s", e, i, ) await_for(seconds=100) class IContactSender(MessageSink, MessageSource): PROCESSING_ORDER = MessageSink.ProcessingOrder.ICONTACT_SENT SCOPE = Scope.AV_IM360 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._tasks = [] self._notification_flag_path = ( Path(Core.TMPDIR) / "icontact_generic_notifications" ) async def create_sink(self, loop): pass async def _send_icontact_message( self, *, message_type, params, period_limit, user=None, ): if message_type is None: return if not IContactThrottle.may_be_notified( message_type, period_limit, user=user, ): return template_args = await self._panel.notify( message_type=IContactMessageType.GENERIC, params=params, user=user, ) if template_args: IContactThrottle.refresh(message_type, user=user) sent_message = MessageType.IContactSent( message_type=message_type, timestamp=int(time.time()), template_args=template_args, ) await self._sink.process_message(sent_message) async def create_source(self, loop, sink: TheSink): self._sink = sink self._panel = HostingPanel() if self._panel.NAME == cPanel.NAME: self._tasks = [ create_task_and_log_exceptions( loop, self.generic_notifications ) ] async def shutdown(self): for task in self._tasks: task.cancel() await asyncio.gather(*self._tasks, return_exceptions=True) @retry_on( APIError, on_error=await_for(seconds=10), max_tries=3, silent=True, log=logger, ) @retry_on( IAIDTokenError, on_error=async_log_on_error, max_tries=3, silent=True, log=logger, ) async def get_notifications(self) -> list: notifications = [] if ( not self._notification_flag_path.exists() or (self._notification_flag_path.stat().st_mtime + DAY) < time.time() ): # send notification request no more than once a day notifications = await EventsAPI.notification() # update flag modify time self._notification_flag_path.touch(mode=0o644, exist_ok=True) return notifications @recurring_check(DAY) async def generic_notifications(self): if notifications := await self.get_notifications(): logger.info( "Sending %s generic icontact notifications", len(notifications) ) for notification in notifications: await self._send_icontact_message( message_type=notification["type"], params={ "subject": notification["notification_subject"], "body_html": notification["notification_body_html"], }, period_limit=notification["notification_period_limit"], user=notification.get("notification_user"), )