"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
from collections import namedtuple
import asyncio
import ctypes
import errno
import logging
import os
import struct
import platform
from defence360agent.subsys import sysctl
Event = namedtuple("Event", ("path", "flags", "cookie", "name", "wd"))
logger = logging.getLogger(__name__)
class Inotify:
"""
Tiny wrapper for inotify api. See `man inotify` for details
"""
ACCESS = 0x1 #: File was accessed
MODIFY = 0x2 #: File was modified
ATTRIB = 0x4 #: Metadata changed
CLOSE_WRITE = 0x8 #: Writable file was closed
CLOSE_NOWRITE = 0x10 #: Unwritable file closed
OPEN = 0x20 #: File was opened
MOVED_FROM = 0x40 #: File was moved from X
MOVED_TO = 0x80 #: File was moved to Y
CREATE = 0x100 #: Subfile was created
DELETE = 0x200 #: Subfile was deleted
DELETE_SELF = 0x400 #: Self was deleted
MOVE_SELF = 0x800 #: Self was moved
UNMOUNT = 0x2000 #: Backing fs was unmounted
Q_OVERFLOW = 0x4000 #: Event queue overflowed
IGNORED = 0x8000 #: File was ignored
ONLYDIR = 0x1000000 #: only watch the path if it is a directory
DONT_FOLLOW = 0x2000000 #: don't follow a sym link
EXCL_UNLINK = 0x4000000 #: exclude events on unlinked objects
MASK_ADD = 0x20000000 #: add to the mask of an already existing watch
ISDIR = 0x40000000 #: event occurred against dir
ONESHOT = 0x80000000 #: only send event once
_n = "libc.{}".format("so.6" if platform.system() != "Darwin" else "dylib")
_libc = ctypes.CDLL(_n, use_errno=True)
event_prefix = struct.Struct("iIII")
@staticmethod
def _call(method, *args):
"""
Wrapper to all calls to C functions. Raises OSError with appropriate
errno as argument in case of error return value.
:param method: method to call
:param args: method args
:return: called function return value in case of success
"""
ret = getattr(Inotify._libc, method)(*args)
if ret == -1:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
return ret
@staticmethod
def init():
"""
Initialize an inotify instance.
See `man inotify_init` for details
:return: a file descriptor of new inotify instance
"""
return Inotify._call("inotify_init")
@staticmethod
def add_watch(fd, path, mask):
"""
Add a watch to an initialized inotify instance. This method is
idempotent. If called twice with the same :fd: and :path: and
different mask, will change watch flags of current watch.
See `man inotify_add_watch` for details
:param fd: file descriptor returned by `init()`
:param path: path to file or directory to watch
:param mask: bitmask of events to monitor
:return: file descriptor of watch
"""
return Inotify._call("inotify_add_watch", fd, path, mask)
@staticmethod
def rm_watch(fd, wd):
"""
Remove existing watch from inotify instance.
:param fd: file descriptor of inotify instance
:param wd: watch file descriptor, returned by `add_watch()`
:return: zero
"""
return Inotify._call("inotify_rm_watch", fd, wd)
@staticmethod
def unpack_prefix(data):
"""
Unpacks prefix of event struct.
See `man inotify` for details
:param data: struct bytestring
:return: tuple of (wd, flag, cookie, length)
"""
return Inotify.event_prefix.unpack(data)
@staticmethod
def unpack_name(data):
"""
Unpack name field of inotify event struct
See `man inotify` for details
:param data: struct bytestring
:return: name string
"""
return struct.unpack("%ds" % len(data), data)[0].rstrip(b"\x00")
class Watcher:
"""
Asynchronous watcher for inotify events
"""
_CHUNK_SIZE = 1024
_MAX_WATCH_RETRIES = 3
_WATCHERS_RAISE_COEFF = 1.5
_MAX_USER_WATCHES = "fs.inotify.max_user_watches"
def __init__(self, loop, coro_callback=None):
self._loop = loop
self._fd = Inotify.init()
self._queue = asyncio.Queue()
self._callback = coro_callback or self._queue.put
self._loop.add_reader(self._fd, self._read)
self._reset_state()
def _reset_state(self):
self.paths = {}
self.descriptors = {}
self.buf = b""
def _read(self):
self.buf += os.read(self._fd, self._CHUNK_SIZE)
# shortcut
struct_size = Inotify.event_prefix.size
while len(self.buf) >= struct_size:
wd, flags, cookie, length = Inotify.unpack_prefix(
self.buf[:struct_size]
)
struct_end = struct_size + length
name = Inotify.unpack_name(self.buf[struct_size:struct_end])
self.buf = self.buf[struct_end:]
if wd not in self.paths:
continue
path = self.paths[wd]
if flags & Inotify.IGNORED:
logger.warning(
"Got IGNORED event for %s, cleaning watch", path
)
self._cleanup_watch(path)
continue
if flags & Inotify.Q_OVERFLOW:
logger.error("Inotify queue overflow")
continue
ev = Event(path, flags, cookie, name, wd)
self._loop.create_task(self._callback(ev))
def _raise_user_watches(self):
current_max_watches = sysctl.read(self._MAX_USER_WATCHES)
new_max_watchers = current_max_watches + int(
current_max_watches * self._WATCHERS_RAISE_COEFF
)
logger.info(
"Raising %s to %s", self._MAX_USER_WATCHES, new_max_watchers
)
sysctl.write(self._MAX_USER_WATCHES, new_max_watchers)
def close(self):
"""
Close watcher. Close inotify fd, remove reader and reset state
:return:
"""
self._loop.remove_reader(self._fd)
try:
os.close(self._fd)
finally:
self._reset_state()
self._fd = None
def watch(self, path, mask):
"""
Add file to watch
:param path: file or directory to watch
:param mask: events mask for this watch
"""
assert isinstance(path, bytes), "Path must be bytes"
logger.info("Watching %r", path)
retries = 0
while True:
try:
wd = Inotify.add_watch(self._fd, path, mask)
self.paths[wd] = path
self.descriptors[path] = wd
break
except OSError as e:
if (
retries < self._MAX_WATCH_RETRIES
and e.errno == errno.ENOSPC
):
self._raise_user_watches()
retries += 1
logger.warning(
"Inotify: not enough watches (%r), retrying...", path
)
continue
logger.error("Inotify failed while watching %r", path)
raise
def _cleanup_watch(self, path):
descriptor = self.descriptors.pop(path, None)
if descriptor is not None:
self.paths.pop(descriptor, None)
def unwatch(self, path):
"""
Remove file or directory from watch
:param path: file or directory to remove watch from
"""
if path not in self.descriptors:
return
logger.info("Stop watching %r", path)
try:
Inotify.rm_watch(self._fd, self.descriptors[path])
finally:
self._cleanup_watch(path)
async def get_event(self):
"""
Get watch event
:return: `Event` named tuple
"""
event = await self._queue.get()
logger.debug("Inotify event: %s", event)
return event