Track known files

This commit is contained in:
Joscha 2019-10-20 21:40:50 +00:00
parent 189282a3f6
commit 58279c24e1
3 changed files with 107 additions and 6 deletions

View file

@ -3,6 +3,7 @@ import logging
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from .colors import *
from .config import * from .config import *
from .explore import * from .explore import *
from .known_files import * from .known_files import *
@ -75,6 +76,11 @@ def run(args: Any) -> None:
if prompt_choice("[C]ontinue to the next file or [A]bort the program?", "Ca") == "a": if prompt_choice("[C]ontinue to the next file or [A]bort the program?", "Ca") == "a":
raise CatastrophicError("Aborted") raise CatastrophicError("Aborted")
for path in known_files.find_forgotten_files():
logger.info(f"The file {style_path(path)} is no longer known")
known_files.save_final()
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file") parser.add_argument("-c", "--config-file")

View file

@ -1,7 +1,7 @@
import json import json
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Dict, List, Set from typing import Dict, List, Optional, Set
from .colors import * from .colors import *
from .util import * from .util import *
@ -22,6 +22,9 @@ class KnownFiles:
logger.debug(f"File {style_path(self._path)} does not exist, " logger.debug(f"File {style_path(self._path)} does not exist, "
"creating a new file on the first upcoming save") "creating a new file on the first upcoming save")
def _normalize_path(self, path: Path) -> Path:
return path.expanduser().resolve()
def _read_known_files(self, text: str) -> Dict[Path, str]: def _read_known_files(self, text: str) -> Dict[Path, str]:
known_files: Dict[Path, str] = {} known_files: Dict[Path, str] = {}
raw_known_files = json.loads(text) raw_known_files = json.loads(text)
@ -35,13 +38,26 @@ class KnownFiles:
if not isinstance(file_hash, str): if not isinstance(file_hash, str):
raise CatastrophicError(style_error(f"Hash {hash!r} at path {path!r} is not a string")) raise CatastrophicError(style_error(f"Hash {hash!r} at path {path!r} is not a string"))
path = Path(path).expanduser().resolve() # normalized path = self._normalize_path(Path(path))
known_files[path] = file_hash known_files[path] = file_hash
return known_files return known_files
def was_recently_modified(self, path: Path) -> bool:
    """Return True if *path* was already updated during the current round."""
    normalized = self._normalize_path(path)
    return normalized in self._new_known_files
def get_hash(self, path: Path) -> Optional[str]:
    """
    Look up the recorded hash for *path*.

    The current round's entries take precedence over those loaded from the
    previous run; returns None when the path is unknown to both.
    """
    key = self._normalize_path(path)
    current = self._new_known_files.get(key)
    if current is not None:
        return current
    return self._old_known_files.get(key)
def update_file(self, path: Path, file_hash: str) -> None: def update_file(self, path: Path, file_hash: str) -> None:
self._new_known_files[path.expanduser().resolve()] = file_hash self._new_known_files[self._normalize_path(path)] = file_hash
def save_incremental(self) -> None: def save_incremental(self) -> None:
to_save: Dict[str, str] = {} to_save: Dict[str, str] = {}
@ -54,7 +70,12 @@ class KnownFiles:
self._save(json.dumps(to_save)) self._save(json.dumps(to_save))
logger.debug(f"Incremental save to {style_path(self._path)} completed") logger.debug(f"Incremental save to {style_path(self._path)} completed")
def find_lost_files(self) -> Set[Path]: def find_forgotten_files(self) -> Set[Path]:
"""
Finds all files which were not modified this round and thus
are no longer known (i.e. have been forgotten).
"""
return set(self._old_known_files.keys() - self._new_known_files.keys()) return set(self._old_known_files.keys() - self._new_known_files.keys())
def save_final(self) -> None: def save_final(self) -> None:

View file

