import asyncio
import functools
import logging
import os
import subprocess as su
from typing import Iterable, Union

from defence360agent.contracts.config import Core
from defence360agent.utils import check_run, run, OsReleaseInfo

logger = logging.getLogger(__name__)

DOS_PROTECTOR_SERVICE_NAME = "imunify360-dos-protection"
UAL_SERVICE_NAME = "imunify360-unified-access-logger"
PAM_SERVICE_NAME = "imunify360-pam"
AUDITD_SERVICE_NAME = "imunify-auditd-log-reader"
SCANLOGD_SERVICE_NAME = "imunify360-scanlogd"


def _apply_cmd(func):
    """Turn a command-building function into a coroutine that runs it.

    *func* must return the argv list of the command to execute. The
    returned coroutine function builds that list, logs it at debug level
    and awaits ``check_run`` on it. The coroutine itself returns ``None``.
    """

    # functools.wraps keeps the decorated method's name and docstring
    # instead of exposing every command as an anonymous "wrapper".
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        cmd = func(*args, **kwargs)
        logger.debug("check_call(%r)", cmd)
        await check_run(cmd)

    return wrapper


async def _reset_failed_state(
    services: Iterable[Union["_CentOs6", "_SystemctlBased"]]
):
    """Reset the failed state of every service and restart it.

    After each restart the service is polled with ``is_active()`` up to
    10 times, sleeping one second after every unsuccessful check. If the
    service never becomes active the function simply moves on to the
    next one; no exception is raised for that case here.
    """
    for s in services:
        await s.reset_failed()
        await s.restart()
        for _ in range(10):
            if await s.is_active():
                break
            logger.warning(
                "Service %s is still not active, sleep for %s seconds", s, 1
            )
            await asyncio.sleep(1)


