umake/watch.py
2022-11-02 19:39:10 +09:00

151 lines
5.3 KiB
Python

import ctypes
import ctypes.util
from enum import IntFlag
import os
from pathlib import Path
import select
from struct import unpack, calcsize
from typing import Callable
class WatchFlag(IntFlag):
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
ACCESS = 0x00000001 # File was accessed
MODIFY = 0x00000002 # File was modified
ATTRIB = 0x00000004 # Metadata changed
CLOSE_WRITE = 0x00000008 # Writtable file was closed
CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed
# CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close
OPEN = 0x00000020 # File was opened
MOVED_FROM = 0x00000040 # File was moved from X
MOVED_TO = 0x00000080 # File was moved to Y
# MOVE = (MOVED_FROM | MOVED_TO) # Moves
CREATE = 0x00000100 # Subfile was created
DELETE = 0x00000200 # Subfile was deleted
DELETE_SELF = 0x00000400 # Self was deleted
MOVE_SELF = 0x00000800 # Self was moved
# Events sent by the kernel.
UNMOUNT = 0x00002000 # Backing fs was unmounted
Q_OVERFLOW = 0x00004000 # Event queued overflowed
IGNORED = 0x00008000 # File was ignored
# Helper events.
# CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close
# MOVE = (MOVED_FROM | MOVED_TO) # Moves
# Special flags.
ONLYDIR = 0x01000000 # Only watch the path if it is a directory
DONT_FOLLOW = 0x02000000 # Do not follow a sym link
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
MASK_CREATE = 0x10000000 # Only create watches
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch
ISDIR = 0x40000000 # Event occurred against dir
ONESHOT = 0x80000000 # Only send event once
# All events which a program can wait on.
# ALL_EVENTS = (
# ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | OPEN | MOVED_FROM | MOVED_TO
# | CREATE | DELETE | DELETE_SELF | MOVE_SELF)
class InotifyError(Exception):
def __init__(self, message, *args, **kwargs):
message += f' ERRNO=({ctypes.get_errno()})'
super().__init__(message, *args, **kwargs)
WatcherCallback = Callable[[Path, WatchFlag], None]
class Watcher:
_EVENT_FORMAT = 'iIII'
_EVENT_SIZE = calcsize(_EVENT_FORMAT)
def __init__(self):
libc_path = ctypes.util.find_library('c')
if libc_path is None:
libc_path = 'libc.so.6'
self._instance = ctypes.cdll.LoadLibrary(libc_path)
self._inotify_init: ctypes.CDLL._FuncPtr = self._instance.inotify_init
self._inotify_init.argtypes = []
self._inotify_init.restype = self._check_nonnegative
self._inotify_add_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_add_watch
self._inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
self._inotify_add_watch.restype = self._check_nonnegative
self._inotify_rm_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_rm_watch
self._inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
self._inotify_rm_watch.restype = self._check_nonnegative
self._inotify_fd = self._inotify_init()
self._poll = select.poll()
self._poll.register(self._inotify_fd, select.POLLIN)
self._watch_fds: dict[Path, int] = {}
self._watch_info: dict[int, tuple[Path, WatcherCallback]] = {}
def register(self, path: Path, flags: WatchFlag, callback: WatcherCallback):
if path in self._watch_fds:
self._inotify_rm_watch(self._inotify_fd, self._watch_fds[path])
watch_fd = self._inotify_add_watch(self._inotify_fd, str(path).encode(), flags)
self._watch_fds[path] = watch_fd
self._watch_info[watch_fd] = (path, callback)
def unregister(self, path: Path):
watch_fd = self._watch_fds[path]
self._inotify_rm_watch(self._inotify_fd, watch_fd)
del self._watch_fds[path]
del self._watch_info[watch_fd]
def watch(self):
while True:
results = self._poll.poll()
for poll_fd, _event in results:
watch_buffer = os.read(poll_fd, self._EVENT_SIZE)
if len(watch_buffer) < self._EVENT_SIZE:
continue
watch_fd, mask, _cookie, _namesize = unpack(self._EVENT_FORMAT, watch_buffer)
watch_path, callback = self._watch_info[watch_fd]
callback(watch_path, WatchFlag(mask))
def __del__(self):
self._poll.unregister(self._inotify_fd)
for _watch_path, watch_fd in self._watch_fds.items():
self._inotify_rm_watch(self._inotify_fd, watch_fd)
os.close(self._inotify_fd)
@staticmethod
def _check_nonnegative(result):
if result == -1:
raise InotifyError(f'Call failed (should not be -1): {result}')
return result
def main():
from argparse import ArgumentParser # pylint: disable=import-outside-toplevel
parser = ArgumentParser()
parser.add_argument('paths', nargs='*', type=Path)
arguments = parser.parse_args()
paths: list[Path] = arguments.paths
def callback(path: Path, _flags: WatchFlag):
print(f'{path} has been modified')
watcher = Watcher()
for path in paths:
watcher.register(path, WatchFlag.MODIFY, callback)
try:
watcher.watch()
except KeyboardInterrupt:
print('\rExit')
if __name__ == '__main__':
main()