diff --git a/make.py b/make.py index 0de84ce..fb88c37 100644 --- a/make.py +++ b/make.py @@ -11,6 +11,7 @@ class Config: APPS = ['app_name'] # Output binaries (path to source without extension) IGNORE_APPS = [] JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing) + WATCH = False # Watch source modification for auto-compiling BIN_DIR = Path('bin') # Output directory (binaries) INCLUDE_DIR = Path('include') # Include directory (header files) diff --git a/makefile b/makefile index bbecf8f..4d5ae1c 100644 --- a/makefile +++ b/makefile @@ -1,7 +1,7 @@ MAKE_PATH="." UMAKE_PATH="umake" -.PHONY: all clean +.PHONY: all debug clean all: @PYTHONPATH=$(UMAKE_PATH) python3 $(MAKE_PATH)/make.py diff --git a/setup.cfg b/setup.cfg index 33684a9..c6cb7d1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,4 +22,4 @@ disable=missing-module-docstring, missing-function-docstring, missing-class-docs min-similarity-lines=6 [pydocstyle] -ignore=D10,D203,D204 \ No newline at end of file +ignore=D10,D203,D204 diff --git a/umake.py b/umake.py index 1599a7d..ba5cafe 100755 --- a/umake.py +++ b/umake.py @@ -15,6 +15,7 @@ class Config: APPS = ['app_name'] # Output binaries (need to be found as .cpp directly in SOURCE_DIR) IGNORE_APPS = [] JOB_COUNT = int(os.cpu_count() * 0.8) # Concurent jobs (multi-processing) + WATCH = False # Watch source modification for auto-compiling BIN_DIR = Path('bin') # Output directory (binaries) INCLUDE_DIR = Path('include') # Include directory (header files) @@ -82,162 +83,214 @@ def make(config: Config): config.COMMON_FLAGS += ' ' + config.COMMON_RELEASE_FLAGS config.OBJECT_DIR = config.OBJECT_DIR / 'release' - # Create list of source to process (tuple[source_path, object_path]) - compile_list = [(Path(source_file), - config.OBJECT_DIR / source_file.parent.relative_to(config.SOURCE_DIR) / (source_file.stem + '.o')) - for source_file in config.CPP_SOURCES] - todo_list: list[tuple[Path, str]] = [] - error_paths: list[tuple[Path, str]] = [] + # Update job count + config.JOB_COUNT = arguments.j - hash_dict = {} - if (config.OBJECT_DIR / 'hash.json').exists(): - with open(config.OBJECT_DIR / 'hash.json', 'r', encoding='utf-8') as hash_file: - hash_dict = json.loads(hash_file.read()) + Builder(config).make() - # Get source dependencies - dependency_dict = {} - for source_path, object_path in compile_list: - cmd = ' '.join([config.CC, config.COMMON_FLAGS, config.COMPILE_FLAGS, str(source_path), '-M']) - job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True, shell=True) - if job.returncode != 0: - error_paths.append((source_path, job.stdout + job.stderr)) - break - dependency_dict[str(source_path)] = [ - line for line in job.stdout.split('.o: ')[1].replace('\n', '').replace('\\ ', '').split(' ') if line] - if error_paths: - for error_path, error_text in error_paths: - print(f'{ConsoleColor.RED}Error checking dependencies for {error_path}:' - f'\n{ConsoleColor.ENDCOLOR + error_text}') - with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: - hash_file.write(json.dumps(hash_dict, indent=1)) - sys.exit(1) - # Check source file and generate compilation commands to execute - for source_path, object_path in compile_list: - cmd = ' '.join([config.CC, config.COMMON_FLAGS, config.COMPILE_FLAGS, - str(source_path), '-c', '-o', str(object_path)]) - if not object_path.parent.exists(): - object_path.parent.mkdir(parents=True) - source_hash = get_hash(source_path) - dependency_changed = False - for dependency_path in dependency_dict[str(source_path)]: - dependency_hash = get_hash(dependency_path) - if str(dependency_path) not in hash_dict or hash_dict[str(dependency_path)] != dependency_hash: - dependency_changed = True - print(f'{ConsoleColor.ORANGE}Dependency changed for {source_path} :' - f' {dependency_path} {ConsoleColor.ENDCOLOR}') - break - if (dependency_changed or not object_path.exists() - or str(source_path) not in hash_dict or hash_dict[str(source_path)] != source_hash): - todo_list.append((source_path, cmd)) - continue +class Builder: + def __init__(self, config: Config): + self._config = config - if not todo_list and all( - [(config.BIN_DIR / app_path).exists() for app_path in config.APPS if app_path not in config.IGNORE_APPS]): - print(f'{ConsoleColor.GREEN}Nothing to do{ConsoleColor.ENDCOLOR}') - return + # Create list of source to process (tuple[source_path, object_path]) + self._compile_dict: dict[Path, Path] = { + Path(source_file): (self._config.OBJECT_DIR / source_file.parent.relative_to(self._config.SOURCE_DIR) + / (source_file.stem + '.o')) + for source_file in self._config.CPP_SOURCES} + self._todo_dict: dict[Path, str] = {} - # Running compilation processes - if not os.path.exists(config.OBJECT_DIR): - os.makedirs(config.OBJECT_DIR) - if arguments.j > 1: # Multi-process - jobs: list[tuple[Path, subprocess.Popen]] = [] - for source_path, cmd in todo_list: - print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) - jobs.insert(0, (source_path, subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - shell=True))) # FIFO style (will be poped) - if len(jobs) >= arguments.j: # If jobs count is maxed we wait for the oldest one to finished - job_path, oldest_job = jobs.pop() - oldest_job.wait() - if oldest_job.returncode != 0: - error_paths.append((job_path, oldest_job.stdout.read() + oldest_job.stderr.read())) - break - # Update hash if no error - for dependency_path in dependency_dict[str(job_path)]: - hash_dict[str(dependency_path)] = get_hash(dependency_path) - del source_path - for job_path, job in jobs: # Wait the last jobs to finish - job.wait() - if job.returncode != 0: - error_paths.append((job_path, job.stdout.read() + job.stderr.read())) - else: - # Update hash if no error - for dependency_path in dependency_dict[str(job_path)]: - hash_dict[str(dependency_path)] = get_hash(dependency_path) - else: # Single-process - for source_path, cmd in todo_list: - print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) + self._hash_dict: dict[str, str] = {} + if (self._config.OBJECT_DIR / 'hash.json').exists(): + with open(self._config.OBJECT_DIR / 'hash.json', 'r', encoding='utf-8') as hash_file: + self._hash_dict = json.loads(hash_file.read()) + + # Get source dependencies + error_paths: list[tuple[Path, str]] = [] + self._dependency_dict: dict[str, list[str]] = {} + for source_path, _object_path in self._compile_dict.items(): + cmd = ' '.join( + [self._config.CC, self._config.COMMON_FLAGS, self._config.COMPILE_FLAGS, str(source_path), '-M']) job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) if job.returncode != 0: error_paths.append((source_path, job.stdout + job.stderr)) break - # Update hash if no error - for dependency_path in dependency_dict[str(source_path)]: - hash_dict[str(dependency_path)] = get_hash(dependency_path) - del source_path - if error_paths: - for error_path, error_text in error_paths: - print(ConsoleColor.RED + f'Error compiling {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text) - with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: - hash_file.write(json.dumps(hash_dict, indent=1)) - sys.exit(1) + self._dependency_dict[str(source_path)] = [ + line for line in job.stdout.split('.o: ')[1].replace('\n', '').replace('\\ ', '').split(' ') if line] + if error_paths: + for error_path, error_text in error_paths: + print(f'{ConsoleColor.RED}Error checking dependencies for {error_path}:' + f'\n{ConsoleColor.ENDCOLOR + error_text}') + with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: + hash_file.write(json.dumps(self._hash_dict, indent=1)) + sys.exit(1) - # Running linking processes - if not config.BIN_DIR.exists(): - config.BIN_DIR.mkdir(parents=True) - all_app_objects = [config.OBJECT_DIR / Path(app_path).parent / (Path(app_path).stem + '.o') - for app_path in config.APPS] - if arguments.j > 1: # Multi-process - jobs: list[tuple[Path, subprocess.Popen]] = [] - for app_path, app_object_path in zip(config.APPS, all_app_objects): - if app_path in config.IGNORE_APPS: - continue - bin_path = config.BIN_DIR / app_path - if not bin_path.parent.exists(): - bin_path.parent.mkdir(parents=True) - object_files = [str(object_path) for _, object_path in compile_list - if object_path not in all_app_objects] - object_files.append(str(app_object_path)) - cmd = ' '.join([config.CC, config.COMMON_FLAGS, *object_files, '-o', str(bin_path), config.LINK_FLAGS]) - print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) - jobs.insert(0, (app_path, subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, - shell=True))) # FIFO style (will be poped) - if len(jobs) >= arguments.j: # If jobs count is maxed we wait for the oldest one to finished - app_path, oldest_job = jobs.pop() - oldest_job.wait() - if oldest_job.returncode != 0: - error_paths.append((app_path, oldest_job.stdout.read() + oldest_job.stderr.read())) + def make(self): + self._populate_todo_dict() + self._compile_sources() + + if self._config.WATCH: + from watch import Watcher, WatchFlag # pylint: disable=import-outside-toplevel + + def callback(path: Path, _flags: WatchFlag): + source_hash = get_hash(path) + if source_hash != self._hash_dict[str(path)]: + print(f'{ConsoleColor.ORANGE}Source file changed : {path} {ConsoleColor.ENDCOLOR}') + self._populate_todo_dict() + self._compile_sources() + + watcher = Watcher() + for source_path in self._compile_dict: + watcher.register(source_path, WatchFlag.MODIFY, callback) + try: + watcher.watch() + except KeyboardInterrupt: + print('\rExit') + + def _populate_todo_dict(self): + """Check source file and generate compilation commands to execute.""" + for source_path, object_path in self._compile_dict.items(): + if not object_path.parent.exists(): + object_path.parent.mkdir(parents=True) + source_hash = get_hash(source_path) + dependency_changed = False + for path in self._dependency_dict[str(source_path)]: + dependency_hash = get_hash(path) + if str(path) not in self._hash_dict or self._hash_dict[str(path)] != dependency_hash: + dependency_changed = True + print(f'{ConsoleColor.ORANGE}Dependency changed for {source_path} : {path} {ConsoleColor.ENDCOLOR}') break - for job_path, job in jobs: # Wait the last jobs to finish - job.wait() - if job.returncode != 0: - error_paths.append((job_path, job.stdout.read() + job.stderr.read())) - else: # Single-process - for app_path, app_object_path in zip(config.APPS, all_app_objects): - if app_path in config.IGNORE_APPS: + if (dependency_changed or not object_path.exists() + or str(source_path) not in self._hash_dict or self._hash_dict[str(source_path)] != source_hash): + self._todo_dict[source_path] = self._generate_compilation_command(source_path, object_path) continue - bin_path = config.BIN_DIR / app_path - if not bin_path.parent.exists(): - bin_path.parent.mkdir(parents=True) - object_files = [str(object_path) for _, object_path in compile_list - if object_path not in all_app_objects] - object_files.append(str(app_object_path)) - cmd = ' '.join([config.CC, config.COMMON_FLAGS, *object_files, '-o', str(bin_path), config.LINK_FLAGS]) - print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) - job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True, shell=True) - if job.returncode != 0: - error_paths.append((app_path, job.stdout + job.stderr)) - if error_paths: - for error_path, error_text in error_paths: - print(ConsoleColor.RED + f'Error linking {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text) - with open(os.path.join(config.OBJECT_DIR, 'hash.json'), 'w', encoding='utf-8') as hash_file: - hash_file.write(json.dumps(hash_dict, indent=1)) - sys.exit(1) - with open(config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: - hash_file.write(json.dumps(hash_dict, indent=1)) + def _generate_compilation_command(self, source_path: Path, object_path: Path) -> str: + return ' '.join([self._config.CC, self._config.COMMON_FLAGS, self._config.COMPILE_FLAGS, + str(source_path), '-c', '-o', str(object_path)]) + + def _compile_sources(self): + if not self._todo_dict and all( + (self._config.BIN_DIR / app_path).exists() + for app_path in self._config.APPS if app_path not in self._config.IGNORE_APPS): + print(f'{ConsoleColor.GREEN}Nothing to do{ConsoleColor.ENDCOLOR}') + return + + # Running compilation processes + if not os.path.exists(self._config.OBJECT_DIR): + os.makedirs(self._config.OBJECT_DIR) + error_paths: list[tuple[Path, str]] = [] + if self._config.JOB_COUNT > 1: # Multi-process + jobs: list[tuple[Path, subprocess.Popen]] = [] + completed_paths: list[Path] = [] + for source_path, cmd in self._todo_dict.items(): + print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) + jobs.insert(0, (source_path, subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, + shell=True))) # FIFO style (will be poped) + if len(jobs) >= self._config.JOB_COUNT: # If jobs count is maxed we wait for the oldest one to finished + job_path, oldest_job = jobs.pop() + oldest_job.wait() + if oldest_job.returncode != 0: + error_paths.append((job_path, oldest_job.stdout.read() + oldest_job.stderr.read())) + break + # Update hash if no error + for dependency_path in self._dependency_dict[str(job_path)]: + self._hash_dict[str(dependency_path)] = get_hash(dependency_path) + completed_paths.append(source_path) + for source_path in completed_paths: + del self._todo_dict[source_path] + + for job_path, job in jobs: # Wait the last jobs to finish + job.wait() + if job.returncode != 0: + error_paths.append((job_path, job.stdout.read() + job.stderr.read())) + else: + # Update hash if no error + for dependency_path in self._dependency_dict[str(job_path)]: + self._hash_dict[str(dependency_path)] = get_hash(dependency_path) + del self._todo_dict[job_path] + else: # Single-process + completed_paths: list[Path] = [] + for source_path, cmd in self._todo_dict.items(): + print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) + job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, shell=True) + if job.returncode != 0: + error_paths.append((source_path, job.stdout + job.stderr)) + break + # Update hash if no error + for dependency_path in self._dependency_dict[str(source_path)]: + self._hash_dict[str(dependency_path)] = get_hash(dependency_path) + completed_paths.append(source_path) + for source_path in completed_paths: + del self._todo_dict[source_path] + + if error_paths: + for error_path, error_text in error_paths: + print(ConsoleColor.RED + f'Error compiling {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text) + with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: + hash_file.write(json.dumps(self._hash_dict, indent=1)) + sys.exit(1) + + # Running linking processes + if not self._config.BIN_DIR.exists(): + self._config.BIN_DIR.mkdir(parents=True) + all_app_objects = [self._config.OBJECT_DIR / Path(app_path).parent / (Path(app_path).stem + '.o') + for app_path in self._config.APPS] + if self._config.JOB_COUNT > 1: # Multi-process + jobs: list[tuple[Path, subprocess.Popen]] = [] + for app_path, app_object_path in zip(self._config.APPS, all_app_objects): + if app_path in self._config.IGNORE_APPS: + continue + bin_path = self._config.BIN_DIR / app_path + if not bin_path.parent.exists(): + bin_path.parent.mkdir(parents=True) + object_files = [str(object_path) for object_path in self._compile_dict.values() + if object_path not in all_app_objects] + object_files.append(str(app_object_path)) + cmd = ' '.join([self._config.CC, self._config.COMMON_FLAGS, *object_files, '-o', str(bin_path), + self._config.LINK_FLAGS]) + print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) + jobs.insert(0, (app_path, subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, + shell=True))) # FIFO style (will be poped) + if len(jobs) >= self._config.JOB_COUNT: # If jobs count is maxed we wait for the oldest one to finished + app_path, oldest_job = jobs.pop() + oldest_job.wait() + if oldest_job.returncode != 0: + error_paths.append((app_path, oldest_job.stdout.read() + oldest_job.stderr.read())) + break + for job_path, job in jobs: # Wait the last jobs to finish + job.wait() + if job.returncode != 0: + error_paths.append((job_path, job.stdout.read() + job.stderr.read())) + else: # Single-process + for app_path, app_object_path in zip(self._config.APPS, all_app_objects): + if app_path in self._config.IGNORE_APPS: + continue + bin_path = self._config.BIN_DIR / app_path + if not bin_path.parent.exists(): + bin_path.parent.mkdir(parents=True) + object_files = [str(object_path) for object_path in self._compile_dict.values() + if object_path not in all_app_objects] + object_files.append(str(app_object_path)) + cmd = ' '.join([self._config.CC, self._config.COMMON_FLAGS, *object_files, '-o', str(bin_path), + self._config.LINK_FLAGS]) + print(ConsoleColor.BLUE + cmd + ConsoleColor.ENDCOLOR) + job = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, shell=True) + if job.returncode != 0: + error_paths.append((app_path, job.stdout + job.stderr)) + if error_paths: + for error_path, error_text in error_paths: + print(ConsoleColor.RED + f'Error linking {error_path}:\n' + ConsoleColor.ENDCOLOR + error_text) + with open(os.path.join(self._config.OBJECT_DIR, 'hash.json'), 'w', encoding='utf-8') as hash_file: + hash_file.write(json.dumps(self._hash_dict, indent=1)) + sys.exit(1) + + with open(self._config.OBJECT_DIR / 'hash.json', 'w', encoding='utf-8') as hash_file: + hash_file.write(json.dumps(self._hash_dict, indent=1)) + + print(f'{ConsoleColor.GREEN}Compilation done{ConsoleColor.ENDCOLOR}') diff --git a/watch.py b/watch.py new file mode 100644 index 0000000..81c49b6 --- /dev/null +++ b/watch.py @@ -0,0 +1,151 @@ +import ctypes +import ctypes.util +from enum import IntFlag +import os +from pathlib import Path +import select +from struct import unpack, calcsize +from typing import Callable + + +class WatchFlag(IntFlag): + # Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH. + ACCESS = 0x00000001 # File was accessed + MODIFY = 0x00000002 # File was modified + ATTRIB = 0x00000004 # Metadata changed + CLOSE_WRITE = 0x00000008 # Writtable file was closed + CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed + # CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close + OPEN = 0x00000020 # File was opened + MOVED_FROM = 0x00000040 # File was moved from X + MOVED_TO = 0x00000080 # File was moved to Y + # MOVE = (MOVED_FROM | MOVED_TO) # Moves + CREATE = 0x00000100 # Subfile was created + DELETE = 0x00000200 # Subfile was deleted + DELETE_SELF = 0x00000400 # Self was deleted + MOVE_SELF = 0x00000800 # Self was moved + + # Events sent by the kernel. + UNMOUNT = 0x00002000 # Backing fs was unmounted + Q_OVERFLOW = 0x00004000 # Event queued overflowed + IGNORED = 0x00008000 # File was ignored + + # Helper events. + # CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close + # MOVE = (MOVED_FROM | MOVED_TO) # Moves + + # Special flags. + ONLYDIR = 0x01000000 # Only watch the path if it is a directory + DONT_FOLLOW = 0x02000000 # Do not follow a sym link + EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects + MASK_CREATE = 0x10000000 # Only create watches + MASK_ADD = 0x20000000 # Add to the mask of an already existing watch + ISDIR = 0x40000000 # Event occurred against dir + ONESHOT = 0x80000000 # Only send event once + + # All events which a program can wait on. + # ALL_EVENTS = ( + # ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | OPEN | MOVED_FROM | MOVED_TO + # | CREATE | DELETE | DELETE_SELF | MOVE_SELF) + + +class InotifyError(Exception): + def __init__(self, message, *args, **kwargs): + message += f' ERRNO=({ctypes.get_errno()})' + super().__init__(message, *args, **kwargs) + + +WatcherCallback = Callable[[Path, WatchFlag], None] + + +class Watcher: + _EVENT_FORMAT = 'iIII' + _EVENT_SIZE = calcsize(_EVENT_FORMAT) + + def __init__(self): + libc_path = ctypes.util.find_library('c') + if libc_path is None: + libc_path = 'libc.so.6' + self._instance = ctypes.cdll.LoadLibrary(libc_path) + + self._inotify_init: ctypes.CDLL._FuncPtr = self._instance.inotify_init + self._inotify_init.argtypes = [] + self._inotify_init.restype = self._check_nonnegative + + self._inotify_add_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_add_watch + self._inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32] + self._inotify_add_watch.restype = self._check_nonnegative + + self._inotify_rm_watch: ctypes.CDLL._FuncPtr = self._instance.inotify_rm_watch + self._inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int] + self._inotify_rm_watch.restype = self._check_nonnegative + + self._inotify_fd = self._inotify_init() + + self._poll = select.poll() + self._poll.register(self._inotify_fd, select.POLLIN) + + self._watch_fds: dict[Path, int] = {} + self._watch_info: dict[int, tuple[Path, WatcherCallback]] = {} + + def register(self, path: Path, flags: WatchFlag, callback: WatcherCallback): + if path in self._watch_fds: + self._inotify_rm_watch(self._inotify_fd, self._watch_fds[path]) + + watch_fd = self._inotify_add_watch(self._inotify_fd, str(path).encode(), flags) + self._watch_fds[path] = watch_fd + self._watch_info[watch_fd] = (path, callback) + + def unregister(self, path: Path): + watch_fd = self._watch_fds[path] + self._inotify_rm_watch(self._inotify_fd, watch_fd) + del self._watch_fds[path] + del self._watch_info[watch_fd] + + def watch(self): + while True: + results = self._poll.poll() + for poll_fd, _event in results: + watch_buffer = os.read(poll_fd, self._EVENT_SIZE) + if len(watch_buffer) < self._EVENT_SIZE: + continue + watch_fd, mask, _cookie, _namesize = unpack(self._EVENT_FORMAT, watch_buffer) + watch_path, callback = self._watch_info[watch_fd] + callback(watch_path, WatchFlag(mask)) + + def __del__(self): + self._poll.unregister(self._inotify_fd) + for _watch_path, watch_fd in self._watch_fds.items(): + self._inotify_rm_watch(self._inotify_fd, watch_fd) + os.close(self._inotify_fd) + + @staticmethod + def _check_nonnegative(result): + if result == -1: + raise InotifyError(f'Call failed (should not be -1): {result}') + return result + + +def main(): + from argparse import ArgumentParser # pylint: disable=import-outside-toplevel + + parser = ArgumentParser() + parser.add_argument('paths', nargs='*', type=Path) + arguments = parser.parse_args() + + paths: list[Path] = arguments.paths + + def callback(path: Path, _flags: WatchFlag): + print(f'{path} has been modified') + + watcher = Watcher() + for path in paths: + watcher.register(path, WatchFlag.MODIFY, callback) + try: + watcher.watch() + except KeyboardInterrupt: + print('\rExit') + + +if __name__ == '__main__': + main()