class _CentOs6:
    """Service adaptor built on ``/sbin/service`` and ``/sbin/chkconfig``."""

    SVC_CTL_BIN = "/sbin/service"
    _CHKCONFIG = "/sbin/chkconfig"

    def __init__(self, service_name):
        self._service_name = service_name

    def __repr__(self):
        # Used when the adaptor is interpolated into log messages.
        return f"{type(self).__name__}({self._service_name!r})"

    @_apply_cmd
    def start(self):
        """Run ``service <name> start``."""
        return [self.SVC_CTL_BIN, self._service_name, "start"]

    @_apply_cmd
    def stop(self):
        """Run ``service <name> stop``."""
        return [self.SVC_CTL_BIN, self._service_name, "stop"]

    @_apply_cmd
    def restart(self):
        """Run ``service <name> restart``."""
        return [self.SVC_CTL_BIN, self._service_name, "restart"]

    async def reset_failed(self):
        """Not implemented for Centos6"""
        pass

    @_apply_cmd
    def enable(self, **kwargs):
        """Run ``chkconfig --add <name>``.

        Keyword arguments (such as ``now`` used by the systemctl-based
        adaptors) are accepted and ignored.
        """
        return [self._CHKCONFIG, "--add", self._service_name]

    async def is_enabled(self):
        """Return True if ``chkconfig --list`` succeeds and reports ``:on``."""
        cmd = [self._CHKCONFIG, "--list", self._service_name]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=su.PIPE, stderr=su.DEVNULL
        )
        out, _ = await proc.communicate()
        rc = await proc.wait()
        return rc == 0 and b":on" in out

    def is_enabled_sync(self):
        """Blocking counterpart of :meth:`is_enabled`."""
        cmd = [self._CHKCONFIG, "--list", self._service_name]
        cp = su.run(cmd, stdout=su.PIPE, stderr=su.DEVNULL)
        return cp.returncode == 0 and b":on" in cp.stdout

    @_apply_cmd
    def disable(self, **kwargs):
        """Run ``chkconfig --del <name>``.

        Keyword arguments are accepted and ignored, mirroring
        :meth:`enable`, so that a call written for the systemctl-based
        adaptors (``disable(now=...)``) does not raise ``TypeError``.
        """
        return [self._CHKCONFIG, "--del", self._service_name]

    # There are no separate mask/unmask commands here; they are aliases
    # of disable/enable.
    mask = disable
    unmask = enable

    async def is_active(self):
        """Return True if ``service <name> status`` exits with code 0."""
        cmd = [self.SVC_CTL_BIN, self._service_name, "status"]
        exit_code, _, _ = await run(cmd)
        return exit_code == 0

    async def activate_socket_service(self):
        """Restart the service if it is enabled but not currently active."""
        if await self.is_enabled() and not await self.is_active():
            await _reset_failed_state((self,))

    def unit_exists(self):
        """Return True if ``chkconfig --list <name>`` exits with code 0."""
        cp = su.run(
            [self._CHKCONFIG, "--list", self._service_name],
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        return cp.returncode == 0


class _SystemctlBased:
    """Service adaptor built on ``systemctl``.

    Subclasses override ``SVC_CTL_BIN`` with the absolute path of the
    binary for their platform.
    """

    SVC_CTL_BIN = "systemctl"

    def __init__(self, service_name):
        self._service_name = service_name

    def __repr__(self):
        # Used when the adaptor is interpolated into log messages.
        return f"{type(self).__name__}({self._service_name!r})"

    @_apply_cmd
    def start(self):
        """Run ``systemctl start <name>``."""
        return [self.SVC_CTL_BIN, "start", self._service_name]

    @_apply_cmd
    def stop(self):
        """Run ``systemctl stop <name>``."""
        return [self.SVC_CTL_BIN, "stop", self._service_name]

    @_apply_cmd
    def restart(self):
        """Run ``systemctl restart <name>``."""
        return [self.SVC_CTL_BIN, "restart", self._service_name]

    @_apply_cmd
    def _enable_now(self, *, now: bool):
        """Run ``systemctl enable [--now] <name>``."""
        return [
            self.SVC_CTL_BIN,
            "enable",
            *(["--now"] if now else []),
            self._service_name,
        ]

    async def enable(self, *, now: bool):
        """Enable the unit, passing ``--now`` when *now* is true.

        On Ubuntu 16.04 the unit is additionally restarted afterwards
        (see the comment below). Note that this restart is issued
        regardless of the value of *now*.
        """
        await self._enable_now(now=now)
        # WARN: Ubuntu 16.04 demonstrates very special behavior of the
        # `systemctl enable --now` command - if the unit is stopped it
        # wouldn't be started. We need to handle that case.
        # TODO: Remove this case on dropping support for Ubuntu 16.04.
        osinfo = {}
        try:
            OsReleaseInfo.dict_from_file(osinfo)
        except (FileNotFoundError, PermissionError):
            return
        if osinfo.get("ID", "").lower() != "ubuntu":
            return
        if osinfo.get("VERSION_ID", "") == "16.04":
            await self.restart()

    async def is_enabled(self):
        """Return True if ``systemctl is-enabled <name>`` exits with 0."""
        cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=su.DEVNULL, stderr=su.DEVNULL
        )
        await proc.communicate()
        rc = await proc.wait()
        return rc == 0

    def is_enabled_sync(self):
        """Blocking counterpart of :meth:`is_enabled`."""
        cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
        rc = su.call(cmd, stdout=su.DEVNULL, stderr=su.DEVNULL)
        return rc == 0

    @_apply_cmd
    def disable(self, *, now: bool):
        """Run ``systemctl disable [--now] <name>``."""
        return [
            self.SVC_CTL_BIN,
            "disable",
            *(["--now"] if now else []),
            self._service_name,
        ]

    @_apply_cmd
    def reload(self):
        """Run ``systemctl reload <name>``."""
        return [self.SVC_CTL_BIN, "reload", self._service_name]

    @_apply_cmd
    def mask(self):
        """
        Run ``systemctl mask <name>``.

        It was created for imunify360-webshield, which required masking
        as far as it is started by 'Wants=' in imunify360.service.
        This is no more relevant, but let it stay.
        """
        return [self.SVC_CTL_BIN, "mask", self._service_name]

    @_apply_cmd
    def unmask(self):
        """
        Run ``systemctl unmask <name>``.

        It was created for imunify360-webshield, which required masking
        as far as it is started by 'Wants=' in imunify360.service.
        This is no more relevant, but let it stay.
        """
        return [self.SVC_CTL_BIN, "unmask", self._service_name]

    async def is_active(self):
        """Return True if ``systemctl is-active <name>`` exits with 0."""
        cmd = [self.SVC_CTL_BIN, "is-active", self._service_name]
        exit_code, _, _ = await run(cmd)
        return exit_code == 0

    @_apply_cmd
    def reset_failed(self):
        """Run ``systemctl reset-failed <name>``."""
        return [self.SVC_CTL_BIN, "reset-failed", self._service_name]

    async def activate_socket_service(self):
        """Restart the service and its ``.socket`` unit when needed.

        The check is made against ``<name>.socket``: if that unit is
        enabled but not active, both this service and the socket unit
        get their failed state reset and are restarted, in that order.
        """
        agent_service_socket = adaptor(f"{self._service_name}.socket")
        if (
            await agent_service_socket.is_enabled()
            and not await agent_service_socket.is_active()
        ):
            await _reset_failed_state((self, agent_service_socket))

    def unit_exists(self):
        """Return True if ``systemctl cat <name>`` exits with code 0."""
        cp = su.run(
            [self.SVC_CTL_BIN, "cat", self._service_name],
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        return cp.returncode == 0


class _CentOs7(_SystemctlBased):
    """systemctl adaptor using ``/usr/bin/systemctl``."""

    SVC_CTL_BIN = "/usr/bin/systemctl"


class _DebianUbuntu(_SystemctlBased):
    """systemctl adaptor using ``/bin/systemctl``."""

    SVC_CTL_BIN = "/bin/systemctl"


class MinidaemonService(_CentOs6):
    """``/sbin/service`` adaptor whose service name defaults to "minidaemon"."""

    def __init__(self, service_name="minidaemon"):
        super().__init__(service_name)

    @_apply_cmd
    def restart(self, service=None):
        """Run ``service <name> restart [<service>]``.

        Without *service* everything is restarted; with it, only the
        named child process is.
        """
        cmd = [self.SVC_CTL_BIN, self._service_name, "restart"]  # restart all
        if service:
            cmd += [service]  # restart specific child process
        return cmd


def adaptor(service_name, *, include_centos6=True):
    """Return a service adaptor suitable for the current system.

    Candidates are tried in the order ``_DebianUbuntu``, ``_CentOs7`` and,
    unless *include_centos6* is false, ``_CentOs6``; the first one whose
    ``SVC_CTL_BIN`` path exists is instantiated with *service_name*.

    Raises:
        RuntimeError: if none of the candidate binaries exists.
    """
    candidates = (_DebianUbuntu, _CentOs7)
    if include_centos6:
        candidates += (_CentOs6,)
    for a in candidates:
        if os.path.exists(a.SVC_CTL_BIN):
            return a(service_name)
    raise RuntimeError("Cannot instantiate appropriate adaptor.")


def imunify360_service():
    """Return the adaptor for the service named by ``Core.SVC_NAME``."""
    return adaptor(Core.SVC_NAME)


def imunify360_dos_protector_service():
    """Return the DOS protector service adaptor, or None if unavailable.

    Only systemctl-based adaptors are considered for this service.
    """
    try:
        return adaptor(DOS_PROTECTOR_SERVICE_NAME, include_centos6=False)
    except RuntimeError:
        logger.info("DOS Protector service is not available on this system")
        return None


def imunify360_ual_service():
    """Return the adaptor for the unified access logger service."""
    return adaptor(UAL_SERVICE_NAME)


def imunify360_pam_service():
    """Return the adaptor for the PAM service."""
    return adaptor(PAM_SERVICE_NAME)


def imunify360_scanlogd_service():
    """Return the adaptor for the scanlogd service."""
    return adaptor(SCANLOGD_SERVICE_NAME)


def imunify360_auditd_service():
    """Return the auditd-log-reader adaptor, or None if the unit is absent."""
    unit = adaptor(AUDITD_SERVICE_NAME)
    if unit.unit_exists():
        # Reuse the instance that was just probed instead of building
        # an identical second one.
        return unit
    logger.info("Auditd-log-reader service is not available on this system")
    return None