Add watch option

This commit is contained in:
Corentin 2022-11-02 19:39:10 +09:00
commit 64c31f6211
5 changed files with 358 additions and 153 deletions

View file

@ -11,6 +11,7 @@ class Config:
APPS = ['app_name'] # Output binaries (path to source without extension) APPS = ['app_name'] # Output binaries (path to source without extension)
IGNORE_APPS = [] IGNORE_APPS = []
JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing) JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing)
WATCH = False # Watch source modification for auto-compiling
BIN_DIR = Path('bin') # Output directory (binaries) BIN_DIR = Path('bin') # Output directory (binaries)
INCLUDE_DIR = Path('include') # Include directory (header files) INCLUDE_DIR = Path('include') # Include directory (header files)

View file

@ -1,7 +1,7 @@
MAKE_PATH="." MAKE_PATH="."
UMAKE_PATH="umake" UMAKE_PATH="umake"
.PHONY: all clean .PHONY: all debug clean
all: all:
@PYTHONPATH=$(UMAKE_PATH) python3 $(MAKE_PATH)/make.py @PYTHONPATH=$(UMAKE_PATH) python3 $(MAKE_PATH)/make.py

View file

@ -22,4 +22,4 @@ disable=missing-module-docstring, missing-function-docstring, missing-class-docs
min-similarity-lines=6 min-similarity-lines=6
[pydocstyle] [pydocstyle]
ignore=D10,D203,D204 ignore=D10,D203,D204

343
umake.py
View file