@ -1,3 +1,4 @@
import hashlib
import logging import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
@ -7,6 +8,7 @@ from .colors import *
from .config import * from .config import *
from .known_files import * from .known_files import *
from .parser import * from .parser import *
from .prompt import *
from .util import * from .util import *
__all__ = ["Processor"] __all__ = ["Processor"]
@ -29,13 +31,13 @@ class Processor:
self._process_file_with_header(path, header_path, config) self._process_file_with_header(path, header_path, config)
def _process_file_without_header(self, path: Path, config: Config) -> None: def _process_file_without_header(self, path: Path, config: Config) -> None:
logger.debug(f"Processing file {style_path(path)} with no header") logger.debug(f"Processing file {style_path(path)} without header")
try: try:
text = read_file(path) text = read_file(path)
except ReadFileException as e: except ReadFileException as e:
raise LessCatastrophicError( raise LessCatastrophicError(
style_error("Could not load file ") + style_error("Could not read file ") +
style_path(path) + f": {e}") style_path(path) + f": {e}")
header, lines = split_header_and_rest(text) header, lines = split_header_and_rest(text)
@ -83,15 +85,25 @@ class Processor:
for target in config.targets: for target in config.targets:
logger.info(f" -> {style_path(str(target))}") logger.info(f" -> {style_path(str(target))}")
if not self._justify_target(target):
logger.info("Skipping this target")
continue
try: try:
shutil.copy(path, target) shutil.copy(path, target)
except (IOError, shutil.SameFileError) as e: except (IOError, shutil.SameFileError) as e:
logger.warning(style_warning("Could not copy") + f": {e}") logger.warning(style_warning("Could not copy") + f": {e}")
self._update_known_hash(target)
def _process_parseable(self, lines: List[str], config: Config) -> None: def _process_parseable(self, lines: List[str], config: Config) -> None:
for target in config.targets: for target in config.targets:
logger.info(f" -> {style_path(str(target))}") logger.info(f" -> {style_path(str(target))}")
if not self._justify_target(target):
logger.info("Skipping this target")
continue
config_copy = config.copy() config_copy = config.copy()
config_copy.target = target config_copy.target = target
@ -117,3 +129,65 @@ class Processor:
except WriteFileException as e: except WriteFileException as e:
logger.warning(style_warning("Could not write to ") + style_path(str(target)) + logger.warning(style_warning("Could not write to ") + style_path(str(target)) +
f": {e}") f": {e}")
continue
self._update_known_hash(target)
def _obtain_hash(self, path: Path) -> Optional[str]:
BLOCK_SIZE = 2**16
try:
h = hashlib.sha256()
with open(path, "rb") as f:
while True:
block = f.read(BLOCK_SIZE)
if not block: break
h.update(block)
return h.hexdigest()
except IOError:
return None
def _justify_target(self, target: Path) -> bool:
    """
    Decide whether it is safe to overwrite *target*.

    Returns True when overwriting is clearly fine (new file, or unchanged
    since we last wrote it), False when it is clearly not (a directory, or
    already written this round), and otherwise asks the user.
    """
    if not target.exists():
        return True

    if not target.is_file():
        logger.warning(style_warning("The target is a directory"))
        return False

    actual_hash = self._obtain_hash(target)
    if actual_hash is None:
        return prompt_yes_no("Overwriting a file that could not be hashed, continue?", False)

    if self.known_files.was_recently_modified(target):
        logger.warning(style_warning("This target was already overwritten earlier"))
        return False

    recorded_hash = self.known_files.get_hash(target)
    if recorded_hash is None:
        return prompt_yes_no("Overwriting an unknown file, continue?", False)

    if actual_hash != recorded_hash:
        # The file changed behind our back; let the user decide.
        return prompt_yes_no("Overwriting a file that was modified since it was last overwritten, continue?", False)

    # Hashes match, so the file is untouched since we last saw it.
    return True
def _update_known_hash(self, target: Path) -> None:
    """
    Record the current hash of *target* in the known-files registry and
    persist it immediately with an incremental save.

    Raises LessCatastrophicError if the file cannot be hashed.
    """
    new_hash = self._obtain_hash(target)
    if new_hash is None:
        message = (style_error("Could not obtain hash of target file ")
                   + style_path(target))
        raise LessCatastrophicError(message)
    self.known_files.update_file(target, new_hash)
    self.known_files.save_incremental()