"""Helpers for launching subprocesses through the run-with-intensity tool."""
import asyncio
import logging
from enum import Enum
from os import fsdecode
from pathlib import Path
from typing import List

from defence360agent.utils import OsReleaseInfo

logger = logging.getLogger(__name__)

RUN_WITH_INTENSITY = "/usr/libexec/run-with-intensity"
LVECTL_BIN_PATH = Path("/usr/sbin/lvectl")
PROC_LVE_LIST_PATH = Path("/proc/lve/list")


class LimitsMethod(Enum):
    """Limitation methods that ``run-with-intensity show`` may report."""

    NICE = "nice"
    LVE = "lve"
    CGROUPS = "cgroups"


async def get_current_method() -> LimitsMethod:
    """
    Ask the run-with-intensity tool which limitation method it uses.

    Runs ``RUN_WITH_INTENSITY show`` with both output streams captured and
    compares the stripped stdout with the values of :class:`LimitsMethod`.
    The exit code of the tool is not inspected.

    :return: the enum member whose value equals the tool's stdout
    :raise LookupError: if stdout matches none of the known methods;
        the message carries the decoded stdout and stderr
    """
    show_proc = await asyncio.create_subprocess_exec(
        RUN_WITH_INTENSITY,
        "show",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await show_proc.communicate()
    reported = fsdecode(raw_out).strip()

    # Members are visited in definition order: NICE, LVE, CGROUPS.
    for method in LimitsMethod:
        if method.value == reported:
            return method

    # stderr is decoded only here, where it is needed for diagnostics.
    raise LookupError(
        "Parsing of used limitation method failed\nstdout: {}\nstderr: {}"
        .format(reported, fsdecode(raw_err).strip())
    )


async def create_subprocess(
    cmd: List[str],
    key: str,
    intensity_cpu: int,
    intensity_io: int,
    **subprocess_kwargs
) -> asyncio.subprocess.Process:
    """
    Start *cmd* under the run-with-intensity tool with cpu & io limits.

    The final command line is
    ``RUN_WITH_INTENSITY run --intensity-cpu <cpu> --intensity-io <io>
    --key <key>`` followed by the items of *cmd*.

    :param cmd: command to execute
    :param key: value forwarded to the tool's ``--key`` option
    :param intensity_cpu: cpu intensity limit
    :param intensity_io: io intensity limit
    :param subprocess_kwargs: keyword arguments passed through to
        asyncio.create_subprocess_exec
    :return: the started Process
    """
    wrapper_args = [
        RUN_WITH_INTENSITY,
        "run",
        "--intensity-cpu",
        str(intensity_cpu),
        "--intensity-io",
        str(intensity_io),
        "--key",
        key,
    ]
    full_cmd = wrapper_args + cmd
    return await asyncio.create_subprocess_exec(
        *full_cmd, **subprocess_kwargs
    )


def is_lve_active() -> bool:
    """
    Check that LVE-utils is the active resource limiter.

    The result is falsy unless ``PROC_LVE_LIST_PATH`` exists; when it
    does, the answer of ``OsReleaseInfo.is_cloudlinux()`` is returned.
    """
    if not PROC_LVE_LIST_PATH.exists():
        return False
    # The path alone is not trusted: to avoid possible errors such as
    # DEF-11941, make sure that the OS is CL as well.
    return OsReleaseInfo.is_cloudlinux()


def has_lvectl() -> bool:
    """Check that LVE-utils is installed, i.e. ``LVECTL_BIN_PATH`` exists."""
    lvectl_present = LVECTL_BIN_PATH.exists()
    return lvectl_present