@ -15,6 +15,7 @@ class Config:
APPS = ['app_name'] # Output binaries (need to be found as .cpp directly in SOURCE_DIR) APPS = ['app_name'] # Output binaries (need to be found as .cpp directly in SOURCE_DIR)
IGNORE_APPS = [] IGNORE_APPS = []
JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing) JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing)
WATCH = False # Watch source modification for auto-compiling
BIN_DIR = Path('bin') # Output directory (binaries) BIN_DIR = Path('bin') # Output directory (binaries)
INCLUDE_DIR = Path('include') # Include directory (header files) INCLUDE_DIR = Path('include') # Include directory (header files)
@ -82,162 +83,214 @@ def make(config: Config):
config.COMMON_FLAGS += ' ' + config.COMMON_RELEASE_FLAGS config.COMMON_FLAGS += ' ' + config.COMMON_RELEASE_FLAGS
config.OBJECT_DIR = config.OBJECT_DIR / 'release' config.OBJECT_DIR = config.OBJECT_DIR / 'release'
# Create list of source to process (tuple[source_path, object_path]) # Update job count
compile_list = [(Path(source_file), config.JOB_COUNT = arguments.j
config.OBJECT_DIR / source_file.parent.relative_to(config.SOURCE_DIR) / (source_file.stem + '.o'))
for source_file in config.CPP_SOURCES]
todo_list: list[tuple[Path, str]] = []
error_paths: list[tuple[Path, str]] = []
hash_dict = {} Builder(config).make()
if (config.OBJECT_DIR / 'hash.json').exists():
with open(config.OBJECT_DIR / 'hash.json', 'r', encoding='utf-8') as hash_file:
hash_dict = json.loads(hash_file.read())
# Get source dependencies
dependency_dict = {}
for source_path, object_path in compile_list:
cmd = ' '.join([config.CC, config.COMMON_FLAGS, config.COMPILE_FLAGS, str(source_path), '-M'])
job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if job.returncode != 0:
error_paths.append((source_path, job.stdout + job.stderr))
break
dependency_dict[str(source_path)] = [
line for line in job.stdout.split('.o: ')[1].replace('\n', '').replace('\\ ', '').split(' ') if line]
if error_paths:
for error_path, error_text in error_paths:
print(f'{ConsoleColor.RED}Error checking dependencies for {error_path}:'
f'\n{ConsoleColor.ENDCOLOR + error_text}')
with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file:
hash_file.write(json.dumps(hash_dict, indent=1))
sys.exit(1)
# Check source file and generate compilation commands to execute class Builder:
for source_path, object_path in compile_list: def __init__(self, config: Config):
cmd = ' '.join([config.CC, config.COMMON_FLAGS, config.COMPILE_FLAGS, self._config = config
str(source_path), '-c', '-o', str(object_path)])
if not object_path.parent.exists():
object_path.parent.mkdir(parents=True)
source_hash = get_hash(source_path)
dependency_changed = False
for dependency_path in dependency_dict[str(source_path)]:
dependency_hash = get_hash(dependency_path)
if str(dependency_path) not in hash_dict or hash_dict[str(dependency_path)] != dependency_hash:
dependency_changed = True
print(f'{ConsoleColor.ORANGE}Dependency changed for {source_path} :'
f' {dependency_path} {ConsoleColor.ENDCOLOR}')
break
if (dependency_changed or not object_path.exists()
or str(source_path) not in hash_dict or hash_dict[str(source_path)] != source_hash):
todo_list.append((source_path, cmd))
continue
if not todo_list and all( # Create list of source to process (tuple[source_path, object_path])
[(config.BIN_DIR / app_path).exists() for app_path in config.APPS if app_path not in config.IGNORE_APPS]): self._compile_dict: dict[Path, Path] = {
print(f'{ConsoleColor.GREEN}Nothing to do{ConsoleColor.ENDCOLOR}') Path(source_file): (self._config.OBJECT_DIR / source_file.parent.relative_to(self._config.SOURCE_DIR)
return / (source_file.stem + '.o'))
for source_file in self._config.CPP_SOURCES}
self._todo_dict: dict[Path, str] = {}
# Running compilation processes self._hash_dict: dict[str, str] = {}
if not os.path.exists(config.OBJECT_DIR): if (self._config.OBJECT_DIR / 'hash.json').exists():
os.makedirs(config.OBJECT_DIR) with open(self._config.OBJECT_DIR / 'hash.json', 'r', encoding='utf-8') as hash_file:
if arguments.j > 1: # Multi-process self._hash_dict = json.loads(hash_file.read())
jobs: list[tuple[Path, subprocess.Popen]] = []
for source_path, cmd in todo_list: # Get source dependencies
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) error_paths: list[tuple[Path, str]] = []
jobs.insert(0, (source_path, subprocess.Popen( self._dependency_dict: dict[str, list[str]] = {}
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, for source_path, _object_path in self._compile_dict.items():
shell=True))) # FIFO style (will be poped) cmd = ' '.join(
if len(jobs) >= arguments.j: # If jobs count is maxed we wait for the oldest one to finished [self._config.CC, self._config.COMMON_FLAGS, self._config.COMPILE_FLAGS, str(source_path), '-M'])
job_path, oldest_job = jobs.pop()
oldest_job.wait()
if oldest_job.returncode != 0:
error_paths.append((job_path, oldest_job.stdout.read() + oldest_job.stderr.read()))
break
# Update hash if no error
for dependency_path in dependency_dict[str(job_path)]:
hash_dict[str(dependency_path)] = get_hash(dependency_path)
del source_path
for job_path, job in jobs: # Wait the last jobs to finish
job.wait()
if job.returncode != 0:
error_paths.append((job_path, job.stdout.read() + job.stderr.read()))
else:
# Update hash if no error
for dependency_path in dependency_dict[str(job_path)]:
hash_dict[str(dependency_path)] = get_hash(dependency_path)
else: # Single-process
for source_path, cmd in todo_list:
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True) universal_newlines=True, shell=True)
if job.returncode != 0: if job.returncode != 0:
error_paths.append((source_path, job.stdout + job.stderr)) error_paths.append((source_path, job.stdout + job.stderr))
break break
# Update hash if no error self._dependency_dict[str(source_path)] = [
for dependency_path in dependency_dict[str(source_path)]: line for line in job.stdout.split('.o: ')[1].replace('\n', '').replace('\\ ', '').split(' ') if line]
hash_dict[str(dependency_path)] = get_hash(dependency_path) if error_paths:
del source_path for error_path, error_text in error_paths:
if error_paths: print(f'{ConsoleColor.RED}Error checking dependencies for {error_path}:'
for error_path, error_text in error_paths: f'\n{ConsoleColor.ENDCOLOR + error_text}')
print(ConsoleColor.RED + f'Error compiling {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text) with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file:
with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: hash_file.write(json.dumps(self._hash_dict, indent=1))
hash_file.write(json.dumps(hash_dict, indent=1)) sys.exit(1)
sys.exit(1)
# Running linking processes def make(self):
if not config.BIN_DIR.exists(): self._populate_todo_dict()
config.BIN_DIR.mkdir(parents=True) self._compile_sources()
all_app_objects = [config.OBJECT_DIR / Path(app_path).parent / (Path(app_path).stem + '.o')
for app_path in config.APPS] if self._config.WATCH:
if arguments.j > 1: # Multi-process from watch import Watcher, WatchFlag # pylint: disable=import-outside-toplevel
jobs: list[tuple[Path, subprocess.Popen]] = []
for app_path, app_object_path in zip(config.APPS, all_app_objects): def callback(path: Path, _flags: WatchFlag):
if app_path in config.IGNORE_APPS: source_hash = get_hash(path)
continue if source_hash != self._hash_dict[str(path)]:
bin_path = config.BIN_DIR / app_path print(f'{ConsoleColor.ORANGE}Source file changed : {path} {ConsoleColor.ENDCOLOR}')
if not bin_path.parent.exists(): self._populate_todo_dict()
bin_path.parent.mkdir(parents=True) self._compile_sources()
object_files = [str(object_path) for _, object_path in compile_list
if object_path not in all_app_objects] watcher = Watcher()
object_files.append(str(app_object_path)) for source_path in self._compile_dict:
cmd = ' '.join([config.CC, config.COMMON_FLAGS, *object_files, '-o', str(bin_path), config.LINK_FLAGS]) watcher.register(source_path, WatchFlag.MODIFY, callback)
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) try:
jobs.insert(0, (app_path, subprocess.Popen( watcher.watch()
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, except KeyboardInterrupt:
shell=True))) # FIFO style (will be poped) print('\rExit')
if len(jobs) >= arguments.j: # If jobs count is maxed we wait for the oldest one to finished
app_path, oldest_job = jobs.pop() def _populate_todo_dict(self):
oldest_job.wait() """Check source file and generate compilation commands to execute."""
if oldest_job.returncode != 0: for source_path, object_path in self._compile_dict.items():
error_paths.append((app_path, oldest_job.stdout.read() + oldest_job.stderr.read())) if not object_path.parent.exists():
object_path.parent.mkdir(parents=True)
source_hash = get_hash(source_path)
dependency_changed = False
for path in self._dependency_dict[str(source_path)]:
dependency_hash = get_hash(path)
if str(path) not in self._hash_dict or self._hash_dict[str(path)] != dependency_hash:
dependency_changed = True
print(f'{ConsoleColor.ORANGE}Dependency changed for {source_path} : {path} {ConsoleColor.ENDCOLOR}')
break break
for job_path, job in jobs: # Wait the last jobs to finish if (dependency_changed or not object_path.exists()
job.wait() or str(source_path) not in self._hash_dict or self._hash_dict[str(source_path)] != source_hash):
if job.returncode != 0: self._todo_dict[source_path] = self._generate_compilation_command(source_path, object_path)
error_paths.append((job_path, job.stdout.read() + job.stderr.read()))
else: # Single-process
for app_path, app_object_path in zip(config.APPS, all_app_objects):
if app_path in config.IGNORE_APPS:
continue continue
bin_path = config.BIN_DIR / app_path
if not bin_path.parent.exists():
bin_path.parent.mkdir(parents=True)
object_files = [str(object_path) for _, object_path in compile_list
if object_path not in all_app_objects]
object_files.append(str(app_object_path))
cmd = ' '.join([config.CC, config.COMMON_FLAGS, *object_files, '-o', str(bin_path), config.LINK_FLAGS])
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if job.returncode != 0:
error_paths.append((app_path, job.stdout + job.stderr))
if error_paths:
for error_path, error_text in error_paths:
print(ConsoleColor.RED + f'Error linking {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text)
with open(os.path.join(config.OBJECT_DIR, 'hash.json'), 'w', encoding='utf-8') as hash_file:
hash_file.write(json.dumps(hash_dict, indent=1))
sys.exit(1)
with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: def _generate_compilation_command(self, source_path: Path, object_path: Path) -> str:
hash_file.write(json.dumps(hash_dict, indent=1)) return ' '.join([self._config.CC, self._config.COMMON_FLAGS, self._config.COMPILE_FLAGS,
str(source_path), '-c', '-o', str(object_path)])
def _compile_sources(self):
if not self._todo_dict and all(
(self._config.BIN_DIR / app_path).exists()
for app_path in self._config.APPS if app_path not in self._config.IGNORE_APPS):
print(f'{ConsoleColor.GREEN}Nothing to do{ConsoleColor.ENDCOLOR}')
return
# Running compilation processes
if not os.path.exists(self._config.OBJECT_DIR):
os.makedirs(self._config.OBJECT_DIR)
error_paths: list[tuple[Path, str]] = []
if self._config.JOB_COUNT > 1: # Multi-process
jobs: list[tuple[Path, subprocess.Popen]] = []
completed_paths: list[Path] = []
for source_path, cmd in self._todo_dict.items():
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
jobs.insert(0, (source_path, subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True,
shell=True))) # FIFO style (will be poped)
if len(jobs) >= self._config.JOB_COUNT: # If jobs count is maxed we wait for the oldest one to finished
job_path, oldest_job = jobs.pop()
oldest_job.wait()
if oldest_job.returncode != 0:
error_paths.append((job_path, oldest_job.stdout.read() + oldest_job.stderr.read()))
break
# Update hash if no error
for dependency_path in self._dependency_dict[str(job_path)]:
self._hash_dict[str(dependency_path)] = get_hash(dependency_path)
completed_paths.append(source_path)
for source_path in completed_paths:
del self._todo_dict[source_path]
for job_path, job in jobs: # Wait the last jobs to finish
job.wait()
if job.returncode != 0:
error_paths.append((job_path, job.stdout.read() + job.stderr.read()))
else:
# Update hash if no error
for dependency_path in self._dependency_dict[str(job_path)]:
self._hash_dict[str(dependency_path)] = get_hash(dependency_path)
del self._todo_dict[job_path]
else: # Single-process
completed_paths: list[Path] = []
for source_path, cmd in self._todo_dict.items():
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if job.returncode != 0:
error_paths.append((source_path, job.stdout + job.stderr))
break
# Update hash if no error
for dependency_path in self._dependency_dict[str(source_path)]:
self._hash_dict[str(dependency_path)] = get_hash(dependency_path)
completed_paths.append(source_path)
for source_path in completed_paths:
del self._todo_dict[source_path]
if error_paths:
for error_path, error_text in error_paths:
print(ConsoleColor.RED + f'Error compiling {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text)
with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file:
hash_file.write(json.dumps(self._hash_dict, indent=1))
sys.exit(1)
# Running linking processes
if not self._config.BIN_DIR.exists():
self._config.BIN_DIR.mkdir(parents=True)
all_app_objects = [self._config.OBJECT_DIR / Path(app_path).parent / (Path(app_path).stem + '.o')
for app_path in self._config.APPS]
if self._config.JOB_COUNT > 1: # Multi-process
jobs: list[tuple[Path, subprocess.Popen]] = []
for app_path, app_object_path in zip(self._config.APPS, all_app_objects):
if app_path in self._config.IGNORE_APPS:
continue
bin_path = self._config.BIN_DIR / app_path
if not bin_path.parent.exists():
bin_path.parent.mkdir(parents=True)
object_files = [str(object_path) for object_path in self._compile_dict.values()
if object_path not in all_app_objects]
object_files.append(str(app_object_path))
cmd = ' '.join([self._config.CC, self._config.COMMON_FLAGS, *object_files, '-o', str(bin_path),
self._config.LINK_FLAGS])
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
jobs.insert(0, (app_path, subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True,
shell=True))) # FIFO style (will be poped)
if len(jobs) >= self._config.JOB_COUNT: # If jobs count is maxed we wait for the oldest one to finished
app_path, oldest_job = jobs.pop()
oldest_job.wait()
if oldest_job.returncode != 0:
error_paths.append((app_path, oldest_job.stdout.read() + oldest_job.stderr.read()))
break
for job_path, job in jobs: # Wait the last jobs to finish
job.wait()
if job.returncode != 0:
error_paths.append((job_path, job.stdout.read() + job.stderr.read()))
else: # Single-process
for app_path, app_object_path in zip(self._config.APPS, all_app_objects):
if app_path in self._config.IGNORE_APPS:
continue
bin_path = self._config.BIN_DIR / app_path
if not bin_path.parent.exists():
bin_path.parent.mkdir(parents=True)
object_files = [str(object_path) for object_path in self._compile_dict.values()
if object_path not in all_app_objects]
object_files.append(str(app_object_path))
cmd = ' '.join([self._config.CC, self._config.COMMON_FLAGS, *object_files, '-o', str(bin_path),
self._config.LINK_FLAGS])
print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR)
job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if job.returncode != 0:
error_paths.append((app_path, job.stdout + job.stderr))
if error_paths:
for error_path, error_text in error_paths:
print(ConsoleColor.RED + f'Error linking {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text)
with open(os.path.join(self._config.OBJECT_DIR, 'hash.json'), 'w', encoding='utf-8') as hash_file:
hash_file.write(json.dumps(self._hash_dict, indent=1))
sys.exit(1)
with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file:
hash_file.write(json.dumps(self._hash_dict, indent=1))
print(f'{ConsoleColor.GREEN}Compilation done{ConsoleColor.ENDCOLOR}')

151
watch.py Normal file
View file

@ -0,0 +1,151 @@
import ctypes
import ctypes.util
from enum import IntFlag
import os
from pathlib import Path
import select
from struct import unpack, calcsize
from typing import Callable
class WatchFlag(IntFlag):
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
ACCESS = 0x00000001 # File was accessed
MODIFY = 0x00000002 # File was modified
ATTRIB = 0x00000004 # Metadata changed
CLOSE_WRITE = 0x00000008 # Writtable file was closed
CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed
# CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close
OPEN = 0x00000020 # File was opened
MOVED_FROM = 0x00000040 # File was moved from X
MOVED_TO = 0x00000080 # File was moved to Y
# MOVE = (MOVED_FROM | MOVED_TO) # Moves
CREATE = 0x00000100 # Subfile was created
DELETE = 0x00000200 # Subfile was deleted
DELETE_SELF = 0x00000400 # Self was deleted
MOVE_SELF = 0x00000800 # Self was moved
# Events sent by the kernel.
UNMOUNT = 0x00002000 # Backing fs was unmounted
Q_OVERFLOW = 0x00004000 # Event queued overflowed
IGNORED = 0x00008000 # File was ignored
# Helper events.
# CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close
# MOVE = (MOVED_FROM | MOVED_TO) # Moves
# Special flags.
ONLYDIR = 0x01000000 # Only watch the path if it is a directory
DONT_FOLLOW = 0x02000000 # Do not follow a sym link
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
MASK_CREATE = 0x10000000 # Only create watches
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch
ISDIR = 0x40000000 # Event occurred against dir
ONESHOT = 0x80000000 # Only send event once
# All events which a program can wait on.
# ALL_EVENTS = (
# ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | OPEN | MOVED_FROM | MOVED_TO
# | CREATE | DELETE | DELETE_SELF | MOVE_SELF)
class InotifyError(Exception):
def __init__(self, message, *args, **kwargs):
message += f' ERRNO=({ctypes.get_errno()})'
super().__init__(message, *args, **kwargs)
WatcherCallback = Callable[[Path, WatchFlag], None]
class Watcher:
_EVENT_FORMAT = 'iIII'
_EVENT_SIZE = calcsize(_EVENT_FORMAT)
def __init__(self):
libc_path = ctypes.util.find_library('c')
if libc_path is None:
libc_path = 'libc.so.6'
self._instance = ctypes.cdll.LoadLibrary(libc_path)
self._inotify_init: ctypes.CDLL._FuncPtr = self._instance.inotify_init
self._inotify_init.argtypes = []
self._inotify_init.restype = self._check_nonnegative
self._inotify_add_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_add_watch
self._inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
self._inotify_add_watch.restype = self._check_nonnegative
self._inotify_rm_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_rm_watch
self._inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
self._inotify_rm_watch.restype = self._check_nonnegative
self._inotify_fd = self._inotify_init()
self._poll = select.poll()
self._poll.register(self._inotify_fd, select.POLLIN)
self._watch_fds: dict[Path, int] = {}
self._watch_info: dict[int, tuple[Path, WatcherCallback]] = {}
def register(self, path: Path, flags: WatchFlag, callback: WatcherCallback):
if path in self._watch_fds:
self._inotify_rm_watch(self._inotify_fd, self._watch_fds[path])
watch_fd = self._inotify_add_watch(self._inotify_fd, str(path).encode(), flags)
self._watch_fds[path] = watch_fd
self._watch_info[watch_fd] = (path, callback)
def unregister(self, path: Path):
watch_fd = self._watch_fds[path]
self._inotify_rm_watch(self._inotify_fd, watch_fd)
del self._watch_fds[path]
del self._watch_info[watch_fd]
def watch(self):
while True:
results = self._poll.poll()
for poll_fd, _event in results:
watch_buffer = os.read(poll_fd, self._EVENT_SIZE)
if len(watch_buffer) < self._EVENT_SIZE:
continue
watch_fd, mask, _cookie, _namesize = unpack(self._EVENT_FORMAT, watch_buffer)
watch_path, callback = self._watch_info[watch_fd]
callback(watch_path, WatchFlag(mask))
def __del__(self):
self._poll.unregister(self._inotify_fd)
for _watch_path, watch_fd in self._watch_fds.items():
self._inotify_rm_watch(self._inotify_fd, watch_fd)
os.close(self._inotify_fd)
@staticmethod
def _check_nonnegative(result):
if result == -1:
raise InotifyError(f'Call failed (should not be -1): {result}')
return result
def main():
from argparse import ArgumentParser # pylint: disable=import-outside-toplevel
parser = ArgumentParser()
parser.add_argument('paths', nargs='*', type=Path)
arguments = parser.parse_args()
paths: list[Path] = arguments.paths
def callback(path: Path, _flags: WatchFlag):
print(f'{path} has been modified')
watcher = Watcher()
for path in paths:
watcher.register(path, WatchFlag.MODIFY, callback)
try:
watcher.watch()
except KeyboardInterrupt:
print('\rExit')
if __name__ == '__main__':
main()