Add Files module
This commit is contained in:
parent
1b5eb8c537
commit
bc5694dd64
6 changed files with 181 additions and 6 deletions
|
|
@ -1,7 +1,9 @@
|
|||
from .echo import Echo
|
||||
from .files import Files
|
||||
from .pacman import Pacman
|
||||
|
||||
__all__: list[str] = [
|
||||
"Echo",
|
||||
"Files",
|
||||
"Pacman",
|
||||
]
|
||||
|
|
|
|||
155
pasch/modules/files.py
Normal file
155
pasch/modules/files.py
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
import hashlib
|
||||
import json
|
||||
import random
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
from rich.markup import escape
|
||||
|
||||
from pasch.orchestrator import Module, Orchestrator
|
||||
|
||||
|
||||
def random_tmp_path(path: Path) -> Path:
|
||||
prefix = "" if path.name.startswith(".") else "."
|
||||
suffix = random.sample(string.ascii_letters + string.digits, 6)
|
||||
name = f"{prefix}{path.name}.{suffix}~pasch"
|
||||
return path.with_name(name)
|
||||
|
||||
|
||||
def atomic_write(path: Path, content: bytes) -> None:
|
||||
tmp_path = random_tmp_path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path.write_bytes(content)
|
||||
tmp_path.rename(path)
|
||||
|
||||
|
||||
def hash_data(data: bytes) -> str:
|
||||
m = hashlib.sha256()
|
||||
m.update(data)
|
||||
return f"sha256-{m.hexdigest()}"
|
||||
|
||||
|
||||
def hash_file(path: Path) -> str | None:
|
||||
try:
|
||||
data = path.read_bytes()
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
return hash_data(data)
|
||||
|
||||
|
||||
def path_to_str(path: Path) -> str:
|
||||
return str(path.resolve())
|
||||
|
||||
|
||||
class FileDb:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self._path = path
|
||||
|
||||
def _load(self) -> dict[str, str]:
|
||||
try:
|
||||
text = self._path.read_text(encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
data = json.loads(text)
|
||||
if type(data) is not dict:
|
||||
raise ValueError("file db is not a dict")
|
||||
for k, v in data.items():
|
||||
if type(v) is not str:
|
||||
raise ValueError(f"file db contains non-str hash at key {k!r}")
|
||||
return data
|
||||
|
||||
def _save(self, data: dict[str, str]) -> None:
|
||||
atomic_write(self._path, json.dumps(data).encode("utf-8"))
|
||||
|
||||
def add_hash(self, path: Path, hash: str) -> None:
|
||||
data = self._load()
|
||||
data[path_to_str(path)] = hash
|
||||
self._save(data)
|
||||
|
||||
def remove_hash(self, path: Path) -> None:
|
||||
data = self._load()
|
||||
data.pop(path_to_str(path), None)
|
||||
self._save(data)
|
||||
|
||||
def verify_hash(self, path: Path, cur_hash: str | None) -> str | None:
|
||||
cur_hash = hash_file(path)
|
||||
if cur_hash is None:
|
||||
return
|
||||
known_hash = self._load().get(path_to_str(path))
|
||||
if known_hash is None:
|
||||
return "File unknown and contents don't match target state."
|
||||
if known_hash != cur_hash:
|
||||
return "File contents don't match the last known or target state."
|
||||
|
||||
def paths(self) -> list[str]:
|
||||
return list(sorted(self._load().keys()))
|
||||
|
||||
|
||||
class Files(Module):
|
||||
def __init__(
|
||||
self,
|
||||
orchestrator: Orchestrator,
|
||||
file_db_name: str = "files.json",
|
||||
root: Path | None = None,
|
||||
) -> None:
|
||||
super().__init__(orchestrator)
|
||||
self._files: dict[str, bytes] = {}
|
||||
self._file_db = FileDb(self.orchestrator.state_dir / file_db_name)
|
||||
self._root = root or Path.home()
|
||||
|
||||
def _read_path(self, path: Path | str) -> Path:
|
||||
return self._root / path
|
||||
|
||||
def add(self, path: Path | str, content: bytes) -> None:
|
||||
path = self._read_path(path)
|
||||
self._files[path_to_str(path)] = content
|
||||
|
||||
def realize(self) -> None:
|
||||
for path, content in sorted(self._files.items()):
|
||||
self._write_file(self._read_path(path), content)
|
||||
|
||||
for path in self._file_db.paths():
|
||||
if path not in self._files:
|
||||
self._remove_file(self._read_path(path))
|
||||
|
||||
def _write_file(self, path: Path, content: bytes) -> None:
|
||||
cur_hash = hash_file(path)
|
||||
target_hash = hash_data(content)
|
||||
if cur_hash == target_hash:
|
||||
return
|
||||
|
||||
relative_path = path.relative_to(self._root, walk_up=True)
|
||||
self.c.print(f"[bold green]+[/] {escape(str(relative_path))}", highlight=False)
|
||||
|
||||
if reason := self._file_db.verify_hash(path, cur_hash):
|
||||
self.c.print(f"[red]Error:[/] {escape(reason)}", highlight=False)
|
||||
|
||||
# We want to avoid scenarios where we fail to remember a file we've
|
||||
# written. It is better to remember a file with an incorrect hash than
|
||||
# to forget it entirely. Thus, we must always update the file db before
|
||||
# we write a file.
|
||||
self._file_db.add_hash(path, target_hash)
|
||||
atomic_write(path, content)
|
||||
|
||||
def _remove_file(self, path: Path) -> None:
|
||||
relative_path = path.relative_to(self._root, walk_up=True)
|
||||
self.c.print(f"[bold red]-[/] {escape(str(relative_path))}", highlight=False)
|
||||
|
||||
cur_hash = hash_file(path)
|
||||
if reason := self._file_db.verify_hash(path, cur_hash):
|
||||
self.c.print(f"[red]Error:[/] {escape(reason)}", highlight=False)
|
||||
|
||||
try:
|
||||
path.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
for parent in path.parents:
|
||||
try:
|
||||
parent.rmdir()
|
||||
except:
|
||||
break
|
||||
|
||||
# We want to avoid scenarios where we forget a file without actually
|
||||
# removing it. Thus, the db must be updated after the removal.
|
||||
self._file_db.remove_hash(path)
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
from dataclasses import dataclass, field
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
from rich import print
|
||||
from rich.markup import escape
|
||||
|
||||
from pasch.cmd import run_capture, run_execute
|
||||
|
|
@ -37,9 +36,9 @@ class Pacman(Module):
|
|||
to_uninstall = installed - target
|
||||
|
||||
for package in sorted(to_install):
|
||||
print(f"[bold green]+[/] {escape(package)}")
|
||||
self.c.print(f"[bold green]+[/] {escape(package)}")
|
||||
for package in sorted(to_uninstall):
|
||||
print(f"[bold red]-[/] {escape(package)}")
|
||||
self.c.print(f"[bold red]-[/] {escape(package)}")
|
||||
|
||||
self._install_packages(to_install)
|
||||
self._uninstall_packages(to_uninstall)
|
||||
|
|
|
|||
|
|
@ -3,21 +3,27 @@ from __future__ import annotations
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
from rich import print
|
||||
from rich.console import Console
|
||||
from rich.markup import escape
|
||||
from xdg_base_dirs import xdg_state_home
|
||||
|
||||
|
||||
class Module(ABC):
|
||||
def __init__(self, orchestrator: Orchestrator) -> None:
|
||||
self.orchestrator = orchestrator
|
||||
self.orchestrator.register(self)
|
||||
self.c = self.orchestrator.console
|
||||
|
||||
@abstractmethod
|
||||
def realize(self) -> None: ...
|
||||
|
||||
|
||||
class Orchestrator:
|
||||
def __init__(self, dry_run: bool = False) -> None:
|
||||
def __init__(self, name: str = "pasch", dry_run: bool = False) -> None:
|
||||
self.name = name
|
||||
self.dry_run = dry_run
|
||||
self.state_dir = xdg_state_home() / self.name
|
||||
self.console = Console(highlight=False)
|
||||
|
||||
self._frozen: bool = False
|
||||
self._modules: list[Module] = []
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ version = "0.0.0"
|
|||
description = "Python-based Arch System Config Helper"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = ["rich>=14.1.0"]
|
||||
dependencies = ["rich>=14.1.0", "xdg-base-dirs>=6.0.2"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["RUF", "F"]
|
||||
|
|
|
|||
15
uv.lock
generated
15
uv.lock
generated
|
|
@ -29,10 +29,14 @@ version = "0.0.0"
|
|||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "rich" },
|
||||
{ name = "xdg-base-dirs" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [{ name = "rich", specifier = ">=14.1.0" }]
|
||||
requires-dist = [
|
||||
{ name = "rich", specifier = ">=14.1.0" },
|
||||
{ name = "xdg-base-dirs", specifier = ">=6.0.2" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pygments"
|
||||
|
|
@ -55,3 +59,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/fe/75/af448d8e52bf1d8fa
|
|||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/e3/30/3c4d035596d3cf444529e0b2953ad0466f6049528a879d27534700580395/rich-14.1.0-py3-none-any.whl", hash = "sha256:536f5f1785986d6dbdea3c75205c473f970777b4a0d6c6dd1b696aa05a3fa04f", size = 243368, upload-time = "2025-07-25T07:32:56.73Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xdg-base-dirs"
|
||||
version = "6.0.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/bf/d0/bbe05a15347538aaf9fa5b51ac3b97075dfb834931fcb77d81fbdb69e8f6/xdg_base_dirs-6.0.2.tar.gz", hash = "sha256:950504e14d27cf3c9cb37744680a43bf0ac42efefc4ef4acf98dc736cab2bced", size = 4085, upload-time = "2024-10-19T14:35:08.114Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/fc/03/030b47fd46b60fc87af548e57ff59c2ca84b2a1dadbe721bb0ce33896b2e/xdg_base_dirs-6.0.2-py3-none-any.whl", hash = "sha256:3c01d1b758ed4ace150ac960ac0bd13ce4542b9e2cdf01312dcda5012cfebabe", size = 4747, upload-time = "2024-10-19T14:35:05.931Z" },
|
||||
]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue