""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ from collections import namedtuple import asyncio import ctypes import errno import logging import os import struct import platform from defence360agent.subsys import sysctl Event = namedtuple("Event", ("path", "flags", "cookie", "name", "wd")) logger = logging.getLogger(__name__) class Inotify: """ Tiny wrapper for inotify api. See `man inotify` for details """ ACCESS = 0x1 #: File was accessed MODIFY = 0x2 #: File was modified ATTRIB = 0x4 #: Metadata changed CLOSE_WRITE = 0x8 #: Writable file was closed CLOSE_NOWRITE = 0x10 #: Unwritable file closed OPEN = 0x20 #: File was opened MOVED_FROM = 0x40 #: File was moved from X MOVED_TO = 0x80 #: File was moved to Y CREATE = 0x100 #: Subfile was created DELETE = 0x200 #: Subfile was deleted DELETE_SELF = 0x400 #: Self was deleted MOVE_SELF = 0x800 #: Self was moved UNMOUNT = 0x2000 #: Backing fs was unmounted Q_OVERFLOW = 0x4000 #: Event queue overflowed IGNORED = 0x8000 #: File was ignored ONLYDIR = 0x1000000 #: only watch the path if it is a directory DONT_FOLLOW = 0x2000000 #: don't follow a sym link EXCL_UNLINK = 0x4000000 #: exclude events on unlinked objects MASK_ADD = 0x20000000 #: add to the mask of an already existing watch ISDIR = 0x40000000 #: event occurred against dir ONESHOT = 0x80000000 #: only send event once _n = "libc.{}".format("so.6" if platform.system() != "Darwin" else "dylib") _libc = ctypes.CDLL(_n, use_errno=True) event_prefix = struct.Struct("iIII") @staticmethod def _call(method, *args): """ Wrapper to all calls to C functions. Raises OSError with appropriate errno as argument in case of error return value. :param method: method to call :param args: method args :return: called function return value in case of success """ ret = getattr(Inotify._libc, method)(*args) if ret == -1: errno = ctypes.get_errno() raise OSError(errno, os.strerror(errno)) return ret @staticmethod def init(): """ Initialize an inotify instance. See `man inotify_init` for details :return: a file descriptor of new inotify instance """ return Inotify._call("inotify_init") @staticmethod def add_watch(fd, path, mask): """ Add a watch to an initialized inotify instance. This method is idempotent. If called twice with the same :fd: and :path: and different mask, will change watch flags of current watch. See `man inotify_add_watch` for details :param fd: file descriptor returned by `init()` :param path: path to file or directory to watch :param mask: bitmask of events to monitor :return: file descriptor of watch """ return Inotify._call("inotify_add_watch", fd, path, mask) @staticmethod def rm_watch(fd, wd): """ Remove existing watch from inotify instance. :param fd: file descriptor of inotify instance :param wd: watch file descriptor, returned by `add_watch()` :return: zero """ return Inotify._call("inotify_rm_watch", fd, wd) @staticmethod def unpack_prefix(data): """ Unpacks prefix of event struct. See `man inotify` for details :param data: struct bytestring :return: tuple of (wd, flag, cookie, length) """ return Inotify.event_prefix.unpack(data) @staticmethod def unpack_name(data): """ Unpack name field of inotify event struct See `man inotify` for details :param data: struct bytestring :return: name string """ return struct.unpack("%ds" % len(data), data)[0].rstrip(b"\x00") class Watcher: """ Asynchronous watcher for inotify events """ _CHUNK_SIZE = 1024 _MAX_WATCH_RETRIES = 3 _WATCHERS_RAISE_COEFF = 1.5 _MAX_USER_WATCHES = "fs.inotify.max_user_watches" def __init__(self, loop, coro_callback=None): self._loop = loop self._fd = Inotify.init() self._queue = asyncio.Queue() self._callback = coro_callback or self._queue.put self._loop.add_reader(self._fd, self._read) self._reset_state() def _reset_state(self): self.paths = {} self.descriptors = {} self.buf = b"" def _read(self): self.buf += os.read(self._fd, self._CHUNK_SIZE) # shortcut struct_size = Inotify.event_prefix.size while len(self.buf) >= struct_size: wd, flags, cookie, length = Inotify.unpack_prefix( self.buf[:struct_size] ) struct_end = struct_size + length name = Inotify.unpack_name(self.buf[struct_size:struct_end]) self.buf = self.buf[struct_end:] if wd not in self.paths: continue path = self.paths[wd] if flags & Inotify.IGNORED: logger.warning( "Got IGNORED event for %s, cleaning watch", path ) self._cleanup_watch(path) continue if flags & Inotify.Q_OVERFLOW: logger.error("Inotify queue overflow") continue ev = Event(path, flags, cookie, name, wd) self._loop.create_task(self._callback(ev)) def _raise_user_watches(self): current_max_watches = sysctl.read(self._MAX_USER_WATCHES) new_max_watchers = current_max_watches + int( current_max_watches * self._WATCHERS_RAISE_COEFF ) logger.info( "Raising %s to %s", self._MAX_USER_WATCHES, new_max_watchers ) sysctl.write(self._MAX_USER_WATCHES, new_max_watchers) def close(self): """ Close watcher. Close inotify fd, remove reader and reset state :return: """ self._loop.remove_reader(self._fd) try: os.close(self._fd) finally: self._reset_state() self._fd = None def watch(self, path, mask): """ Add file to watch :param path: file or directory to watch :param mask: events mask for this watch """ assert isinstance(path, bytes), "Path must be bytes" logger.info("Watching %r", path) retries = 0 while True: try: wd = Inotify.add_watch(self._fd, path, mask) self.paths[wd] = path self.descriptors[path] = wd break except OSError as e: if ( retries < self._MAX_WATCH_RETRIES and e.errno == errno.ENOSPC ): self._raise_user_watches() retries += 1 logger.warning( "Inotify: not enough watches (%r), retrying...", path ) continue logger.error("Inotify failed while watching %r", path) raise def _cleanup_watch(self, path): descriptor = self.descriptors.pop(path, None) if descriptor is not None: self.paths.pop(descriptor, None) def unwatch(self, path): """ Remove file or directory from watch :param path: file or directory to remove watch from """ if path not in self.descriptors: return logger.info("Stop watching %r", path) try: Inotify.rm_watch(self._fd, self.descriptors[path]) finally: self._cleanup_watch(path) async def get_event(self): """ Get watch event :return: `Event` named tuple """ event = await self._queue.get() logger.debug("Inotify event: %s", event) return event