"""Minimal inotify file watcher built on libc through ctypes (Linux only)."""

import ctypes
import ctypes.util
from enum import IntFlag
import os
from pathlib import Path
import select
from struct import unpack, calcsize
from typing import Callable


class WatchFlag(IntFlag):
    """Bit flags used in inotify watch masks and in reported event masks."""

    # Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
    ACCESS = 0x00000001  # File was accessed
    MODIFY = 0x00000002  # File was modified
    ATTRIB = 0x00000004  # Metadata changed
    CLOSE_WRITE = 0x00000008  # Writable file was closed
    CLOSE_NOWRITE = 0x00000010  # Unwritable file was closed
    OPEN = 0x00000020  # File was opened
    MOVED_FROM = 0x00000040  # File was moved from X
    MOVED_TO = 0x00000080  # File was moved to Y
    CREATE = 0x00000100  # Subfile was created
    DELETE = 0x00000200  # Subfile was deleted
    DELETE_SELF = 0x00000400  # Self was deleted
    MOVE_SELF = 0x00000800  # Self was moved

    # Events sent by the kernel.
    UNMOUNT = 0x00002000  # Backing fs was unmounted
    Q_OVERFLOW = 0x00004000  # Event queue overflowed
    IGNORED = 0x00008000  # File was ignored

    # Helper events (intentionally not defined as members):
    # CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE)  # Close
    # MOVE = (MOVED_FROM | MOVED_TO)  # Moves

    # Special flags.
    ONLYDIR = 0x01000000  # Only watch the path if it is a directory
    DONT_FOLLOW = 0x02000000  # Do not follow a sym link
    EXCL_UNLINK = 0x04000000  # Exclude events on unlinked objects
    MASK_CREATE = 0x10000000  # Only create watches
    MASK_ADD = 0x20000000  # Add to the mask of an already existing watch
    ISDIR = 0x40000000  # Event occurred against dir
    ONESHOT = 0x80000000  # Only send event once

    # All events which a program can wait on (intentionally not a member):
    # ALL_EVENTS = (
    #     ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | OPEN | MOVED_FROM | MOVED_TO
    #     | CREATE | DELETE | DELETE_SELF | MOVE_SELF)


class InotifyError(Exception):
    """Raised when an inotify libc call fails.

    The ctypes copy of ``errno`` at construction time is appended to the
    message. That copy is only meaningful when the library was loaded with
    ``use_errno=True``, which ``Watcher`` does.
    """

    def __init__(self, message, *args, **kwargs):
        message += f' ERRNO=({ctypes.get_errno()})'
        super().__init__(message, *args, **kwargs)


WatcherCallback = Callable[[Path, WatchFlag], None]


