diff --git a/evering/__main__.py b/evering/__main__.py index 01b6875..af4477a 100644 --- a/evering/__main__.py +++ b/evering/__main__.py @@ -3,6 +3,7 @@ import logging from pathlib import Path from typing import Any +from .colors import * from .config import * from .explore import * from .known_files import * @@ -75,6 +76,11 @@ def run(args: Any) -> None: if prompt_choice("[C]ontinue to the next file or [A]bort the program?", "Ca") == "a": raise CatastrophicError("Aborted") + for path in known_files.find_forgotten_files(): + logger.info(f"The file {style_path(path)} is no longer known") + + known_files.save_final() + def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("-c", "--config-file") diff --git a/evering/known_files.py b/evering/known_files.py index 173c855..c8a3350 100644 --- a/evering/known_files.py +++ b/evering/known_files.py @@ -1,7 +1,7 @@ import json import logging from pathlib import Path -from typing import Dict, List, Set +from typing import Dict, List, Optional, Set from .colors import * from .util import * @@ -22,6 +22,9 @@ class KnownFiles: logger.debug(f"File {style_path(self._path)} does not exist, " "creating a new file on the first upcoming save") + def _normalize_path(self, path: Path) -> Path: + return path.expanduser().resolve() + def _read_known_files(self, text: str) -> Dict[Path, str]: known_files: Dict[Path, str] = {} raw_known_files = json.loads(text) @@ -35,13 +38,26 @@ class KnownFiles: if not isinstance(file_hash, str): raise CatastrophicError(style_error(f"Hash {hash!r} at path {path!r} is not a string")) - path = Path(path).expanduser().resolve() # normalized + path = self._normalize_path(Path(path)) known_files[path] = file_hash return known_files + def was_recently_modified(self, path: Path) -> bool: + return self._normalize_path(path) in self._new_known_files + + def get_hash(self, path: Path) -> Optional[str]: + path = self._normalize_path(path) + + h = self._new_known_files.get(path) + + if h is None: + h = self._old_known_files.get(path) + + return h + def update_file(self, path: Path, file_hash: str) -> None: - self._new_known_files[path.expanduser().resolve()] = file_hash + self._new_known_files[self._normalize_path(path)] = file_hash def save_incremental(self) -> None: to_save: Dict[str, str] = {} @@ -54,7 +70,12 @@ class KnownFiles: self._save(json.dumps(to_save)) logger.debug(f"Incremental save to {style_path(self._path)} completed") - def find_lost_files(self) -> Set[Path]: + def find_forgotten_files(self) -> Set[Path]: + """ + Finds all files which were not modified this round and thus + are no longer known (i. e. have been forgotten). + """ + return set(self._old_known_files.keys() - self._new_known_files.keys()) def save_final(self) -> None: diff --git a/evering/process.py b/evering/process.py index 514ae3f..47819c2 100644 --- a/evering/process.py +++ b/evering/process.py @@ -1,3 +1,4 @@ +import hashlib import logging import shutil from pathlib import Path @@ -7,6 +8,7 @@ from .colors import * from .config import * from .known_files import * from .parser import * +from .prompt import * from .util import * __all__ = ["Processor"] @@ -29,13 +31,13 @@ class Processor: self._process_file_with_header(path, header_path, config) def _process_file_without_header(self, path: Path, config: Config) -> None: - logger.debug(f"Processing file {style_path(path)} with no header") + logger.debug(f"Processing file {style_path(path)} without header") try: text = read_file(path) except ReadFileException as e: raise LessCatastrophicError( - style_error("Could not load file ") + + style_error("Could not read file ") + style_path(path) + f": {e}") header, lines = split_header_and_rest(text) @@ -83,15 +85,25 @@ class Processor: for target in config.targets: logger.info(f" -> {style_path(str(target))}") + if not self._justify_target(target): + logger.info("Skipping this target") + continue + try: shutil.copy(path, target) except (IOError, shutil.SameFileError) as e: logger.warning(style_warning("Could not copy") + f": {e}") + self._update_known_hash(target) + def _process_parseable(self, lines: List[str], config: Config) -> None: for target in config.targets: logger.info(f" -> {style_path(str(target))}") + if not self._justify_target(target): + logger.info("Skipping this target") + continue + config_copy = config.copy() config_copy.target = target @@ -117,3 +129,65 @@ class Processor: except WriteFileException as e: logger.warning(style_warning("Could not write to ") + style_path(str(target)) + f": {e}") + continue + + self._update_known_hash(target) + + def _obtain_hash(self, path: Path) -> Optional[str]: + BLOCK_SIZE = 2**16 + + try: + h = hashlib.sha256() + + with open(path, "rb") as f: + while True: + block = f.read(BLOCK_SIZE) + if not block: break + h.update(block) + + return h.hexdigest() + + except IOError: + return None + + def _justify_target(self, target: Path) -> bool: + if not target.exists(): + return True + + if not target.is_file(): + logger.warning(style_warning("The target is a directory")) + return False + + target_hash = self._obtain_hash(target) + if target_hash is None: + return prompt_yes_no("Overwriting a file that could not be hashed, continue?", False) + + if self.known_files.was_recently_modified(target): + logger.warning(style_warning("This target was already overwritten earlier")) + return False + + known_target_hash = self.known_files.get_hash(target) + if known_target_hash is None: + return prompt_yes_no("Overwriting an unknown file, continue?", False) + + # The following condition is phrased awkwardly because I just + # feel better if the final statement in this function is not a + # 'return True'. After all, returning True here might cause + # loss of important configuration data. + + if target_hash == known_target_hash: + # We're positive that this file hasn't changed since we've + # last seen it. + return True + + return prompt_yes_no("Overwriting a file that was modified since it was last overwritten, continue?", False) + + def _update_known_hash(self, target: Path) -> None: + target_hash = self._obtain_hash(target) + if target_hash is None: + raise LessCatastrophicError( + style_error("Could not obtain hash of target file ") + + style_path(target)) + + self.known_files.update_file(target, target_hash) + self.known_files.save_incremental()