class Watcher:
    """Watch filesystem paths through inotify and dispatch events to callbacks.

    Each registered path maps to one inotify watch descriptor; ``watch()``
    blocks and calls the callback registered for the descriptor an event was
    reported on. The instance can be used as a context manager, in which case
    the inotify descriptor is closed on exit.
    """

    # struct inotify_event header: int wd; uint32_t mask, cookie, len.
    _EVENT_FORMAT = 'iIII'
    _EVENT_SIZE = calcsize(_EVENT_FORMAT)
    # One read may return several events, each optionally followed by a name,
    # so the buffer has to be considerably larger than a single header.
    _READ_SIZE = 64 * 1024

    def __init__(self):
        # Set first so that close()/__del__ are safe if anything below raises.
        self._inotify_fd = None
        self._poll = None
        self._watch_fds: dict[Path, int] = {}
        self._watch_info: dict[int, tuple[Path, WatcherCallback]] = {}

        libc_path = ctypes.util.find_library('c')
        if libc_path is None:
            libc_path = 'libc.so.6'
        # use_errno=True is required for ctypes.get_errno() to reflect the
        # errno of a failed call; without it InotifyError always reports 0.
        self._instance = ctypes.CDLL(libc_path, use_errno=True)

        self._inotify_init = self._instance.inotify_init
        self._inotify_init.argtypes = []
        self._inotify_init.restype = ctypes.c_int
        self._inotify_init.errcheck = self._errcheck

        self._inotify_add_watch = self._instance.inotify_add_watch
        self._inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
        self._inotify_add_watch.restype = ctypes.c_int
        self._inotify_add_watch.errcheck = self._errcheck

        self._inotify_rm_watch = self._instance.inotify_rm_watch
        self._inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
        self._inotify_rm_watch.restype = ctypes.c_int
        self._inotify_rm_watch.errcheck = self._errcheck

        self._inotify_fd = self._inotify_init()
        self._poll = select.poll()
        self._poll.register(self._inotify_fd, select.POLLIN)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def register(self, path: Path, flags: WatchFlag, callback: WatcherCallback) -> None:
        """Start watching ``path`` for ``flags`` and route its events to ``callback``.

        If ``path`` is already registered, the previous watch and its
        bookkeeping are removed first.

        Raises:
            InotifyError: if the underlying inotify call fails.
        """
        if path in self._watch_fds:
            # Going through unregister() also drops the old descriptor from
            # _watch_info, so no stale callback entry is left behind.
            self.unregister(path)
        # os.fsencode uses the filesystem encoding, so names that are not
        # valid UTF-8 still round-trip.
        watch_fd = self._inotify_add_watch(self._inotify_fd, os.fsencode(path), int(flags))
        self._watch_fds[path] = watch_fd
        self._watch_info[watch_fd] = (path, callback)

    def unregister(self, path: Path) -> None:
        """Stop watching ``path``.

        Raises:
            KeyError: if ``path`` is not registered.
            InotifyError: if the kernel rejects the removal.
        """
        # Drop the bookkeeping first so the state stays consistent even when
        # the removal call below fails.
        watch_fd = self._watch_fds.pop(path)
        self._watch_info.pop(watch_fd, None)
        self._inotify_rm_watch(self._inotify_fd, watch_fd)

    def watch(self) -> None:
        """Block forever, dispatching events to the registered callbacks."""
        while True:
            for poll_fd, _event in self._poll.poll():
                self._dispatch(os.read(poll_fd, self._READ_SIZE))

    def _dispatch(self, buffer: bytes) -> None:
        """Parse every event contained in ``buffer`` and invoke its callback."""
        offset = 0
        while offset + self._EVENT_SIZE <= len(buffer):
            header = buffer[offset:offset + self._EVENT_SIZE]
            watch_fd, mask, _cookie, name_size = unpack(self._EVENT_FORMAT, header)
            # The optional name follows the header; skip it to reach the next event.
            offset += self._EVENT_SIZE + name_size
            info = self._watch_info.get(watch_fd)
            if info is None:
                # Event for a descriptor we do not track (for example one
                # that was just unregistered): nothing to notify.
                continue
            watch_path, callback = info
            callback(watch_path, WatchFlag(mask))

    def close(self) -> None:
        """Release the inotify descriptor; safe to call more than once."""
        inotify_fd = getattr(self, '_inotify_fd', None)
        if inotify_fd is None:
            return
        self._inotify_fd = None
        poll = getattr(self, '_poll', None)
        if poll is not None:
            try:
                poll.unregister(inotify_fd)
            except (KeyError, OSError):
                pass  # Not registered (anymore); nothing to undo.
        # No per-watch removal here: the descriptor is closed right below,
        # and explicit removal can fail for watches that are already gone.
        self._watch_fds.clear()
        self._watch_info.clear()
        os.close(inotify_fd)

    def __del__(self):
        self.close()

    @staticmethod
    def _errcheck(result, _func, _arguments):
        """ctypes ``errcheck`` hook that validates the C return value."""
        return Watcher._check_nonnegative(result)

    @staticmethod
    def _check_nonnegative(result):
        """Return ``result`` unchanged, raising InotifyError if it is -1."""
        if result == -1:
            raise InotifyError(f'Call failed (should not be -1): {result}')
        return result


def main():
    """Watch the paths given on the command line and report modifications."""
    from argparse import ArgumentParser  # pylint: disable=import-outside-toplevel

    parser = ArgumentParser()
    parser.add_argument('paths', nargs='*', type=Path)
    arguments = parser.parse_args()
    paths: list[Path] = arguments.paths

    def callback(path: Path, _flags: WatchFlag):
        print(f'{path} has been modified')

    watcher = Watcher()
    for path in paths:
        watcher.register(path, WatchFlag.MODIFY, callback)
    try:
        watcher.watch()
    except KeyboardInterrupt:
        print('\rExit')


if __name__ == '__main__':
    main()